View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.transport.mailets;
23  
24  import java.io.BufferedReader;
25  import java.io.StringReader;
26  import java.io.ByteArrayOutputStream;
27  
28  import java.sql.Connection;
29  import java.util.Enumeration;
30  
31  import javax.mail.internet.MimeMessage;
32  import javax.mail.Header;
33  import javax.mail.MessagingException;
34  
35  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
36  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
37  import org.apache.avalon.framework.service.ServiceManager;
38  import org.apache.james.Constants;
39  import org.apache.mailet.base.GenericMailet;
40  import org.apache.mailet.Mail;
41  
42  import org.apache.james.util.bayesian.JDBCBayesianAnalyzer;
43  import org.apache.james.util.sql.JDBCUtil;
44  
45  /**
46   * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
47   * 
48   * <P>The new token frequencies will be stored in a JDBC database.</P>
49   * 
50   * <P>Sample configuration:</P>
51   * <PRE><CODE>
52   * &lt;processor name="root"&gt;
53   * 
54   *   &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
55   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
56   *     &lt;feedType&gt;ham&lt;/feedType&gt;
57   *     &lt;!--
58   *       Set this to the maximum message size (in bytes) that a message may have
59   *       to be analyzed (default is 100000).
60   *     --&gt;
61   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
62   *   &lt;/mailet&gt;
63   * 
64   *   &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
65   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
66   *     &lt;feedType&gt;spam&lt;/feedType&gt;
67   *     &lt;!--
68   *       Set this to the maximum message size (in bytes) that a message may have
69   *       to be analyzed (default is 100000).
70   *     --&gt;
71   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
72   *   &lt;/mailet&gt;
73   * 
74   * &lt;/processor&gt;
75   * </CODE></PRE>
76   * 
77   * <P>The previous example will allow the user to send messages to the server
78   * and use the recipient email address as the indicator for whether the message
79   * is ham or spam.</P>
80   * 
81   * <P>Using the example above, send good messages (ham not spam) to the email
82   * address "not.spam@thisdomain.com" to pump good messages into the feeder,
83   * and send spam messages (spam not ham) to the email
84   * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
85   * 
86   * <p>The Bayesian database tables are updated during training to reflect
87   * the new data.</p>
88   * 
89   * <P>At the end the mail will be destroyed (ghosted).</P>
90   * 
91   * <P><B>The correct approach is to send the original ham/spam message as an attachment
92   * to another message sent to the feeder; all the headers of the enveloping message
93   * will be removed and only the original message's tokens will be analyzed.</B></P>
94   * 
95   * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
96   * must be rebuilt from the database, in order to take advantage of the new token frequencies.
97   * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
98   * change was made to the database, and rebuild the corpus if necessary.</p>
99   * 
100  * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
101  * in order to avoid too much database locking,
102  * as thousands of rows may be updated just for one message fed.</p>
103  * @see BayesianAnalysis
104  * @see org.apache.james.util.bayesian.BayesianAnalyzer
105  * @see org.apache.james.util.bayesian.JDBCBayesianAnalyzer
106  * @version CVS $Revision: 717869 $ $Date: 2008-11-15 15:56:18 +0000 (Sat, 15 Nov 2008) $
107  * @since 2.3.0
108  */
109 
110 public class BayesianAnalysisFeeder
111 extends GenericMailet {
112     /**
113      * The JDBCUtil helper class
114      */
115     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116         protected void delegatedLog(String logString) {
117             log("BayesianAnalysisFeeder: " + logString);
118         }
119     };
120     
121     /**
122      * The JDBCBayesianAnalyzer class that does all the work.
123      */
124     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125         protected void delegatedLog(String logString) {
126             log("BayesianAnalysisFeeder: " + logString);
127         }
128     };
129     
130     private DataSourceComponent datasource;
131     private String repositoryPath;
132     
133     private String feedType;
134     
135     /**
136      * Return a string describing this mailet.
137      *
138      * @return a string describing this mailet
139      */
140     public String getMailetInfo() {
141         return "BayesianAnalysisFeeder Mailet";
142     }
143     
144     /**
145      * Holds value of property maxSize.
146      */
147     private int maxSize = 100000;
148     
149     /**
150      * Getter for property maxSize.
151      * @return Value of property maxSize.
152      */
153     public int getMaxSize() {
154 
155         return this.maxSize;
156     }
157 
158     /**
159      * Setter for property maxSize.
160      * @param maxSize New value of property maxSize.
161      */
162     public void setMaxSize(int maxSize) {
163 
164         this.maxSize = maxSize;
165     }
166 
167     /**
168      * Mailet initialization routine.
169      * @throws MessagingException if a problem arises
170      */
171     public void init() throws MessagingException {
172         repositoryPath = getInitParameter("repositoryPath");
173         
174         if (repositoryPath == null) {
175             throw new MessagingException("repositoryPath is null");
176         }
177         
178         feedType = getInitParameter("feedType");
179         if (feedType == null) {
180             throw new MessagingException("feedType is null");
181         }
182         
183         String maxSizeParam = getInitParameter("maxSize");
184         if (maxSizeParam != null) {
185             setMaxSize(Integer.parseInt(maxSizeParam));
186         }
187         log("maxSize: " + getMaxSize());
188         
189         initDb();
190         
191     }
192     
193     private void initDb() throws MessagingException {
194         
195         try {
196             ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
197             
198             // Get the DataSourceSelector block
199             DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
200             
201             // Get the data-source required.
202             int stindex =   repositoryPath.indexOf("://") + 3;
203             
204             String datasourceName = repositoryPath.substring(stindex);
205             
206             datasource = (DataSourceComponent) datasources.select(datasourceName);
207         } catch (Exception e) {
208             throw new MessagingException("Can't get datasource", e);
209         }
210         
211         try {
212             analyzer.initSqlQueries(datasource.getConnection(), getMailetContext().getAttribute("confDir") + "/sqlResources.xml");
213         } catch (Exception e) {
214             throw new MessagingException("Exception initializing queries", e);
215         }        
216         
217     }
218     
219     /**
220      * Scans the mail and updates the token frequencies in the database.
221      *
222      * The method is synchronized in order to avoid too much database locking,
223      * as thousands of rows may be updated just for one message fed.
224      *
225      * @param mail The Mail message to be scanned.
226      */
227     public void service(Mail mail) {
228         boolean dbUpdated = false;
229         
230         mail.setState(Mail.GHOST);
231         
232         ByteArrayOutputStream baos = new ByteArrayOutputStream();
233         
234         Connection conn = null;
235         
236         try {
237             
238             MimeMessage message = mail.getMessage();
239             
240             String messageId = message.getMessageID();
241             
242             if (message.getSize() > getMaxSize()) {
243                 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
244                 return;
245             }
246             
247             clearAllHeaders(message);
248             
249             message.writeTo(baos);
250             
251             BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
252                 
253             // this is synchronized to avoid concurrent update of the corpus
254             synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
255                 
256                 conn = datasource.getConnection();
257                 
258                 if (conn.getAutoCommit()) {
259                     conn.setAutoCommit(false);
260                 }
261                 
262                 dbUpdated = true;
263                 
264                 //Clear out any existing word/counts etc..
265                 analyzer.clear();
266                 
267                 if ("ham".equalsIgnoreCase(feedType)) {
268                     log(messageId + " Feeding HAM");
269                     //Process the stream as ham (not spam).
270                     analyzer.addHam(br);
271                     
272                     //Update storage statistics.
273                     analyzer.updateHamTokens(conn);
274                 } else {
275                     log(messageId + " Feeding SPAM");
276                     //Process the stream as spam.
277                     analyzer.addSpam(br);
278                     
279                     //Update storage statistics.
280                     analyzer.updateSpamTokens(conn);
281                 }
282                 
283                 //Commit our changes if necessary.
284                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
285                     conn.commit();
286                     dbUpdated = false;
287                     log(messageId + " Training ended successfully");
288                     JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
289                 }
290                 
291             }
292             
293         } catch (java.sql.SQLException se) {
294             log("SQLException: "
295                     + se.getMessage());
296         } catch (java.io.IOException ioe) {
297             log("IOException: "
298                     + ioe.getMessage());
299         } catch (javax.mail.MessagingException me) {
300             log("MessagingException: "
301                     + me.getMessage());
302         } finally {
303             //Rollback our changes if necessary.
304             try {
305                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
306                     conn.rollback();
307                     dbUpdated = false;
308                 }
309             } catch (Exception e) {}
310             theJDBCUtil.closeJDBCConnection(conn);
311         }
312     }
313     
314     private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
315         Enumeration headers = message.getAllHeaders();
316         
317         while (headers.hasMoreElements()) {
318             Header header = (Header) headers.nextElement();
319             try {
320                 message.removeHeader(header.getName());
321             } catch (javax.mail.MessagingException me) {}
322         }
323         message.saveChanges();
324     }
325     
326 }