View Javadoc

1   /************************************************************************
2    * Copyright (c) 2000-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.transport.mailets;
19  
20  import java.io.BufferedReader;
21  import java.io.StringReader;
22  import java.io.ByteArrayOutputStream;
23  
24  import java.sql.Connection;
25  import java.util.Enumeration;
26  
27  import javax.mail.internet.MimeMessage;
28  import javax.mail.Header;
29  import javax.mail.MessagingException;
30  
31  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
32  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
33  import org.apache.avalon.framework.service.ServiceManager;
34  import org.apache.james.Constants;
35  import org.apache.mailet.GenericMailet;
36  import org.apache.mailet.Mail;
37  import org.apache.james.util.JDBCUtil;
38  
39  import org.apache.james.util.JDBCBayesianAnalyzer;
40  
41  /***
42   * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
43   * 
44   * <P>The new token frequencies will be stored in a JDBC database.</P>
45   * 
46   * <P>Sample configuration:</P>
47   * <PRE><CODE>
48   * &lt;processor name="root"&gt;
49   * 
50   *   &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
51   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
52   *     &lt;feedType&gt;ham&lt;/feedType&gt;
53   *     &lt;!--
54   *       Set this to the maximum message size (in bytes) that a message may have
55   *       to be analyzed (default is 100000).
56   *     --&gt;
57   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
58   *   &lt;/mailet&gt;
59   * 
60   *   &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
61   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
62   *     &lt;feedType&gt;spam&lt;/feedType&gt;
63   *     &lt;!--
64   *       Set this to the maximum message size (in bytes) that a message may have
65   *       to be analyzed (default is 100000).
66   *     --&gt;
67   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
68   *   &lt;/mailet&gt;
69   * 
70   * &lt;/processor&gt;
71   * </CODE></PRE>
72   * 
73   * <P>The previous example will allow the user to send messages to the server
74   * and use the recipient email address as the indicator for whether the message
75   * is ham or spam.</P>
76   * 
77   * <P>Using the example above, send good messages (ham not spam) to the email
78   * address "not.spam@thisdomain.com" to pump good messages into the feeder,
79   * and send spam messages (spam not ham) to the email
80   * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
81   * 
82   * <p>The Bayesian database tables are updated during training to reflect
83   * the new data.</p>
84   * 
85   * <P>At the end the mail will be destroyed (ghosted).</P>
86   * 
87   * <P><B>The correct approach is to send the original ham/spam message as an attachment
88   * to another message sent to the feeder; all the headers of the enveloping message
89   * will be removed and only the original message's tokens will be analyzed.</B></P>
90   * 
91   * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
92   * must be rebuilt from the database, in order to take advantage of the new token frequencies.
93   * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
94   * change was made to the database, and rebuild the corpus if necessary.</p>
95   * 
96   * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
97   * in order to avoid too much database locking,
98   * as thousands of rows may be updated just for one message fed.</p>
99   * @see BayesianAnalysis
100  * @see org.apache.james.util.BayesianAnalyzer
101  * @see org.apache.james.util.JDBCBayesianAnalyzer
102  * @version CVS $Revision: $ $Date: $
103  * @since 2.3.0
104  */
105 
106 public class BayesianAnalysisFeeder
107 extends GenericMailet {
108     /***
109      * The JDBCUtil helper class
110      */
111     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
112         protected void delegatedLog(String logString) {
113             log("BayesianAnalysisFeeder: " + logString);
114         }
115     };
116     
117     /***
118      * The JDBCBayesianAnalyzer class that does all the work.
119      */
120     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
121         protected void delegatedLog(String logString) {
122             log("BayesianAnalysisFeeder: " + logString);
123         }
124     };
125     
126     private DataSourceComponent datasource;
127     private String repositoryPath;
128     
129     private String feedType;
130     
131     /***
132      * Return a string describing this mailet.
133      *
134      * @return a string describing this mailet
135      */
136     public String getMailetInfo() {
137         return "BayesianAnalysisFeeder Mailet";
138     }
139     
140     /***
141      * Holds value of property maxSize.
142      */
143     private int maxSize = 100000;
144     
145     /***
146      * Getter for property maxSize.
147      * @return Value of property maxSize.
148      */
149     public int getMaxSize() {
150 
151         return this.maxSize;
152     }
153 
154     /***
155      * Setter for property maxSize.
156      * @param maxSize New value of property maxSize.
157      */
158     public void setMaxSize(int maxSize) {
159 
160         this.maxSize = maxSize;
161     }
162 
163     /***
164      * Mailet initialization routine.
165      * @throws MessagingException if a problem arises
166      */
167     public void init() throws MessagingException {
168         repositoryPath = getInitParameter("repositoryPath");
169         
170         if (repositoryPath == null) {
171             throw new MessagingException("repositoryPath is null");
172         }
173         
174         feedType = getInitParameter("feedType");
175         if (feedType == null) {
176             throw new MessagingException("feedType is null");
177         }
178         
179         String maxSizeParam = getInitParameter("maxSize");
180         if (maxSizeParam != null) {
181             setMaxSize(Integer.parseInt(maxSizeParam));
182         }
183         log("maxSize: " + getMaxSize());
184         
185         initDb();
186         
187     }
188     
189     private void initDb() throws MessagingException {
190         
191         try {
192             ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
193             
194             // Get the DataSourceSelector block
195             DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
196             
197             // Get the data-source required.
198             int stindex =   repositoryPath.indexOf("://") + 3;
199             
200             String datasourceName = repositoryPath.substring(stindex);
201             
202             datasource = (DataSourceComponent) datasources.select(datasourceName);
203         } catch (Exception e) {
204             throw new MessagingException("Can't get datasource", e);
205         }
206         
207         try {
208             analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
209         } catch (Exception e) {
210             throw new MessagingException("Exception initializing queries", e);
211         }        
212         
213     }
214     
215     /***
216      * Scans the mail and updates the token frequencies in the database.
217      *
218      * The method is synchronized in order to avoid too much database locking,
219      * as thousands of rows may be updated just for one message fed.
220      *
221      * @param mail The Mail message to be scanned.
222      */
223     public void service(Mail mail) {
224         boolean dbUpdated = false;
225         
226         mail.setState(Mail.GHOST);
227         
228         ByteArrayOutputStream baos = new ByteArrayOutputStream();
229         
230         Connection conn = null;
231         
232         try {
233             
234             MimeMessage message = mail.getMessage();
235             
236             String messageId = message.getMessageID();
237             
238             if (message.getSize() > getMaxSize()) {
239                 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
240                 return;
241             }
242             
243             clearAllHeaders(message);
244             
245             message.writeTo(baos);
246             
247             BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
248                 
249             // this is synchronized to avoid concurrent update of the corpus
250             synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
251                 
252                 conn = datasource.getConnection();
253                 
254                 if (conn.getAutoCommit()) {
255                     conn.setAutoCommit(false);
256                 }
257                 
258                 dbUpdated = true;
259                 
260                 //Clear out any existing word/counts etc..
261                 analyzer.clear();
262                 
263                 if ("ham".equalsIgnoreCase(feedType)) {
264                     log(messageId + " Feeding HAM");
265                     //Process the stream as ham (not spam).
266                     analyzer.addHam(br);
267                     
268                     //Update storage statistics.
269                     analyzer.updateHamTokens(conn);
270                 } else {
271                     log(messageId + " Feeding SPAM");
272                     //Process the stream as spam.
273                     analyzer.addSpam(br);
274                     
275                     //Update storage statistics.
276                     analyzer.updateSpamTokens(conn);
277                 }
278                 
279                 //Commit our changes if necessary.
280                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
281                     conn.commit();
282                     dbUpdated = false;
283                     log(messageId + " Training ended successfully");
284                     JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
285                 }
286                 
287             }
288             
289         } catch (java.sql.SQLException se) {
290             log("SQLException: "
291                     + se.getMessage());
292         } catch (java.io.IOException ioe) {
293             log("IOException: "
294                     + ioe.getMessage());
295         } catch (javax.mail.MessagingException me) {
296             log("MessagingException: "
297                     + me.getMessage());
298         } finally {
299             //Rollback our changes if necessary.
300             try {
301                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
302                     conn.rollback();
303                     dbUpdated = false;
304                 }
305             } catch (Exception e) {}
306             theJDBCUtil.closeJDBCConnection(conn);
307         }
308     }
309     
310     private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
311         Enumeration headers = message.getAllHeaders();
312         
313         while (headers.hasMoreElements()) {
314             Header header = (Header) headers.nextElement();
315             try {
316                 message.removeHeader(header.getName());
317             } catch (javax.mail.MessagingException me) {}
318         }
319         message.saveChanges();
320     }
321     
322 }