View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.transport.mailets;
21  
22  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
23  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
24  import org.apache.avalon.framework.service.ServiceManager;
25  import org.apache.james.Constants;
26  import org.apache.james.util.JDBCBayesianAnalyzer;
27  import org.apache.james.util.JDBCUtil;
28  import org.apache.mailet.GenericMailet;
29  import org.apache.mailet.Mail;
30  import org.apache.mailet.MailAddress;
31  import org.apache.mailet.RFC2822Headers;
32  import org.apache.mailet.dates.RFC822DateFormat;
33  
34  import javax.mail.Message;
35  import javax.mail.MessagingException;
36  import javax.mail.Session;
37  import javax.mail.internet.InternetAddress;
38  import javax.mail.internet.MimeBodyPart;
39  import javax.mail.internet.MimeMessage;
40  import javax.mail.internet.MimeMultipart;
41  
42  import java.io.BufferedReader;
43  import java.io.ByteArrayOutputStream;
44  import java.io.StringReader;
45  import java.sql.Connection;
46  import java.text.DecimalFormat;
47  import java.util.Collection;
48  import java.util.HashSet;
49  import java.util.Iterator;
50  import java.util.Set;
51  
52  /***
53   * <P>Spam detection mailet using bayesian analysis techniques.</P>
54   * 
55   * <P>Sets an email message header indicating the
56   * probability that an email message is SPAM.</P>
57   * 
58   * <P>Based upon the principals described in:
59   *   <a href="http://www.paulgraham.com/spam.html">A Plan For Spam</a>
60   *   by Paul Graham.
61   * Extended to Paul Grahams' <a href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.</P>
62   * 
63   * <P>The analysis capabilities are based on token frequencies (the <I>Corpus</I>) 
64   * learned through a training process (see {@link BayesianAnalysisFeeder})
65   * and stored in a JDBC database.
66   * After a training session, the Corpus must be rebuilt from the database in order to
67   * acquire the new frequencies.
68   * Every 10 minutes a special thread in this mailet will check if any
69   * change was made to the database by the feeder, and rebuild the corpus if necessary.</p>
70   * 
71   * <p>A <CODE>org.apache.james.spam.probability</CODE> mail attribute will be created
72   * containing the computed spam probability as a {@link java.lang.Double}.
73   * The <CODE>headerName</CODE> message header string will be created containing such
74   * probability in floating point representation.</p>
75   * 
76   * <P>Sample configuration:</P>
77   * <PRE><CODE>
78   * &lt;mailet match="All" class="BayesianAnalysis"&gt;
79   *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
80   *   &lt;!--
81   *     Set this to the header name to add with the spam probability
82   *     (default is "X-MessageIsSpamProbability").
83   *   --&gt;
84   *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
85   *   &lt;!--
86   *     Set this to true if you want to ignore messages coming from local senders
87   *     (default is false).
88   *     By local sender we mean a return-path with a local server part (server listed
89   *     in &lt;servernames&gt; in config.xml).
90   *   --&gt;
91   *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
92   *   &lt;!--
93   *     Set this to the maximum message size (in bytes) that a message may have
94   *     to be considered spam (default is 100000).
95   *   --&gt;
96   *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
97   * &lt;/mailet&gt;
98   * </CODE></PRE>
99   * 
100  * <P>The probability of being spam is pre-pended to the subject if
101  * it is &gt; 0.1 (10%).</P>
102  * 
103  * <P>The required tables are automatically created if not already there (see sqlResources.xml).
104  * The token field in both the ham and spam tables is <B>case sensitive</B>.</P>
105  * @see BayesianAnalysisFeeder
106  * @see org.apache.james.util.BayesianAnalyzer
107  * @see org.apache.james.util.JDBCBayesianAnalyzer
108  * @version CVS $Revision: $ $Date: $
109  * @since 2.3.0
110  */
111 
112 public class BayesianAnalysis
113 extends GenericMailet {
114     /***
115      * The JDBCUtil helper class
116      */
117     private final JDBCUtil theJDBCUtil = new JDBCUtil() {
118         protected void delegatedLog(String logString) {
119             log("BayesianAnalysis: " + logString);
120         }
121     };
122 
123     /***
124      * The JDBCBayesianAnalyzer class that does all the work.
125      */
126     private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
127         protected void delegatedLog(String logString) {
128             log("BayesianAnalysis: " + logString);
129         }
130     };
131     
132     private DataSourceComponent datasource;
133     private String repositoryPath;
134     
135     private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
136     private static final String HEADER_NAME = "X-MessageIsSpamProbability";
137     private static final long CORPUS_RELOAD_INTERVAL = 600000;
138     private String headerName;
139     private boolean ignoreLocalSender = false;
140         
141     /*** The date format object used to generate RFC 822 compliant date headers. */
142     private RFC822DateFormat rfc822DateFormat = new RFC822DateFormat();
143     
144     /***
145      * Return a string describing this mailet.
146      *
147      * @return a string describing this mailet
148      */
149     public String getMailetInfo() {
150         return "BayesianAnalysis Mailet";
151     }
152     
153     /***
154      * Holds value of property maxSize.
155      */
156     private int maxSize = 100000;
157 
158     /***
159      * Holds value of property lastCorpusLoadTime.
160      */
161     private long lastCorpusLoadTime;
162     
163     /***
164      * Getter for property maxSize.
165      * @return Value of property maxSize.
166      */
167     public int getMaxSize() {
168 
169         return this.maxSize;
170     }
171 
172     /***
173      * Setter for property maxSize.
174      * @param maxSize New value of property maxSize.
175      */
176     public void setMaxSize(int maxSize) {
177 
178         this.maxSize = maxSize;
179     }
180 
181     /***
182      * Getter for property lastCorpusLoadTime.
183      * @return Value of property lastCorpusLoadTime.
184      */
185     public long getLastCorpusLoadTime() {
186         
187         return this.lastCorpusLoadTime;
188     }
189     
190     /***
191      * Sets lastCorpusLoadTime to System.currentTimeMillis().
192      */
193     private void touchLastCorpusLoadTime() {
194         
195         this.lastCorpusLoadTime = System.currentTimeMillis();
196     }
197     
198     /***
199      * Mailet initialization routine.
200      * @throws MessagingException if a problem arises
201      */
202     public void init() throws MessagingException {
203         repositoryPath = getInitParameter("repositoryPath");
204         
205         if (repositoryPath == null) {
206             throw new MessagingException("repositoryPath is null");
207         }
208         
209         headerName = getInitParameter("headerName",HEADER_NAME);
210         
211         ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
212         
213         if (ignoreLocalSender) {
214             log("Will ignore messages coming from local senders");
215         } else {
216             log("Will analyze messages coming from local senders");
217         }
218         
219         String maxSizeParam = getInitParameter("maxSize");
220         if (maxSizeParam != null) {
221             setMaxSize(Integer.parseInt(maxSizeParam));
222         }
223         log("maxSize: " + getMaxSize());
224         
225         initDb();
226         
227             CorpusLoader corpusLoader = new CorpusLoader(this);
228             corpusLoader.setDaemon(true);
229             corpusLoader.start();
230             
231     }
232     
233     private void initDb() throws MessagingException {
234         
235         try {
236             ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
237             
238             // Get the DataSourceSelector block
239             DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
240             
241             // Get the data-source required.
242             int stindex =   repositoryPath.indexOf("://") + 3;
243             
244             String datasourceName = repositoryPath.substring(stindex);
245             
246             datasource = (DataSourceComponent) datasources.select(datasourceName);
247         } catch (Exception e) {
248             throw new MessagingException("Can't get datasource", e);
249         }
250         
251         try {
252             analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
253         } catch (Exception e) {
254             throw new MessagingException("Exception initializing queries", e);
255         }        
256         
257         try {
258             loadData(datasource.getConnection());
259         } catch (java.sql.SQLException se) {
260             throw new MessagingException("SQLException loading data", se);
261         }        
262     }
263     
264     /***
265      * Scans the mail and determines the spam probability.
266      *
267      * @param mail The Mail message to be scanned.
268      * @throws MessagingException if a problem arises
269      */
270     public void service(Mail mail) throws MessagingException {
271         
272         try {
273             MimeMessage message = mail.getMessage();
274             
275             if (ignoreLocalSender) {
276                 // ignore the message if the sender is local
277                 if (mail.getSender() != null
278                         && getMailetContext().isLocalServer(mail.getSender().getHost())) {
279                     return;
280                 }
281             }
282             
283             String [] headerArray = message.getHeader(headerName);
284             // ignore the message if already analyzed
285             if (headerArray != null && headerArray.length > 0) {
286                 return;
287             }
288             
289             ByteArrayOutputStream baos = new ByteArrayOutputStream();
290             
291             double probability;
292             
293             if (message.getSize() < getMaxSize()) {
294                 message.writeTo(baos);
295                 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
296             } else {
297                 probability = 0.0;
298             }
299             
300             mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
301             message.setHeader(headerName, Double.toString(probability));
302             
303             DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
304             probabilityForm.applyPattern("##0.##%");
305             String probabilityString = probabilityForm.format(probability);
306             
307             String senderString;
308             if (mail.getSender() == null) {
309                 senderString = "null";
310             } else {
311                 senderString = mail.getSender().toString();
312             }
313             if (probability > 0.1) {
314                 log(headerName
315                         + ": "
316                         + probabilityString
317                         + "; From: "
318                         + senderString
319                         + "; Recipient(s): "
320                         + getAddressesString(mail.getRecipients()));
321                 
322                 appendToSubject(message,
323                         " [" + probabilityString
324                         + (probability > 0.9 ? " SPAM" : " spam") + "]");
325             }
326             
327             saveChanges(message);
328             
329         } catch (Exception e) {
330             log("Exception: "
331                     + e.getMessage(), e);
332             throw new MessagingException("Exception thrown", e);
333         }
334     }
335     
336     private void loadData(Connection conn)
337     throws java.sql.SQLException {
338         
339         try {
340             // this is synchronized to avoid concurrent update of the corpus
341             synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
342                 analyzer.tokenCountsClear();
343                 analyzer.loadHamNSpam(conn);
344                 analyzer.buildCorpus();
345                 analyzer.tokenCountsClear();
346             }
347             
348             log("BayesianAnalysis Corpus loaded");
349             
350             touchLastCorpusLoadTime();
351             
352         } finally {
353             if (conn != null) {
354                 theJDBCUtil.closeJDBCConnection(conn);
355             }
356         }
357         
358     }
359     
360     private String getAddressesString(Collection addresses) {
361         if (addresses == null) {
362             return "null";
363         }
364         
365         Iterator iter = addresses.iterator();
366         StringBuffer sb = new StringBuffer();
367         sb.append('[');
368         for (int i = 0; iter.hasNext(); i++) {
369             sb.append(iter.next());
370             if (i + 1 < addresses.size()) {
371                 sb.append(", ");
372             }
373         }
374         sb.append(']');
375         return sb.toString();
376     }
377     
378     private void appendToSubject(MimeMessage message, String toAppend) {
379         try {
380             String subject = message.getSubject();
381             
382             if (subject == null) {
383                 message.setSubject(toAppend, "iso-8859-1");
384             } else {
385                 message.setSubject(toAppend + " " + subject, "iso-8859-1");
386             }
387         } catch (MessagingException ex) {}
388     }
389     
390     private void sendReplyFromPostmaster(Mail mail, String stringContent) throws MessagingException {
391         try {
392             MailAddress notifier = getMailetContext().getPostmaster();
393             
394             MailAddress senderMailAddress = mail.getSender();
395             
396             MimeMessage message = mail.getMessage();
397             //Create the reply message
398             MimeMessage reply = new MimeMessage(Session.getDefaultInstance(System.getProperties(), null));
399             
400             //Create the list of recipients in the Address[] format
401             InternetAddress[] rcptAddr = new InternetAddress[1];
402             rcptAddr[0] = senderMailAddress.toInternetAddress();
403             reply.setRecipients(Message.RecipientType.TO, rcptAddr);
404             
405             //Set the sender...
406             reply.setFrom(notifier.toInternetAddress());
407             
408             //Create the message body
409             MimeMultipart multipart = new MimeMultipart();
410             //Add message as the first mime body part
411             MimeBodyPart part = new MimeBodyPart();
412             part.setContent(stringContent, "text/plain");
413             part.setHeader(RFC2822Headers.CONTENT_TYPE, "text/plain");
414             multipart.addBodyPart(part);
415             
416             reply.setContent(multipart);
417             reply.setHeader(RFC2822Headers.CONTENT_TYPE, multipart.getContentType());
418             
419             //Create the list of recipients in our MailAddress format
420             Set recipients = new HashSet();
421             recipients.add(senderMailAddress);
422             
423             //Set additional headers
424             if (reply.getHeader(RFC2822Headers.DATE)==null){
425                 reply.setHeader(RFC2822Headers.DATE, rfc822DateFormat.format(new java.util.Date()));
426             }
427             String subject = message.getSubject();
428             if (subject == null) {
429                 subject = "";
430             }
431             if (subject.indexOf("Re:") == 0){
432                 reply.setSubject(subject);
433             } else {
434                 reply.setSubject("Re:" + subject);
435             }
436             reply.setHeader(RFC2822Headers.IN_REPLY_TO, message.getMessageID());
437             
438             //Send it off...
439             getMailetContext().sendMail(notifier, recipients, reply);
440         } catch (Exception e) {
441             log("Exception found sending reply", e);
442         }
443     }
444     
445     /***
446      * Saves changes resetting the original message id.
447      */
448     private void saveChanges(MimeMessage message) throws MessagingException {
449         String messageId = message.getMessageID();
450         message.saveChanges();
451         if (messageId != null) {
452             message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
453         }
454     }
455 
456     private static class CorpusLoader extends Thread {
457         
458         private BayesianAnalysis analysis;
459         
460         private CorpusLoader(BayesianAnalysis analysis) {
461             super("BayesianAnalysis Corpus Loader");
462             this.analysis = analysis;
463         }
464         
465         /*** Thread entry point.
466          */
467         public void run() {
468         analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
469         
470         try {
471             Thread.sleep(CORPUS_RELOAD_INTERVAL);
472 
473             while (true) {
474                 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
475                     analysis.log("Reloading Corpus ...");
476                     try {
477                         analysis.loadData(analysis.datasource.getConnection());
478                         analysis.log("Corpus reloaded");
479                     } catch (java.sql.SQLException se) {
480                         analysis.log("SQLException: ", se);
481                     }
482                     
483                 }
484                 
485                 if (Thread.interrupted()) {
486                     break;
487                 }
488                 Thread.sleep(CORPUS_RELOAD_INTERVAL);
489             }
490         }
491         catch (InterruptedException ex) {
492             interrupt();
493         }
494         }
495         
496     }
497     
498 }