View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.transport.mailets;
23  
24  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
25  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
26  import org.apache.avalon.framework.service.ServiceManager;
27  import org.apache.james.Constants;
28  import org.apache.james.util.bayesian.JDBCBayesianAnalyzer;
29  import org.apache.james.util.sql.JDBCUtil;
30  import org.apache.mailet.base.GenericMailet;
31  import org.apache.mailet.Mail;
32  import org.apache.mailet.base.RFC2822Headers;
33  
34  import javax.mail.MessagingException;
35  import javax.mail.internet.MimeMessage;
36  
37  
38  import java.io.BufferedReader;
39  import java.io.ByteArrayOutputStream;
40  import java.io.StringReader;
41  import java.sql.Connection;
42  import java.text.DecimalFormat;
43  import java.util.Collection;
44  import java.util.Iterator;
45  
46  /**
47   * <P>Spam detection mailet using bayesian analysis techniques.</P>
48   * 
49   * <P>Sets an email message header indicating the
50   * probability that an email message is SPAM.</P>
51   * 
52   * <P>Based upon the principals described in:
53   *   <a href="http://www.paulgraham.com/spam.html">A Plan For Spam</a>
54   *   by Paul Graham.
55   * Extended to Paul Grahams' <a href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.</P>
56   * 
57   * <P>The analysis capabilities are based on token frequencies (the <I>Corpus</I>) 
58   * learned through a training process (see {@link BayesianAnalysisFeeder})
59   * and stored in a JDBC database.
60   * After a training session, the Corpus must be rebuilt from the database in order to
61   * acquire the new frequencies.
62   * Every 10 minutes a special thread in this mailet will check if any
63   * change was made to the database by the feeder, and rebuild the corpus if necessary.</p>
64   * 
65   * <p>A <CODE>org.apache.james.spam.probability</CODE> mail attribute will be created
66   * containing the computed spam probability as a {@link java.lang.Double}.
67   * The <CODE>headerName</CODE> message header string will be created containing such
68   * probability in floating point representation.</p>
69   * 
70   * <P>Sample configuration:</P>
71   * <PRE><CODE>
72   * &lt;mailet match="All" class="BayesianAnalysis"&gt;
73   *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
74   *   &lt;!--
75   *     Set this to the header name to add with the spam probability
76   *     (default is "X-MessageIsSpamProbability").
77   *   --&gt;
78   *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
79   *   &lt;!--
80   *     Set this to true if you want to ignore messages coming from local senders
81   *     (default is false).
82   *     By local sender we mean a return-path with a local server part (server listed
83   *     in &lt;servernames&gt; in config.xml).
84   *   --&gt;
85   *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
86   *   &lt;!--
87   *     Set this to the maximum message size (in bytes) that a message may have
88   *     to be considered spam (default is 100000).
89   *   --&gt;
90   *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
91   *   &lt;!--
92   *     Set this to false if you not want to tag the message if spam is detected (Default is true).
93   *   --&gt;
94   *   &lt;tagSubject&gt;true&lt;/tagSubject&gt;
95   * &lt;/mailet&gt;
96   * </CODE></PRE>
97   * 
98   * <P>The probability of being spam is pre-pended to the subject if
99   * it is &gt; 0.1 (10%).</P>
100  * 
101  * <P>The required tables are automatically created if not already there (see sqlResources.xml).
102  * The token field in both the ham and spam tables is <B>case sensitive</B>.</P>
103  * @see BayesianAnalysisFeeder
104  * @see org.apache.james.util.bayesian.BayesianAnalyzer
105  * @see org.apache.james.util.bayesian.JDBCBayesianAnalyzer
106  * @version CVS $Revision: 717869 $ $Date: 2008-11-15 15:56:18 +0000 (Sat, 15 Nov 2008) $
107  * @since 2.3.0
108  */
109 
110 public class BayesianAnalysis
111 extends GenericMailet {
112     /**
113      * The JDBCUtil helper class
114      */
115     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116         protected void delegatedLog(String logString) {
117             log("BayesianAnalysis: " + logString);
118         }
119     };
120 
121     /**
122      * The JDBCBayesianAnalyzer class that does all the work.
123      */
124     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125         protected void delegatedLog(String logString) {
126             log("BayesianAnalysis: " + logString);
127         }
128     };
129     
130     private DataSourceComponent datasource;
131     private String repositoryPath;
132     
133     private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
134     private static final String HEADER_NAME = "X-MessageIsSpamProbability";
135     private static final long CORPUS_RELOAD_INTERVAL = 600000;
136     private String headerName;
137     private boolean ignoreLocalSender = false;
138     private boolean tagSubject = true;
139     
140     /**
141      * Return a string describing this mailet.
142      *
143      * @return a string describing this mailet
144      */
145     public String getMailetInfo() {
146         return "BayesianAnalysis Mailet";
147     }
148     
149     /**
150      * Holds value of property maxSize.
151      */
152     private int maxSize = 100000;
153 
154     /**
155      * Holds value of property lastCorpusLoadTime.
156      */
157     private long lastCorpusLoadTime;
158     
159     /**
160      * Getter for property maxSize.
161      * @return Value of property maxSize.
162      */
163     public int getMaxSize() {
164 
165         return this.maxSize;
166     }
167 
168     /**
169      * Setter for property maxSize.
170      * @param maxSize New value of property maxSize.
171      */
172     public void setMaxSize(int maxSize) {
173 
174         this.maxSize = maxSize;
175     }
176 
177     /**
178      * Getter for property lastCorpusLoadTime.
179      * @return Value of property lastCorpusLoadTime.
180      */
181     public long getLastCorpusLoadTime() {
182         
183         return this.lastCorpusLoadTime;
184     }
185     
186     /**
187      * Sets lastCorpusLoadTime to System.currentTimeMillis().
188      */
189     private void touchLastCorpusLoadTime() {
190         
191         this.lastCorpusLoadTime = System.currentTimeMillis();
192     }
193     
194     /**
195      * Mailet initialization routine.
196      * @throws MessagingException if a problem arises
197      */
198     public void init() throws MessagingException {
199         repositoryPath = getInitParameter("repositoryPath");
200         
201         if (repositoryPath == null) {
202             throw new MessagingException("repositoryPath is null");
203         }
204         
205         headerName = getInitParameter("headerName",HEADER_NAME);
206         
207         ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
208         
209         if (ignoreLocalSender) {
210             log("Will ignore messages coming from local senders");
211         } else {
212             log("Will analyze messages coming from local senders");
213         }
214         
215         String maxSizeParam = getInitParameter("maxSize");
216         if (maxSizeParam != null) {
217             setMaxSize(Integer.parseInt(maxSizeParam));
218         }
219         log("maxSize: " + getMaxSize());
220         
221         String tag = getInitParameter("tagSubject");
222         if (tag != null && tag.equals("false")) {
223             tagSubject = false;
224         }
225         
226         initDb();
227         
228             CorpusLoader corpusLoader = new CorpusLoader(this);
229             corpusLoader.setDaemon(true);
230             corpusLoader.start();
231             
232     }
233     
234     private void initDb() throws MessagingException {
235         
236         try {
237             ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
238             
239             // Get the DataSourceSelector block
240             DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
241             
242             // Get the data-source required.
243             int stindex =   repositoryPath.indexOf("://") + 3;
244             
245             String datasourceName = repositoryPath.substring(stindex);
246             
247             datasource = (DataSourceComponent) datasources.select(datasourceName);
248         } catch (Exception e) {
249             throw new MessagingException("Can't get datasource", e);
250         }
251         
252         try {
253             analyzer.initSqlQueries(datasource.getConnection(), getMailetContext().getAttribute("confDir") + "/sqlResources.xml");
254         } catch (Exception e) {
255             throw new MessagingException("Exception initializing queries", e);
256         }        
257         
258         try {
259             loadData(datasource.getConnection());
260         } catch (java.sql.SQLException se) {
261             throw new MessagingException("SQLException loading data", se);
262         }        
263     }
264     
265     /**
266      * Scans the mail and determines the spam probability.
267      *
268      * @param mail The Mail message to be scanned.
269      * @throws MessagingException if a problem arises
270      */
271     public void service(Mail mail) throws MessagingException {
272         
273         try {
274             MimeMessage message = mail.getMessage();
275             
276             if (ignoreLocalSender) {
277                 // ignore the message if the sender is local
278                 if (mail.getSender() != null
279                         && getMailetContext().isLocalServer(mail.getSender().getHost())) {
280                     return;
281                 }
282             }
283             
284             String [] headerArray = message.getHeader(headerName);
285             // ignore the message if already analyzed
286             if (headerArray != null && headerArray.length > 0) {
287                 return;
288             }
289             
290             ByteArrayOutputStream baos = new ByteArrayOutputStream();
291             
292             double probability;
293             
294             if (message.getSize() < getMaxSize()) {
295                 message.writeTo(baos);
296                 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
297             } else {
298                 probability = 0.0;
299             }
300             
301             mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
302             message.setHeader(headerName, Double.toString(probability));
303             
304             DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
305             probabilityForm.applyPattern("##0.##%");
306             String probabilityString = probabilityForm.format(probability);
307             
308             String senderString;
309             if (mail.getSender() == null) {
310                 senderString = "null";
311             } else {
312                 senderString = mail.getSender().toString();
313             }
314             if (probability > 0.1) {
315                 log(headerName
316                         + ": "
317                         + probabilityString
318                         + "; From: "
319                         + senderString
320                         + "; Recipient(s): "
321                         + getAddressesString(mail.getRecipients()));
322                 
323                 
324                 // Check if we should tag the subject
325                 if (tagSubject) {
326                     appendToSubject(message,
327                             " [" + probabilityString
328                             + (probability > 0.9 ? " SPAM" : " spam") + "]");
329                 }
330             }
331             
332             saveChanges(message);
333             
334         } catch (Exception e) {
335             log("Exception: "
336                     + e.getMessage(), e);
337             throw new MessagingException("Exception thrown", e);
338         }
339     }
340     
341     private void loadData(Connection conn)
342     throws java.sql.SQLException {
343         
344         try {
345             // this is synchronized to avoid concurrent update of the corpus
346             synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
347                 analyzer.tokenCountsClear();
348                 analyzer.loadHamNSpam(conn);
349                 analyzer.buildCorpus();
350                 analyzer.tokenCountsClear();
351             }
352             
353             log("BayesianAnalysis Corpus loaded");
354             
355             touchLastCorpusLoadTime();
356             
357         } finally {
358             if (conn != null) {
359                 theJDBCUtil.closeJDBCConnection(conn);
360             }
361         }
362         
363     }
364     
365     private String getAddressesString(Collection addresses) {
366         if (addresses == null) {
367             return "null";
368         }
369         
370         Iterator iter = addresses.iterator();
371         StringBuffer sb = new StringBuffer();
372         sb.append('[');
373         for (int i = 0; iter.hasNext(); i++) {
374             sb.append(iter.next());
375             if (i + 1 < addresses.size()) {
376                 sb.append(", ");
377             }
378         }
379         sb.append(']');
380         return sb.toString();
381     }
382     
383     private void appendToSubject(MimeMessage message, String toAppend) {
384         try {
385             String subject = message.getSubject();
386             
387             if (subject == null) {
388                 message.setSubject(toAppend, "iso-8859-1");
389             } else {
390                 message.setSubject(toAppend + " " + subject, "iso-8859-1");
391             }
392         } catch (MessagingException ex) {}
393     }
394        
395     /**
396      * Saves changes resetting the original message id.
397      */
398     private void saveChanges(MimeMessage message) throws MessagingException {
399         String messageId = message.getMessageID();
400         message.saveChanges();
401         if (messageId != null) {
402             message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
403         }
404     }
405 
406     private static class CorpusLoader extends Thread {
407         
408         private BayesianAnalysis analysis;
409         
410         private CorpusLoader(BayesianAnalysis analysis) {
411             super("BayesianAnalysis Corpus Loader");
412             this.analysis = analysis;
413         }
414         
415         /** Thread entry point.
416          */
417         public void run() {
418         analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
419         
420         try {
421             Thread.sleep(CORPUS_RELOAD_INTERVAL);
422 
423             while (true) {
424                 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
425                     analysis.log("Reloading Corpus ...");
426                     try {
427                         analysis.loadData(analysis.datasource.getConnection());
428                         analysis.log("Corpus reloaded");
429                     } catch (java.sql.SQLException se) {
430                         analysis.log("SQLException: ", se);
431                     }
432                     
433                 }
434                 
435                 if (Thread.interrupted()) {
436                     break;
437                 }
438                 Thread.sleep(CORPUS_RELOAD_INTERVAL);
439             }
440         }
441         catch (InterruptedException ex) {
442             interrupt();
443         }
444         }
445         
446     }
447     
448 }