View Javadoc

1   /************************************************************************
2    * Copyright (c) 2000-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.transport.mailets;
19  
20  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
21  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
22  import org.apache.avalon.framework.service.ServiceManager;
23  import org.apache.james.Constants;
24  import org.apache.james.util.JDBCBayesianAnalyzer;
25  import org.apache.james.util.JDBCUtil;
26  import org.apache.mailet.GenericMailet;
27  import org.apache.mailet.Mail;
28  import org.apache.mailet.MailAddress;
29  import org.apache.mailet.RFC2822Headers;
30  import org.apache.mailet.dates.RFC822DateFormat;
31  
32  import javax.mail.Message;
33  import javax.mail.MessagingException;
34  import javax.mail.Session;
35  import javax.mail.internet.InternetAddress;
36  import javax.mail.internet.MimeBodyPart;
37  import javax.mail.internet.MimeMessage;
38  import javax.mail.internet.MimeMultipart;
39  
40  import java.io.BufferedReader;
41  import java.io.ByteArrayOutputStream;
42  import java.io.StringReader;
43  import java.sql.Connection;
44  import java.text.DecimalFormat;
45  import java.util.Collection;
46  import java.util.HashSet;
47  import java.util.Iterator;
48  import java.util.Set;
49  
50  /***
51   * <P>Spam detection mailet using bayesian analysis techniques.</P>
52   * 
53   * <P>Sets an email message header indicating the
54   * probability that an email message is SPAM.</P>
55   * 
56   * <P>Based upon the principals described in:
57   *   <a href="http://www.paulgraham.com/spam.html">A Plan For Spam</a>
58   *   by Paul Graham.
59   * Extended to Paul Grahams' <a href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.</P>
60   * 
61   * <P>The analysis capabilities are based on token frequencies (the <I>Corpus</I>) 
62   * learned through a training process (see {@link BayesianAnalysisFeeder})
63   * and stored in a JDBC database.
64   * After a training session, the Corpus must be rebuilt from the database in order to
65   * acquire the new frequencies.
66   * Every 10 minutes a special thread in this mailet will check if any
67   * change was made to the database by the feeder, and rebuild the corpus if necessary.</p>
68   * 
69   * <p>A <CODE>org.apache.james.spam.probability</CODE> mail attribute will be created
70   * containing the computed spam probability as a {@link java.lang.Double}.
71   * The <CODE>headerName</CODE> message header string will be created containing such
72   * probability in floating point representation.</p>
73   * 
74   * <P>Sample configuration:</P>
75   * <PRE><CODE>
76   * &lt;mailet match="All" class="BayesianAnalysis"&gt;
77   *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
78   *   &lt;!--
79   *     Set this to the header name to add with the spam probability
80   *     (default is "X-MessageIsSpamProbability").
81   *   --&gt;
82   *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
83   *   &lt;!--
84   *     Set this to true if you want to ignore messages coming from local senders
85   *     (default is false).
86   *     By local sender we mean a return-path with a local server part (server listed
87   *     in &lt;servernames&gt; in config.xml).
88   *   --&gt;
89   *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
90   *   &lt;!--
91   *     Set this to the maximum message size (in bytes) that a message may have
92   *     to be considered spam (default is 100000).
93   *   --&gt;
94   *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
95   * &lt;/mailet&gt;
96   * </CODE></PRE>
97   * 
98   * <P>The probability of being spam is pre-pended to the subject if
99   * it is &gt; 0.1 (10%).</P>
100  * 
101  * <P>The required tables are automatically created if not already there (see sqlResources.xml).
102  * The token field in both the ham and spam tables is <B>case sensitive</B>.</P>
103  * @see BayesianAnalysisFeeder
104  * @see org.apache.james.util.BayesianAnalyzer
105  * @see org.apache.james.util.JDBCBayesianAnalyzer
106  * @version CVS $Revision: $ $Date: $
107  * @since 2.3.0
108  */
109 
110 public class BayesianAnalysis
111 extends GenericMailet {
112     /***
113      * The JDBCUtil helper class
114      */
115     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116         protected void delegatedLog(String logString) {
117             log("BayesianAnalysis: " + logString);
118         }
119     };
120 
121     /***
122      * The JDBCBayesianAnalyzer class that does all the work.
123      */
124     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125         protected void delegatedLog(String logString) {
126             log("BayesianAnalysis: " + logString);
127         }
128     };
129     
130     private DataSourceComponent datasource;
131     private String repositoryPath;
132     
133     private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
134     private static final String HEADER_NAME = "X-MessageIsSpamProbability";
135     private static final long CORPUS_RELOAD_INTERVAL = 600000;
136     private String headerName;
137     private boolean ignoreLocalSender = false;
138         
139     /*** The date format object used to generate RFC 822 compliant date headers. */
140     private RFC822DateFormat rfc822DateFormat = new RFC822DateFormat();
141     
142     /***
143      * Return a string describing this mailet.
144      *
145      * @return a string describing this mailet
146      */
147     public String getMailetInfo() {
148         return "BayesianAnalysis Mailet";
149     }
150     
151     /***
152      * Holds value of property maxSize.
153      */
154     private int maxSize = 100000;
155 
156     /***
157      * Holds value of property lastCorpusLoadTime.
158      */
159     private long lastCorpusLoadTime;
160     
161     /***
162      * Getter for property maxSize.
163      * @return Value of property maxSize.
164      */
165     public int getMaxSize() {
166 
167         return this.maxSize;
168     }
169 
170     /***
171      * Setter for property maxSize.
172      * @param maxSize New value of property maxSize.
173      */
174     public void setMaxSize(int maxSize) {
175 
176         this.maxSize = maxSize;
177     }
178 
179     /***
180      * Getter for property lastCorpusLoadTime.
181      * @return Value of property lastCorpusLoadTime.
182      */
183     public long getLastCorpusLoadTime() {
184         
185         return this.lastCorpusLoadTime;
186     }
187     
188     /***
189      * Sets lastCorpusLoadTime to System.currentTimeMillis().
190      */
191     private void touchLastCorpusLoadTime() {
192         
193         this.lastCorpusLoadTime = System.currentTimeMillis();
194     }
195     
196     /***
197      * Mailet initialization routine.
198      * @throws MessagingException if a problem arises
199      */
200     public void init() throws MessagingException {
201         repositoryPath = getInitParameter("repositoryPath");
202         
203         if (repositoryPath == null) {
204             throw new MessagingException("repositoryPath is null");
205         }
206         
207         headerName = getInitParameter("headerName",HEADER_NAME);
208         
209         ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
210         
211         if (ignoreLocalSender) {
212             log("Will ignore messages coming from local senders");
213         } else {
214             log("Will analyze messages coming from local senders");
215         }
216         
217         String maxSizeParam = getInitParameter("maxSize");
218         if (maxSizeParam != null) {
219             setMaxSize(Integer.parseInt(maxSizeParam));
220         }
221         log("maxSize: " + getMaxSize());
222         
223         initDb();
224         
225             CorpusLoader corpusLoader = new CorpusLoader(this);
226             corpusLoader.setDaemon(true);
227             corpusLoader.start();
228             
229     }
230     
231     private void initDb() throws MessagingException {
232         
233         try {
234             ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
235             
236             // Get the DataSourceSelector block
237             DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
238             
239             // Get the data-source required.
240             int stindex =   repositoryPath.indexOf("://") + 3;
241             
242             String datasourceName = repositoryPath.substring(stindex);
243             
244             datasource = (DataSourceComponent) datasources.select(datasourceName);
245         } catch (Exception e) {
246             throw new MessagingException("Can't get datasource", e);
247         }
248         
249         try {
250             analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
251         } catch (Exception e) {
252             throw new MessagingException("Exception initializing queries", e);
253         }        
254         
255         try {
256             loadData(datasource.getConnection());
257         } catch (java.sql.SQLException se) {
258             throw new MessagingException("SQLException loading data", se);
259         }        
260     }
261     
262     /***
263      * Scans the mail and determines the spam probability.
264      *
265      * @param mail The Mail message to be scanned.
266      * @throws MessagingException if a problem arises
267      */
268     public void service(Mail mail) throws MessagingException {
269         
270         try {
271             MimeMessage message = mail.getMessage();
272             
273             if (ignoreLocalSender) {
274                 // ignore the message if the sender is local
275                 if (mail.getSender() != null
276                         && getMailetContext().isLocalServer(mail.getSender().getHost())) {
277                     return;
278                 }
279             }
280             
281             String [] headerArray = message.getHeader(headerName);
282             // ignore the message if already analyzed
283             if (headerArray != null && headerArray.length > 0) {
284                 return;
285             }
286             
287             ByteArrayOutputStream baos = new ByteArrayOutputStream();
288             
289             double probability;
290             
291             if (message.getSize() < getMaxSize()) {
292                 message.writeTo(baos);
293                 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
294             } else {
295                 probability = 0.0;
296             }
297             
298             mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
299             message.setHeader(headerName, Double.toString(probability));
300             
301             DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
302             probabilityForm.applyPattern("##0.##%");
303             String probabilityString = probabilityForm.format(probability);
304             
305             String senderString;
306             if (mail.getSender() == null) {
307                 senderString = "null";
308             } else {
309                 senderString = mail.getSender().toString();
310             }
311             if (probability > 0.1) {
312                 log(headerName
313                         + ": "
314                         + probabilityString
315                         + "; From: "
316                         + senderString
317                         + "; Recipient(s): "
318                         + getAddressesString(mail.getRecipients()));
319                 
320                 appendToSubject(message,
321                         " [" + probabilityString
322                         + (probability > 0.9 ? " SPAM" : " spam") + "]");
323             }
324             
325             saveChanges(message);
326             
327         } catch (Exception e) {
328             log("Exception: "
329                     + e.getMessage(), e);
330             throw new MessagingException("Exception thrown", e);
331         }
332     }
333     
334     private void loadData(Connection conn)
335     throws java.sql.SQLException {
336         
337         try {
338             // this is synchronized to avoid concurrent update of the corpus
339             synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
340                 analyzer.tokenCountsClear();
341                 analyzer.loadHamNSpam(conn);
342                 analyzer.buildCorpus();
343                 analyzer.tokenCountsClear();
344             }
345             
346             log("BayesianAnalysis Corpus loaded");
347             
348             touchLastCorpusLoadTime();
349             
350         } finally {
351             if (conn != null) {
352                 theJDBCUtil.closeJDBCConnection(conn);
353             }
354         }
355         
356     }
357     
358     private String getAddressesString(Collection addresses) {
359         if (addresses == null) {
360             return "null";
361         }
362         
363         Iterator iter = addresses.iterator();
364         StringBuffer sb = new StringBuffer();
365         sb.append('[');
366         for (int i = 0; iter.hasNext(); i++) {
367             sb.append(iter.next());
368             if (i + 1 < addresses.size()) {
369                 sb.append(", ");
370             }
371         }
372         sb.append(']');
373         return sb.toString();
374     }
375     
376     private void appendToSubject(MimeMessage message, String toAppend) {
377         try {
378             String subject = message.getSubject();
379             
380             if (subject == null) {
381                 message.setSubject(toAppend, "iso-8859-1");
382             } else {
383                 message.setSubject(toAppend + " " + subject, "iso-8859-1");
384             }
385         } catch (MessagingException ex) {}
386     }
387     
388     private void sendReplyFromPostmaster(Mail mail, String stringContent) throws MessagingException {
389         try {
390             MailAddress notifier = getMailetContext().getPostmaster();
391             
392             MailAddress senderMailAddress = mail.getSender();
393             
394             MimeMessage message = mail.getMessage();
395             //Create the reply message
396             MimeMessage reply = new MimeMessage(Session.getDefaultInstance(System.getProperties(), null));
397             
398             //Create the list of recipients in the Address[] format
399             InternetAddress[] rcptAddr = new InternetAddress[1];
400             rcptAddr[0] = senderMailAddress.toInternetAddress();
401             reply.setRecipients(Message.RecipientType.TO, rcptAddr);
402             
403             //Set the sender...
404             reply.setFrom(notifier.toInternetAddress());
405             
406             //Create the message body
407             MimeMultipart multipart = new MimeMultipart();
408             //Add message as the first mime body part
409             MimeBodyPart part = new MimeBodyPart();
410             part.setContent(stringContent, "text/plain");
411             part.setHeader(RFC2822Headers.CONTENT_TYPE, "text/plain");
412             multipart.addBodyPart(part);
413             
414             reply.setContent(multipart);
415             reply.setHeader(RFC2822Headers.CONTENT_TYPE, multipart.getContentType());
416             
417             //Create the list of recipients in our MailAddress format
418             Set recipients = new HashSet();
419             recipients.add(senderMailAddress);
420             
421             //Set additional headers
422             if (reply.getHeader(RFC2822Headers.DATE)==null){
423                 reply.setHeader(RFC2822Headers.DATE, rfc822DateFormat.format(new java.util.Date()));
424             }
425             String subject = message.getSubject();
426             if (subject == null) {
427                 subject = "";
428             }
429             if (subject.indexOf("Re:") == 0){
430                 reply.setSubject(subject);
431             } else {
432                 reply.setSubject("Re:" + subject);
433             }
434             reply.setHeader(RFC2822Headers.IN_REPLY_TO, message.getMessageID());
435             
436             //Send it off...
437             getMailetContext().sendMail(notifier, recipients, reply);
438         } catch (Exception e) {
439             log("Exception found sending reply", e);
440         }
441     }
442     
443     /***
444      * Saves changes resetting the original message id.
445      */
446     private void saveChanges(MimeMessage message) throws MessagingException {
447         String messageId = message.getMessageID();
448         message.saveChanges();
449         if (messageId != null) {
450             message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
451         }
452     }
453 
454     private static class CorpusLoader extends Thread {
455         
456         private BayesianAnalysis analysis;
457         
458         private CorpusLoader(BayesianAnalysis analysis) {
459             super("BayesianAnalysis Corpus Loader");
460             this.analysis = analysis;
461         }
462         
463         /*** Thread entry point.
464          */
465         public void run() {
466         analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
467         
468         try {
469             Thread.sleep(CORPUS_RELOAD_INTERVAL);
470 
471             while (true) {
472                 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
473                     analysis.log("Reloading Corpus ...");
474                     try {
475                         analysis.loadData(analysis.datasource.getConnection());
476                         analysis.log("Corpus reloaded");
477                     } catch (java.sql.SQLException se) {
478                         analysis.log("SQLException: ", se);
479                     }
480                     
481                 }
482                 
483                 if (Thread.interrupted()) {
484                     break;
485                 }
486                 Thread.sleep(CORPUS_RELOAD_INTERVAL);
487             }
488         }
489         catch (InterruptedException ex) {
490             interrupt();
491         }
492         }
493         
494     }
495     
496 }