1 /************************************************************************
2 * Copyright (c) 2000-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.transport.mailets;
19
20 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
21 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
22 import org.apache.avalon.framework.service.ServiceManager;
23 import org.apache.james.Constants;
24 import org.apache.james.util.JDBCBayesianAnalyzer;
25 import org.apache.james.util.JDBCUtil;
26 import org.apache.mailet.GenericMailet;
27 import org.apache.mailet.Mail;
28 import org.apache.mailet.MailAddress;
29 import org.apache.mailet.RFC2822Headers;
30 import org.apache.mailet.dates.RFC822DateFormat;
31
32 import javax.mail.Message;
33 import javax.mail.MessagingException;
34 import javax.mail.Session;
35 import javax.mail.internet.InternetAddress;
36 import javax.mail.internet.MimeBodyPart;
37 import javax.mail.internet.MimeMessage;
38 import javax.mail.internet.MimeMultipart;
39
40 import java.io.BufferedReader;
41 import java.io.ByteArrayOutputStream;
42 import java.io.StringReader;
43 import java.sql.Connection;
44 import java.text.DecimalFormat;
45 import java.util.Collection;
46 import java.util.HashSet;
47 import java.util.Iterator;
48 import java.util.Set;
49
50 /***
51 * <P>Spam detection mailet using bayesian analysis techniques.</P>
52 *
53 * <P>Sets an email message header indicating the
54 * probability that an email message is SPAM.</P>
55 *
 * <P>Based upon the principles described in:
 * <a href="http://www.paulgraham.com/spam.html">A Plan For Spam</a>
 * by Paul Graham.
 * Extended to Paul Graham's <a href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.</P>
60 *
61 * <P>The analysis capabilities are based on token frequencies (the <I>Corpus</I>)
62 * learned through a training process (see {@link BayesianAnalysisFeeder})
63 * and stored in a JDBC database.
64 * After a training session, the Corpus must be rebuilt from the database in order to
65 * acquire the new frequencies.
66 * Every 10 minutes a special thread in this mailet will check if any
67 * change was made to the database by the feeder, and rebuild the corpus if necessary.</p>
68 *
69 * <p>A <CODE>org.apache.james.spam.probability</CODE> mail attribute will be created
70 * containing the computed spam probability as a {@link java.lang.Double}.
71 * The <CODE>headerName</CODE> message header string will be created containing such
72 * probability in floating point representation.</p>
73 *
74 * <P>Sample configuration:</P>
 * <PRE><CODE>
 * &lt;mailet match="All" class="BayesianAnalysis"&gt;
 *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
 *   &lt;!--
 *     Set this to the header name to add with the spam probability
 *     (default is "X-MessageIsSpamProbability").
 *   --&gt;
 *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
 *   &lt;!--
 *     Set this to true if you want to ignore messages coming from local senders
 *     (default is false).
 *     By local sender we mean a return-path with a local server part (server listed
 *     in &lt;servernames&gt; in config.xml).
 *   --&gt;
 *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
 *   &lt;!--
 *     Set this to the message size limit (in bytes): only messages smaller than
 *     this value are analyzed; larger ones get a spam probability of 0
 *     (default is 100000).
 *   --&gt;
 *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
 * &lt;/mailet&gt;
 * </CODE></PRE>
97 *
 * <P>The probability of being spam is prepended to the subject if
 * it is &gt; 0.1 (10%); the tag reads "SPAM" instead of "spam" when it is &gt; 0.9 (90%).</P>
100 *
101 * <P>The required tables are automatically created if not already there (see sqlResources.xml).
102 * The token field in both the ham and spam tables is <B>case sensitive</B>.</P>
103 * @see BayesianAnalysisFeeder
104 * @see org.apache.james.util.BayesianAnalyzer
105 * @see org.apache.james.util.JDBCBayesianAnalyzer
106 * @version CVS $Revision: $ $Date: $
107 * @since 2.3.0
108 */
109
110 public class BayesianAnalysis
111 extends GenericMailet {
112 /***
113 * The JDBCUtil helper class
114 */
115 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116 protected void delegatedLog(String logString) {
117 log("BayesianAnalysis: " + logString);
118 }
119 };
120
121 /***
122 * The JDBCBayesianAnalyzer class that does all the work.
123 */
124 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125 protected void delegatedLog(String logString) {
126 log("BayesianAnalysis: " + logString);
127 }
128 };
129
130 private DataSourceComponent datasource;
131 private String repositoryPath;
132
133 private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
134 private static final String HEADER_NAME = "X-MessageIsSpamProbability";
135 private static final long CORPUS_RELOAD_INTERVAL = 600000;
136 private String headerName;
137 private boolean ignoreLocalSender = false;
138
139 /*** The date format object used to generate RFC 822 compliant date headers. */
140 private RFC822DateFormat rfc822DateFormat = new RFC822DateFormat();
141
142 /***
143 * Return a string describing this mailet.
144 *
145 * @return a string describing this mailet
146 */
147 public String getMailetInfo() {
148 return "BayesianAnalysis Mailet";
149 }
150
151 /***
152 * Holds value of property maxSize.
153 */
154 private int maxSize = 100000;
155
156 /***
157 * Holds value of property lastCorpusLoadTime.
158 */
159 private long lastCorpusLoadTime;
160
161 /***
162 * Getter for property maxSize.
163 * @return Value of property maxSize.
164 */
165 public int getMaxSize() {
166
167 return this.maxSize;
168 }
169
170 /***
171 * Setter for property maxSize.
172 * @param maxSize New value of property maxSize.
173 */
174 public void setMaxSize(int maxSize) {
175
176 this.maxSize = maxSize;
177 }
178
179 /***
180 * Getter for property lastCorpusLoadTime.
181 * @return Value of property lastCorpusLoadTime.
182 */
183 public long getLastCorpusLoadTime() {
184
185 return this.lastCorpusLoadTime;
186 }
187
188 /***
189 * Sets lastCorpusLoadTime to System.currentTimeMillis().
190 */
191 private void touchLastCorpusLoadTime() {
192
193 this.lastCorpusLoadTime = System.currentTimeMillis();
194 }
195
196 /***
197 * Mailet initialization routine.
198 * @throws MessagingException if a problem arises
199 */
200 public void init() throws MessagingException {
201 repositoryPath = getInitParameter("repositoryPath");
202
203 if (repositoryPath == null) {
204 throw new MessagingException("repositoryPath is null");
205 }
206
207 headerName = getInitParameter("headerName",HEADER_NAME);
208
209 ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
210
211 if (ignoreLocalSender) {
212 log("Will ignore messages coming from local senders");
213 } else {
214 log("Will analyze messages coming from local senders");
215 }
216
217 String maxSizeParam = getInitParameter("maxSize");
218 if (maxSizeParam != null) {
219 setMaxSize(Integer.parseInt(maxSizeParam));
220 }
221 log("maxSize: " + getMaxSize());
222
223 initDb();
224
225 CorpusLoader corpusLoader = new CorpusLoader(this);
226 corpusLoader.setDaemon(true);
227 corpusLoader.start();
228
229 }
230
231 private void initDb() throws MessagingException {
232
233 try {
234 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
235
236
237 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
238
239
240 int stindex = repositoryPath.indexOf("://") + 3;
241
242 String datasourceName = repositoryPath.substring(stindex);
243
244 datasource = (DataSourceComponent) datasources.select(datasourceName);
245 } catch (Exception e) {
246 throw new MessagingException("Can't get datasource", e);
247 }
248
249 try {
250 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
251 } catch (Exception e) {
252 throw new MessagingException("Exception initializing queries", e);
253 }
254
255 try {
256 loadData(datasource.getConnection());
257 } catch (java.sql.SQLException se) {
258 throw new MessagingException("SQLException loading data", se);
259 }
260 }
261
262 /***
263 * Scans the mail and determines the spam probability.
264 *
265 * @param mail The Mail message to be scanned.
266 * @throws MessagingException if a problem arises
267 */
268 public void service(Mail mail) throws MessagingException {
269
270 try {
271 MimeMessage message = mail.getMessage();
272
273 if (ignoreLocalSender) {
274
275 if (mail.getSender() != null
276 && getMailetContext().isLocalServer(mail.getSender().getHost())) {
277 return;
278 }
279 }
280
281 String [] headerArray = message.getHeader(headerName);
282
283 if (headerArray != null && headerArray.length > 0) {
284 return;
285 }
286
287 ByteArrayOutputStream baos = new ByteArrayOutputStream();
288
289 double probability;
290
291 if (message.getSize() < getMaxSize()) {
292 message.writeTo(baos);
293 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
294 } else {
295 probability = 0.0;
296 }
297
298 mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
299 message.setHeader(headerName, Double.toString(probability));
300
301 DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
302 probabilityForm.applyPattern("##0.##%");
303 String probabilityString = probabilityForm.format(probability);
304
305 String senderString;
306 if (mail.getSender() == null) {
307 senderString = "null";
308 } else {
309 senderString = mail.getSender().toString();
310 }
311 if (probability > 0.1) {
312 log(headerName
313 + ": "
314 + probabilityString
315 + "; From: "
316 + senderString
317 + "; Recipient(s): "
318 + getAddressesString(mail.getRecipients()));
319
320 appendToSubject(message,
321 " [" + probabilityString
322 + (probability > 0.9 ? " SPAM" : " spam") + "]");
323 }
324
325 saveChanges(message);
326
327 } catch (Exception e) {
328 log("Exception: "
329 + e.getMessage(), e);
330 throw new MessagingException("Exception thrown", e);
331 }
332 }
333
334 private void loadData(Connection conn)
335 throws java.sql.SQLException {
336
337 try {
338
339 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
340 analyzer.tokenCountsClear();
341 analyzer.loadHamNSpam(conn);
342 analyzer.buildCorpus();
343 analyzer.tokenCountsClear();
344 }
345
346 log("BayesianAnalysis Corpus loaded");
347
348 touchLastCorpusLoadTime();
349
350 } finally {
351 if (conn != null) {
352 theJDBCUtil.closeJDBCConnection(conn);
353 }
354 }
355
356 }
357
358 private String getAddressesString(Collection addresses) {
359 if (addresses == null) {
360 return "null";
361 }
362
363 Iterator iter = addresses.iterator();
364 StringBuffer sb = new StringBuffer();
365 sb.append('[');
366 for (int i = 0; iter.hasNext(); i++) {
367 sb.append(iter.next());
368 if (i + 1 < addresses.size()) {
369 sb.append(", ");
370 }
371 }
372 sb.append(']');
373 return sb.toString();
374 }
375
376 private void appendToSubject(MimeMessage message, String toAppend) {
377 try {
378 String subject = message.getSubject();
379
380 if (subject == null) {
381 message.setSubject(toAppend, "iso-8859-1");
382 } else {
383 message.setSubject(toAppend + " " + subject, "iso-8859-1");
384 }
385 } catch (MessagingException ex) {}
386 }
387
388 private void sendReplyFromPostmaster(Mail mail, String stringContent) throws MessagingException {
389 try {
390 MailAddress notifier = getMailetContext().getPostmaster();
391
392 MailAddress senderMailAddress = mail.getSender();
393
394 MimeMessage message = mail.getMessage();
395
396 MimeMessage reply = new MimeMessage(Session.getDefaultInstance(System.getProperties(), null));
397
398
399 InternetAddress[] rcptAddr = new InternetAddress[1];
400 rcptAddr[0] = senderMailAddress.toInternetAddress();
401 reply.setRecipients(Message.RecipientType.TO, rcptAddr);
402
403
404 reply.setFrom(notifier.toInternetAddress());
405
406
407 MimeMultipart multipart = new MimeMultipart();
408
409 MimeBodyPart part = new MimeBodyPart();
410 part.setContent(stringContent, "text/plain");
411 part.setHeader(RFC2822Headers.CONTENT_TYPE, "text/plain");
412 multipart.addBodyPart(part);
413
414 reply.setContent(multipart);
415 reply.setHeader(RFC2822Headers.CONTENT_TYPE, multipart.getContentType());
416
417
418 Set recipients = new HashSet();
419 recipients.add(senderMailAddress);
420
421
422 if (reply.getHeader(RFC2822Headers.DATE)==null){
423 reply.setHeader(RFC2822Headers.DATE, rfc822DateFormat.format(new java.util.Date()));
424 }
425 String subject = message.getSubject();
426 if (subject == null) {
427 subject = "";
428 }
429 if (subject.indexOf("Re:") == 0){
430 reply.setSubject(subject);
431 } else {
432 reply.setSubject("Re:" + subject);
433 }
434 reply.setHeader(RFC2822Headers.IN_REPLY_TO, message.getMessageID());
435
436
437 getMailetContext().sendMail(notifier, recipients, reply);
438 } catch (Exception e) {
439 log("Exception found sending reply", e);
440 }
441 }
442
443 /***
444 * Saves changes resetting the original message id.
445 */
446 private void saveChanges(MimeMessage message) throws MessagingException {
447 String messageId = message.getMessageID();
448 message.saveChanges();
449 if (messageId != null) {
450 message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
451 }
452 }
453
454 private static class CorpusLoader extends Thread {
455
456 private BayesianAnalysis analysis;
457
458 private CorpusLoader(BayesianAnalysis analysis) {
459 super("BayesianAnalysis Corpus Loader");
460 this.analysis = analysis;
461 }
462
463 /*** Thread entry point.
464 */
465 public void run() {
466 analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
467
468 try {
469 Thread.sleep(CORPUS_RELOAD_INTERVAL);
470
471 while (true) {
472 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
473 analysis.log("Reloading Corpus ...");
474 try {
475 analysis.loadData(analysis.datasource.getConnection());
476 analysis.log("Corpus reloaded");
477 } catch (java.sql.SQLException se) {
478 analysis.log("SQLException: ", se);
479 }
480
481 }
482
483 if (Thread.interrupted()) {
484 break;
485 }
486 Thread.sleep(CORPUS_RELOAD_INTERVAL);
487 }
488 }
489 catch (InterruptedException ex) {
490 interrupt();
491 }
492 }
493
494 }
495
496 }