1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.transport.mailets;
21
22 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
23 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
24 import org.apache.avalon.framework.service.ServiceManager;
25 import org.apache.james.Constants;
26 import org.apache.james.util.JDBCBayesianAnalyzer;
27 import org.apache.james.util.JDBCUtil;
28 import org.apache.mailet.GenericMailet;
29 import org.apache.mailet.Mail;
30 import org.apache.mailet.MailAddress;
31 import org.apache.mailet.RFC2822Headers;
32 import org.apache.mailet.dates.RFC822DateFormat;
33
34 import javax.mail.Message;
35 import javax.mail.MessagingException;
36 import javax.mail.Session;
37 import javax.mail.internet.InternetAddress;
38 import javax.mail.internet.MimeBodyPart;
39 import javax.mail.internet.MimeMessage;
40 import javax.mail.internet.MimeMultipart;
41
42 import java.io.BufferedReader;
43 import java.io.ByteArrayOutputStream;
44 import java.io.StringReader;
45 import java.sql.Connection;
46 import java.text.DecimalFormat;
47 import java.util.Collection;
48 import java.util.HashSet;
49 import java.util.Iterator;
50 import java.util.Set;
51
52 /***
53 * <P>Spam detection mailet using bayesian analysis techniques.</P>
54 *
55 * <P>Sets an email message header indicating the
56 * probability that an email message is SPAM.</P>
57 *
58 * <P>Based upon the principals described in:
59 * <a href="http://www.paulgraham.com/spam.html">A Plan For Spam</a>
60 * by Paul Graham.
61 * Extended to Paul Grahams' <a href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.</P>
62 *
63 * <P>The analysis capabilities are based on token frequencies (the <I>Corpus</I>)
64 * learned through a training process (see {@link BayesianAnalysisFeeder})
65 * and stored in a JDBC database.
66 * After a training session, the Corpus must be rebuilt from the database in order to
67 * acquire the new frequencies.
68 * Every 10 minutes a special thread in this mailet will check if any
69 * change was made to the database by the feeder, and rebuild the corpus if necessary.</p>
70 *
71 * <p>A <CODE>org.apache.james.spam.probability</CODE> mail attribute will be created
72 * containing the computed spam probability as a {@link java.lang.Double}.
73 * The <CODE>headerName</CODE> message header string will be created containing such
74 * probability in floating point representation.</p>
75 *
76 * <P>Sample configuration:</P>
77 * <PRE><CODE>
78 * <mailet match="All" class="BayesianAnalysis">
79 * <repositoryPath>db://maildb</repositoryPath>
80 * <!--
81 * Set this to the header name to add with the spam probability
82 * (default is "X-MessageIsSpamProbability").
83 * -->
84 * <headerName>X-MessageIsSpamProbability</headerName>
85 * <!--
86 * Set this to true if you want to ignore messages coming from local senders
87 * (default is false).
88 * By local sender we mean a return-path with a local server part (server listed
89 * in <servernames> in config.xml).
90 * -->
91 * <ignoreLocalSender>true</ignoreLocalSender>
92 * <!--
93 * Set this to the maximum message size (in bytes) that a message may have
94 * to be considered spam (default is 100000).
95 * -->
96 * <maxSize>100000</maxSize>
97 * </mailet>
98 * </CODE></PRE>
99 *
100 * <P>The probability of being spam is pre-pended to the subject if
101 * it is > 0.1 (10%).</P>
102 *
103 * <P>The required tables are automatically created if not already there (see sqlResources.xml).
104 * The token field in both the ham and spam tables is <B>case sensitive</B>.</P>
105 * @see BayesianAnalysisFeeder
106 * @see org.apache.james.util.BayesianAnalyzer
107 * @see org.apache.james.util.JDBCBayesianAnalyzer
108 * @version CVS $Revision: $ $Date: $
109 * @since 2.3.0
110 */
111
112 public class BayesianAnalysis
113 extends GenericMailet {
114 /***
115 * The JDBCUtil helper class
116 */
117 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
118 protected void delegatedLog(String logString) {
119 log("BayesianAnalysis: " + logString);
120 }
121 };
122
123 /***
124 * The JDBCBayesianAnalyzer class that does all the work.
125 */
126 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
127 protected void delegatedLog(String logString) {
128 log("BayesianAnalysis: " + logString);
129 }
130 };
131
132 private DataSourceComponent datasource;
133 private String repositoryPath;
134
135 private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
136 private static final String HEADER_NAME = "X-MessageIsSpamProbability";
137 private static final long CORPUS_RELOAD_INTERVAL = 600000;
138 private String headerName;
139 private boolean ignoreLocalSender = false;
140
141 /*** The date format object used to generate RFC 822 compliant date headers. */
142 private RFC822DateFormat rfc822DateFormat = new RFC822DateFormat();
143
144 /***
145 * Return a string describing this mailet.
146 *
147 * @return a string describing this mailet
148 */
149 public String getMailetInfo() {
150 return "BayesianAnalysis Mailet";
151 }
152
153 /***
154 * Holds value of property maxSize.
155 */
156 private int maxSize = 100000;
157
158 /***
159 * Holds value of property lastCorpusLoadTime.
160 */
161 private long lastCorpusLoadTime;
162
163 /***
164 * Getter for property maxSize.
165 * @return Value of property maxSize.
166 */
167 public int getMaxSize() {
168
169 return this.maxSize;
170 }
171
172 /***
173 * Setter for property maxSize.
174 * @param maxSize New value of property maxSize.
175 */
176 public void setMaxSize(int maxSize) {
177
178 this.maxSize = maxSize;
179 }
180
181 /***
182 * Getter for property lastCorpusLoadTime.
183 * @return Value of property lastCorpusLoadTime.
184 */
185 public long getLastCorpusLoadTime() {
186
187 return this.lastCorpusLoadTime;
188 }
189
190 /***
191 * Sets lastCorpusLoadTime to System.currentTimeMillis().
192 */
193 private void touchLastCorpusLoadTime() {
194
195 this.lastCorpusLoadTime = System.currentTimeMillis();
196 }
197
198 /***
199 * Mailet initialization routine.
200 * @throws MessagingException if a problem arises
201 */
202 public void init() throws MessagingException {
203 repositoryPath = getInitParameter("repositoryPath");
204
205 if (repositoryPath == null) {
206 throw new MessagingException("repositoryPath is null");
207 }
208
209 headerName = getInitParameter("headerName",HEADER_NAME);
210
211 ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
212
213 if (ignoreLocalSender) {
214 log("Will ignore messages coming from local senders");
215 } else {
216 log("Will analyze messages coming from local senders");
217 }
218
219 String maxSizeParam = getInitParameter("maxSize");
220 if (maxSizeParam != null) {
221 setMaxSize(Integer.parseInt(maxSizeParam));
222 }
223 log("maxSize: " + getMaxSize());
224
225 initDb();
226
227 CorpusLoader corpusLoader = new CorpusLoader(this);
228 corpusLoader.setDaemon(true);
229 corpusLoader.start();
230
231 }
232
233 private void initDb() throws MessagingException {
234
235 try {
236 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
237
238
239 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
240
241
242 int stindex = repositoryPath.indexOf("://") + 3;
243
244 String datasourceName = repositoryPath.substring(stindex);
245
246 datasource = (DataSourceComponent) datasources.select(datasourceName);
247 } catch (Exception e) {
248 throw new MessagingException("Can't get datasource", e);
249 }
250
251 try {
252 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
253 } catch (Exception e) {
254 throw new MessagingException("Exception initializing queries", e);
255 }
256
257 try {
258 loadData(datasource.getConnection());
259 } catch (java.sql.SQLException se) {
260 throw new MessagingException("SQLException loading data", se);
261 }
262 }
263
264 /***
265 * Scans the mail and determines the spam probability.
266 *
267 * @param mail The Mail message to be scanned.
268 * @throws MessagingException if a problem arises
269 */
270 public void service(Mail mail) throws MessagingException {
271
272 try {
273 MimeMessage message = mail.getMessage();
274
275 if (ignoreLocalSender) {
276
277 if (mail.getSender() != null
278 && getMailetContext().isLocalServer(mail.getSender().getHost())) {
279 return;
280 }
281 }
282
283 String [] headerArray = message.getHeader(headerName);
284
285 if (headerArray != null && headerArray.length > 0) {
286 return;
287 }
288
289 ByteArrayOutputStream baos = new ByteArrayOutputStream();
290
291 double probability;
292
293 if (message.getSize() < getMaxSize()) {
294 message.writeTo(baos);
295 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
296 } else {
297 probability = 0.0;
298 }
299
300 mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
301 message.setHeader(headerName, Double.toString(probability));
302
303 DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
304 probabilityForm.applyPattern("##0.##%");
305 String probabilityString = probabilityForm.format(probability);
306
307 String senderString;
308 if (mail.getSender() == null) {
309 senderString = "null";
310 } else {
311 senderString = mail.getSender().toString();
312 }
313 if (probability > 0.1) {
314 log(headerName
315 + ": "
316 + probabilityString
317 + "; From: "
318 + senderString
319 + "; Recipient(s): "
320 + getAddressesString(mail.getRecipients()));
321
322 appendToSubject(message,
323 " [" + probabilityString
324 + (probability > 0.9 ? " SPAM" : " spam") + "]");
325 }
326
327 saveChanges(message);
328
329 } catch (Exception e) {
330 log("Exception: "
331 + e.getMessage(), e);
332 throw new MessagingException("Exception thrown", e);
333 }
334 }
335
336 private void loadData(Connection conn)
337 throws java.sql.SQLException {
338
339 try {
340
341 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
342 analyzer.tokenCountsClear();
343 analyzer.loadHamNSpam(conn);
344 analyzer.buildCorpus();
345 analyzer.tokenCountsClear();
346 }
347
348 log("BayesianAnalysis Corpus loaded");
349
350 touchLastCorpusLoadTime();
351
352 } finally {
353 if (conn != null) {
354 theJDBCUtil.closeJDBCConnection(conn);
355 }
356 }
357
358 }
359
360 private String getAddressesString(Collection addresses) {
361 if (addresses == null) {
362 return "null";
363 }
364
365 Iterator iter = addresses.iterator();
366 StringBuffer sb = new StringBuffer();
367 sb.append('[');
368 for (int i = 0; iter.hasNext(); i++) {
369 sb.append(iter.next());
370 if (i + 1 < addresses.size()) {
371 sb.append(", ");
372 }
373 }
374 sb.append(']');
375 return sb.toString();
376 }
377
378 private void appendToSubject(MimeMessage message, String toAppend) {
379 try {
380 String subject = message.getSubject();
381
382 if (subject == null) {
383 message.setSubject(toAppend, "iso-8859-1");
384 } else {
385 message.setSubject(toAppend + " " + subject, "iso-8859-1");
386 }
387 } catch (MessagingException ex) {}
388 }
389
390 private void sendReplyFromPostmaster(Mail mail, String stringContent) throws MessagingException {
391 try {
392 MailAddress notifier = getMailetContext().getPostmaster();
393
394 MailAddress senderMailAddress = mail.getSender();
395
396 MimeMessage message = mail.getMessage();
397
398 MimeMessage reply = new MimeMessage(Session.getDefaultInstance(System.getProperties(), null));
399
400
401 InternetAddress[] rcptAddr = new InternetAddress[1];
402 rcptAddr[0] = senderMailAddress.toInternetAddress();
403 reply.setRecipients(Message.RecipientType.TO, rcptAddr);
404
405
406 reply.setFrom(notifier.toInternetAddress());
407
408
409 MimeMultipart multipart = new MimeMultipart();
410
411 MimeBodyPart part = new MimeBodyPart();
412 part.setContent(stringContent, "text/plain");
413 part.setHeader(RFC2822Headers.CONTENT_TYPE, "text/plain");
414 multipart.addBodyPart(part);
415
416 reply.setContent(multipart);
417 reply.setHeader(RFC2822Headers.CONTENT_TYPE, multipart.getContentType());
418
419
420 Set recipients = new HashSet();
421 recipients.add(senderMailAddress);
422
423
424 if (reply.getHeader(RFC2822Headers.DATE)==null){
425 reply.setHeader(RFC2822Headers.DATE, rfc822DateFormat.format(new java.util.Date()));
426 }
427 String subject = message.getSubject();
428 if (subject == null) {
429 subject = "";
430 }
431 if (subject.indexOf("Re:") == 0){
432 reply.setSubject(subject);
433 } else {
434 reply.setSubject("Re:" + subject);
435 }
436 reply.setHeader(RFC2822Headers.IN_REPLY_TO, message.getMessageID());
437
438
439 getMailetContext().sendMail(notifier, recipients, reply);
440 } catch (Exception e) {
441 log("Exception found sending reply", e);
442 }
443 }
444
445 /***
446 * Saves changes resetting the original message id.
447 */
448 private void saveChanges(MimeMessage message) throws MessagingException {
449 String messageId = message.getMessageID();
450 message.saveChanges();
451 if (messageId != null) {
452 message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
453 }
454 }
455
456 private static class CorpusLoader extends Thread {
457
458 private BayesianAnalysis analysis;
459
460 private CorpusLoader(BayesianAnalysis analysis) {
461 super("BayesianAnalysis Corpus Loader");
462 this.analysis = analysis;
463 }
464
465 /*** Thread entry point.
466 */
467 public void run() {
468 analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
469
470 try {
471 Thread.sleep(CORPUS_RELOAD_INTERVAL);
472
473 while (true) {
474 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
475 analysis.log("Reloading Corpus ...");
476 try {
477 analysis.loadData(analysis.datasource.getConnection());
478 analysis.log("Corpus reloaded");
479 } catch (java.sql.SQLException se) {
480 analysis.log("SQLException: ", se);
481 }
482
483 }
484
485 if (Thread.interrupted()) {
486 break;
487 }
488 Thread.sleep(CORPUS_RELOAD_INTERVAL);
489 }
490 }
491 catch (InterruptedException ex) {
492 interrupt();
493 }
494 }
495
496 }
497
498 }