View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.ai.classic;
21  
22  import java.io.BufferedReader;
23  import java.io.ByteArrayOutputStream;
24  import java.io.StringReader;
25  import java.sql.Connection;
26  import java.util.Enumeration;
27  
28  import javax.annotation.Resource;
29  import javax.mail.Header;
30  import javax.mail.MessagingException;
31  import javax.mail.internet.MimeMessage;
32  import javax.sql.DataSource;
33  
34  import org.apache.mailet.Mail;
35  import org.apache.mailet.base.GenericMailet;
36  
37  /**
38   * <p>
39   * Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.
40   * </p>
41   * 
42   * <p>
43   * The new token frequencies will be stored in a JDBC database.
44   * </p>
45   * 
46   * <p>
47   * Sample configuration:
48   * </p>
49   * 
50   * <pre>
51   * <code>
52   * &lt;processor name="root"&gt;
53   * 
54   *   &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
55   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
56   *     &lt;feedType&gt;ham&lt;/feedType&gt;
57   *     &lt;!--
58   *       Set this to the maximum message size (in bytes) that a message may have
59   *       to be analyzed (default is 100000).
60   *     --&gt;
61   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
62   *   &lt;/mailet&gt;
63   * 
64   *   &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
65   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
66   *     &lt;feedType&gt;spam&lt;/feedType&gt;
67   *     &lt;!--
68   *       Set this to the maximum message size (in bytes) that a message may have
69   *       to be analyzed (default is 100000).
70   *     --&gt;
71   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
72   *   &lt;/mailet&gt;
73   * 
74   * &lt;/processor&gt;
75   * </code>
76   * </pre>
77   * 
78   * <p>
79   * The previous example will allow the user to send messages to the server and
80   * use the recipient email address as the indicator for whether the message is
81   * ham or spam.
82   * </p>
83   * 
84   * <p>
85   * Using the example above, send good messages (ham not spam) to the email
86   * address "not.spam@thisdomain.com" to pump good messages into the feeder, and
87   * send spam messages (spam not ham) to the email address "spam@thisdomain.com"
88   * to pump spam messages into the feeder.
89   * </p>
90   * 
91   * <p>
92   * The bayesian database tables will be updated during the training reflecting
93   * the new data.
94   * </p>
95   * 
96   * <p>
97   * At the end the mail will be destroyed (ghosted).
98   * </p>
99   * 
100  * <p>
101  * <b>The correct approach is to send the original ham/spam message as an
102  * attachment to another message sent to the feeder; all the headers of the
103  * enveloping message will be removed and only the original message's tokens
104  * will be analyzed.</b>
105  * </p>
106  * 
107  * <p>
108  * After a training session, the frequency <i>Corpus</i> used by
109  * <code>BayesianAnalysis</code> must be rebuilt from the database, in order to
110  * take advantage of the new token frequencies. Every 10 minutes a special
111  * thread in the <code>BayesianAnalysis</code> mailet will check if any change
112  * was made to the database, and rebuild the corpus if necessary.
113  * </p>
114  * 
115  * <p>
116  * Only one message at a time is scanned (the database update activity is
117  * <i>synchronized</i>) in order to avoid too much database locking, as
118  * thousands of rows may be updated just for one message fed.
119  * </p>
120  * 
121  * @see BayesianAnalysis
122  * @see BayesianAnalyzer
123  * @see JDBCBayesianAnalyzer
124  * @since 2.3.0
125  */
126 
127 public class BayesianAnalysisFeeder extends GenericMailet {
128     /**
129      * The JDBCUtil helper class
130      */
131     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
132         protected void delegatedLog(String logString) {
133             log("BayesianAnalysisFeeder: " + logString);
134         }
135     };
136 
137     /**
138      * The JDBCBayesianAnalyzer class that does all the work.
139      */
140     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
141         protected void delegatedLog(String logString) {
142             log("BayesianAnalysisFeeder: " + logString);
143         }
144     };
145 
146     private DataSource datasource;
147     private String repositoryPath;
148 
149     private String feedType;
150 
151     /**
152      * Return a string describing this mailet.
153      * 
154      * @return a string describing this mailet
155      */
156     public String getMailetInfo() {
157         return "BayesianAnalysisFeeder Mailet";
158     }
159 
160     /**
161      * Holds value of property maxSize.
162      */
163     private int maxSize = 100000;
164 
165     private SystemContext fs;
166 
167     /**
168      * Getter for property maxSize.
169      * 
170      * @return Value of property maxSize.
171      */
172     public int getMaxSize() {
173 
174         return this.maxSize;
175     }
176 
177     @Resource(name = "datasource")
178     public void setDataSource(DataSource datasource) {
179         this.datasource = datasource;
180     }
181 
182     /**
183      * Setter for property maxSize.
184      * 
185      * @param maxSize
186      *            New value of property maxSize.
187      */
188     public void setMaxSize(int maxSize) {
189 
190         this.maxSize = maxSize;
191     }
192 
193     @Resource(name = "filesystem")
194     public void setFileSystem(SystemContext fs) {
195         this.fs = fs;
196     }
197 
198     /**
199      * Mailet initialization routine.
200      * 
201      * @throws MessagingException
202      *             if a problem arises
203      */
204     public void init() throws MessagingException {
205         repositoryPath = getInitParameter("repositoryPath");
206 
207         if (repositoryPath == null) {
208             throw new MessagingException("repositoryPath is null");
209         }
210 
211         feedType = getInitParameter("feedType");
212         if (feedType == null) {
213             throw new MessagingException("feedType is null");
214         }
215 
216         String maxSizeParam = getInitParameter("maxSize");
217         if (maxSizeParam != null) {
218             setMaxSize(Integer.parseInt(maxSizeParam));
219         }
220         log("maxSize: " + getMaxSize());
221 
222         initDb();
223 
224     }
225 
226     private void initDb() throws MessagingException {
227 
228         try {
229             analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
230         } catch (Exception e) {
231             throw new MessagingException("Exception initializing queries", e);
232         }
233 
234     }
235 
236     /**
237      * Scans the mail and updates the token frequencies in the database.
238      * 
239      * The method is synchronized in order to avoid too much database locking,
240      * as thousands of rows may be updated just for one message fed.
241      * 
242      * @param mail
243      *            The Mail message to be scanned.
244      */
245     public void service(Mail mail) {
246         boolean dbUpdated = false;
247 
248         mail.setState(Mail.GHOST);
249 
250         ByteArrayOutputStream baos = new ByteArrayOutputStream();
251 
252         Connection conn = null;
253 
254         try {
255 
256             MimeMessage message = mail.getMessage();
257 
258             String messageId = message.getMessageID();
259 
260             if (message.getSize() > getMaxSize()) {
261                 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
262                 return;
263             }
264 
265             clearAllHeaders(message);
266 
267             message.writeTo(baos);
268 
269             BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
270 
271             // this is synchronized to avoid concurrent update of the corpus
272             synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
273 
274                 conn = datasource.getConnection();
275 
276                 if (conn.getAutoCommit()) {
277                     conn.setAutoCommit(false);
278                 }
279 
280                 dbUpdated = true;
281 
282                 // Clear out any existing word/counts etc..
283                 analyzer.clear();
284 
285                 if ("ham".equalsIgnoreCase(feedType)) {
286                     log(messageId + " Feeding HAM");
287                     // Process the stream as ham (not spam).
288                     analyzer.addHam(br);
289 
290                     // Update storage statistics.
291                     analyzer.updateHamTokens(conn);
292                 } else {
293                     log(messageId + " Feeding SPAM");
294                     // Process the stream as spam.
295                     analyzer.addSpam(br);
296 
297                     // Update storage statistics.
298                     analyzer.updateSpamTokens(conn);
299                 }
300 
301                 // Commit our changes if necessary.
302                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
303                     conn.commit();
304                     dbUpdated = false;
305                     log(messageId + " Training ended successfully");
306                     JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
307                 }
308 
309             }
310 
311         } catch (java.sql.SQLException se) {
312             log("SQLException: " + se.getMessage());
313         } catch (java.io.IOException ioe) {
314             log("IOException: " + ioe.getMessage());
315         } catch (javax.mail.MessagingException me) {
316             log("MessagingException: " + me.getMessage());
317         } finally {
318             // Rollback our changes if necessary.
319             try {
320                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
321                     conn.rollback();
322                     dbUpdated = false;
323                 }
324             } catch (Exception e) {
325             }
326             theJDBCUtil.closeJDBCConnection(conn);
327         }
328     }
329 
330     private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
331         @SuppressWarnings("rawtypes")
332         Enumeration headers = message.getAllHeaders();
333 
334         while (headers.hasMoreElements()) {
335             Header header = (Header) headers.nextElement();
336             try {
337                 message.removeHeader(header.getName());
338             } catch (javax.mail.MessagingException me) {
339             }
340         }
341         message.saveChanges();
342     }
343 
344 }