View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.transport.mailets;
21  
22  import java.io.BufferedReader;
23  import java.io.StringReader;
24  import java.io.ByteArrayOutputStream;
25  
26  import java.sql.Connection;
27  import java.util.Enumeration;
28  
29  import javax.mail.internet.MimeMessage;
30  import javax.mail.Header;
31  import javax.mail.MessagingException;
32  
33  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
34  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
35  import org.apache.avalon.framework.service.ServiceManager;
36  import org.apache.james.Constants;
37  import org.apache.mailet.GenericMailet;
38  import org.apache.mailet.Mail;
39  import org.apache.james.util.JDBCUtil;
40  
41  import org.apache.james.util.JDBCBayesianAnalyzer;
42  
43  /***
44   * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
45   * 
46   * <P>The new token frequencies will be stored in a JDBC database.</P>
47   * 
48   * <P>Sample configuration:</P>
49   * <PRE><CODE>
50   * &lt;processor name="root"&gt;
51   * 
52   *   &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
53   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
54   *     &lt;feedType&gt;ham&lt;/feedType&gt;
55   *     &lt;!--
56   *       Set this to the maximum message size (in bytes) that a message may have
57   *       to be analyzed (default is 100000).
58   *     --&gt;
59   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
60   *   &lt;/mailet&gt;
61   * 
62   *   &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
63   *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
64   *     &lt;feedType&gt;spam&lt;/feedType&gt;
65   *     &lt;!--
66   *       Set this to the maximum message size (in bytes) that a message may have
67   *       to be analyzed (default is 100000).
68   *     --&gt;
69   *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
70   *   &lt;/mailet&gt;
71   * 
72   * &lt;processor&gt;
73   * </CODE></PRE>
74   * 
75   * <P>The previous example will allow the user to send messages to the server
76   * and use the recipient email address as the indicator for whether the message
77   * is ham or spam.</P>
78   * 
79   * <P>Using the example above, send good messages (ham not spam) to the email
80   * address "not.spam@thisdomain.com" to pump good messages into the feeder,
81   * and send spam messages (spam not ham) to the email
82   * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
83   * 
84   * <p>The bayesian database tables will be updated during the training reflecting
85   * the new data</p>
86   * 
87   * <P>At the end the mail will be destroyed (ghosted).</P>
88   * 
89   * <P><B>The correct approach is to send the original ham/spam message as an attachment
90   * to another message sent to the feeder; all the headers of the enveloping message
91   * will be removed and only the original message's tokens will be analyzed.</B></P>
92   * 
93   * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
94   * must be rebuilt from the database, in order to take advantage of the new token frequencies.
95   * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
96   * change was made to the database, and rebuild the corpus if necessary.</p>
97   * 
98   * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
99   * in order to avoid too much database locking,
100  * as thousands of rows may be updated just for one message fed.</p>
101  * @see BayesianAnalysis
102  * @see org.apache.james.util.BayesianAnalyzer
103  * @see org.apache.james.util.JDBCBayesianAnalyzer
104  * @version CVS $Revision: $ $Date: $
105  * @since 2.3.0
106  */
107 
108 public class BayesianAnalysisFeeder
109 extends GenericMailet {
110     /***
111      * The JDBCUtil helper class
112      */
113     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
114         protected void delegatedLog(String logString) {
115             log("BayesianAnalysisFeeder: " + logString);
116         }
117     };
118     
119     /***
120      * The JDBCBayesianAnalyzer class that does all the work.
121      */
122     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
123         protected void delegatedLog(String logString) {
124             log("BayesianAnalysisFeeder: " + logString);
125         }
126     };
127     
128     private DataSourceComponent datasource;
129     private String repositoryPath;
130     
131     private String feedType;
132     
133     /***
134      * Return a string describing this mailet.
135      *
136      * @return a string describing this mailet
137      */
138     public String getMailetInfo() {
139         return "BayesianAnalysisFeeder Mailet";
140     }
141     
142     /***
143      * Holds value of property maxSize.
144      */
145     private int maxSize = 100000;
146     
147     /***
148      * Getter for property maxSize.
149      * @return Value of property maxSize.
150      */
151     public int getMaxSize() {
152 
153         return this.maxSize;
154     }
155 
156     /***
157      * Setter for property maxSize.
158      * @param maxSize New value of property maxSize.
159      */
160     public void setMaxSize(int maxSize) {
161 
162         this.maxSize = maxSize;
163     }
164 
165     /***
166      * Mailet initialization routine.
167      * @throws MessagingException if a problem arises
168      */
169     public void init() throws MessagingException {
170         repositoryPath = getInitParameter("repositoryPath");
171         
172         if (repositoryPath == null) {
173             throw new MessagingException("repositoryPath is null");
174         }
175         
176         feedType = getInitParameter("feedType");
177         if (feedType == null) {
178             throw new MessagingException("feedType is null");
179         }
180         
181         String maxSizeParam = getInitParameter("maxSize");
182         if (maxSizeParam != null) {
183             setMaxSize(Integer.parseInt(maxSizeParam));
184         }
185         log("maxSize: " + getMaxSize());
186         
187         initDb();
188         
189     }
190     
191     private void initDb() throws MessagingException {
192         
193         try {
194             ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
195             
196             // Get the DataSourceSelector block
197             DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
198             
199             // Get the data-source required.
200             int stindex =   repositoryPath.indexOf("://") + 3;
201             
202             String datasourceName = repositoryPath.substring(stindex);
203             
204             datasource = (DataSourceComponent) datasources.select(datasourceName);
205         } catch (Exception e) {
206             throw new MessagingException("Can't get datasource", e);
207         }
208         
209         try {
210             analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
211         } catch (Exception e) {
212             throw new MessagingException("Exception initializing queries", e);
213         }        
214         
215     }
216     
217     /***
218      * Scans the mail and updates the token frequencies in the database.
219      *
220      * The method is synchronized in order to avoid too much database locking,
221      * as thousands of rows may be updated just for one message fed.
222      *
223      * @param mail The Mail message to be scanned.
224      */
225     public void service(Mail mail) {
226         boolean dbUpdated = false;
227         
228         mail.setState(Mail.GHOST);
229         
230         ByteArrayOutputStream baos = new ByteArrayOutputStream();
231         
232         Connection conn = null;
233         
234         try {
235             
236             MimeMessage message = mail.getMessage();
237             
238             String messageId = message.getMessageID();
239             
240             if (message.getSize() > getMaxSize()) {
241                 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
242                 return;
243             }
244             
245             clearAllHeaders(message);
246             
247             message.writeTo(baos);
248             
249             BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
250                 
251             // this is synchronized to avoid concurrent update of the corpus
252             synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
253                 
254                 conn = datasource.getConnection();
255                 
256                 if (conn.getAutoCommit()) {
257                     conn.setAutoCommit(false);
258                 }
259                 
260                 dbUpdated = true;
261                 
262                 //Clear out any existing word/counts etc..
263                 analyzer.clear();
264                 
265                 if ("ham".equalsIgnoreCase(feedType)) {
266                     log(messageId + " Feeding HAM");
267                     //Process the stream as ham (not spam).
268                     analyzer.addHam(br);
269                     
270                     //Update storage statistics.
271                     analyzer.updateHamTokens(conn);
272                 } else {
273                     log(messageId + " Feeding SPAM");
274                     //Process the stream as spam.
275                     analyzer.addSpam(br);
276                     
277                     //Update storage statistics.
278                     analyzer.updateSpamTokens(conn);
279                 }
280                 
281                 //Commit our changes if necessary.
282                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
283                     conn.commit();
284                     dbUpdated = false;
285                     log(messageId + " Training ended successfully");
286                     JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
287                 }
288                 
289             }
290             
291         } catch (java.sql.SQLException se) {
292             log("SQLException: "
293                     + se.getMessage());
294         } catch (java.io.IOException ioe) {
295             log("IOException: "
296                     + ioe.getMessage());
297         } catch (javax.mail.MessagingException me) {
298             log("MessagingException: "
299                     + me.getMessage());
300         } finally {
301             //Rollback our changes if necessary.
302             try {
303                 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
304                     conn.rollback();
305                     dbUpdated = false;
306                 }
307             } catch (Exception e) {}
308             theJDBCUtil.closeJDBCConnection(conn);
309         }
310     }
311     
312     private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
313         Enumeration headers = message.getAllHeaders();
314         
315         while (headers.hasMoreElements()) {
316             Header header = (Header) headers.nextElement();
317             try {
318                 message.removeHeader(header.getName());
319             } catch (javax.mail.MessagingException me) {}
320         }
321         message.saveChanges();
322     }
323     
324 }