001 /****************************************************************
002 * Licensed to the Apache Software Foundation (ASF) under one *
003 * or more contributor license agreements. See the NOTICE file *
004 * distributed with this work for additional information *
005 * regarding copyright ownership. The ASF licenses this file *
006 * to you under the Apache License, Version 2.0 (the *
007 * "License"); you may not use this file except in compliance *
008 * with the License. You may obtain a copy of the License at *
009 * *
010 * http://www.apache.org/licenses/LICENSE-2.0 *
011 * *
012 * Unless required by applicable law or agreed to in writing, *
013 * software distributed under the License is distributed on an *
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015 * KIND, either express or implied. See the License for the *
016 * specific language governing permissions and limitations *
017 * under the License. *
018 ****************************************************************/
019
020 package org.apache.james.ai.classic;
021
022 import java.io.BufferedReader;
023 import java.io.ByteArrayOutputStream;
024 import java.io.StringReader;
025 import java.sql.Connection;
026 import java.text.DecimalFormat;
027 import java.util.Collection;
028 import java.util.Iterator;
029
030 import javax.annotation.Resource;
031 import javax.mail.MessagingException;
032 import javax.mail.internet.MimeMessage;
033 import javax.sql.DataSource;
034
035 import org.apache.mailet.Mail;
036 import org.apache.mailet.MailAddress;
037 import org.apache.mailet.base.GenericMailet;
038 import org.apache.mailet.base.RFC2822Headers;
039
040 /**
041 * <p>
042 * Spam detection mailet using bayesian analysis techniques.
043 * </p>
044 *
045 * <p>
046 * Sets an email message header indicating the probability that an email message
047 * is SPAM.
048 * </p>
049 *
050 * <p>
 * Based upon the principles described in: <a
 * href="http://www.paulgraham.com/spam.html">A Plan For Spam</a> by Paul
 * Graham. Extended to Paul Graham's <a
 * href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.
055 * </p>
056 *
057 * <p>
058 * The analysis capabilities are based on token frequencies (the <i>Corpus</i>)
059 * learned through a training process (see {@link BayesianAnalysisFeeder}) and
060 * stored in a JDBC database. After a training session, the Corpus must be
061 * rebuilt from the database in order to acquire the new frequencies. Every 10
062 * minutes a special thread in this mailet will check if any change was made to
063 * the database by the feeder, and rebuild the corpus if necessary.
064 * </p>
065 *
066 * <p>
067 * A <code>org.apache.james.spam.probability</code> mail attribute will be
068 * created containing the computed spam probability as a
069 * {@link java.lang.Double}. The <code>headerName</code> message header string
070 * will be created containing such probability in floating point representation.
071 * </p>
072 *
073 * <p>
074 * Sample configuration:
075 * </p>
076 *
077 * <pre>
078 * <code>
 * &lt;mailet match="All" class="BayesianAnalysis"&gt;
 *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
 *   &lt;!--
 *     Set this to the header name to add with the spam probability
 *     (default is "X-MessageIsSpamProbability").
 *   --&gt;
 *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
 *   &lt;!--
 *     Set this to true if you want to ignore messages coming from local senders
 *     (default is false).
 *     By local sender we mean a return-path with a local server part (server listed
 *     in &lt;servernames&gt; in config.xml).
 *   --&gt;
 *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
 *   &lt;!--
 *     Messages of this size (in bytes) or larger are not analyzed and are
 *     given a spam probability of 0 (default is 100000).
 *   --&gt;
 *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
 *   &lt;!--
 *     Set this to false if you do not want the subject tagged when spam is
 *     detected (default is true).
 *   --&gt;
 *   &lt;tagSubject&gt;true&lt;/tagSubject&gt;
 * &lt;/mailet&gt;
103 * </code>
104 * </pre>
105 *
106 * <p>
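 * If <code>tagSubject</code> is enabled and the spam probability is greater
 * than 0.1 (10%), the probability is prepended to the subject, marked "SPAM"
 * when it exceeds 0.9 and "spam" otherwise.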
109 * </p>
110 *
111 * <p>
112 * The required tables are automatically created if not already there (see
113 * sqlResources.xml). The token field in both the ham and spam tables is <b>case
114 * sensitive</b>.
115 * </p>
116 *
117 * @see BayesianAnalysisFeeder
118 * @see BayesianAnalyzer
119 * @see JDBCBayesianAnalyzer
120 * @since 2.3.0
121 */
122
123 public class BayesianAnalysis extends GenericMailet {
124 /**
125 * The JDBCUtil helper class
126 */
127 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
128 protected void delegatedLog(String logString) {
129 log("BayesianAnalysis: " + logString);
130 }
131 };
132
133 /**
134 * The JDBCBayesianAnalyzer class that does all the work.
135 */
136 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
137 protected void delegatedLog(String logString) {
138 log("BayesianAnalysis: " + logString);
139 }
140 };
141
142 private DataSource datasource;
143 private String repositoryPath;
144
145 private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
146 private static final String HEADER_NAME = "X-MessageIsSpamProbability";
147 private static final long CORPUS_RELOAD_INTERVAL = 600000;
148 private String headerName;
149 private boolean ignoreLocalSender = false;
150 private boolean tagSubject = true;
151
152 /**
153 * Return a string describing this mailet.
154 *
155 * @return a string describing this mailet
156 */
157 public String getMailetInfo() {
158 return "BayesianAnalysis Mailet";
159 }
160
161 /**
162 * Holds value of property maxSize.
163 */
164 private int maxSize = 100000;
165
166 /**
167 * Holds value of property lastCorpusLoadTime.
168 */
169 private long lastCorpusLoadTime;
170
171 private SystemContext fs;
172
173 /**
174 * Getter for property maxSize.
175 *
176 * @return Value of property maxSize.
177 */
178 public int getMaxSize() {
179
180 return this.maxSize;
181 }
182
183 /**
184 * Setter for property maxSize.
185 *
186 * @param maxSize
187 * New value of property maxSize.
188 */
189 public void setMaxSize(int maxSize) {
190
191 this.maxSize = maxSize;
192 }
193
194 /**
195 * Getter for property lastCorpusLoadTime.
196 *
197 * @return Value of property lastCorpusLoadTime.
198 */
199 public long getLastCorpusLoadTime() {
200
201 return this.lastCorpusLoadTime;
202 }
203
204 @Resource(name = "datasource")
205 public void setDataSource(DataSource datasource) {
206 this.datasource = datasource;
207 }
208
209 @Resource(name = "filesystem")
210 public void setFileSystem(SystemContext fs) {
211 this.fs = fs;
212 }
213
214 /**
215 * Sets lastCorpusLoadTime to System.currentTimeMillis().
216 */
217 private void touchLastCorpusLoadTime() {
218
219 this.lastCorpusLoadTime = System.currentTimeMillis();
220 }
221
222 /**
223 * Mailet initialization routine.
224 *
225 * @throws MessagingException
226 * if a problem arises
227 */
228 public void init() throws MessagingException {
229 repositoryPath = getInitParameter("repositoryPath");
230
231 if (repositoryPath == null) {
232 throw new MessagingException("repositoryPath is null");
233 }
234
235 headerName = getInitParameter("headerName", HEADER_NAME);
236
237 ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
238
239 if (ignoreLocalSender) {
240 log("Will ignore messages coming from local senders");
241 } else {
242 log("Will analyze messages coming from local senders");
243 }
244
245 String maxSizeParam = getInitParameter("maxSize");
246 if (maxSizeParam != null) {
247 setMaxSize(Integer.parseInt(maxSizeParam));
248 }
249 log("maxSize: " + getMaxSize());
250
251 String tag = getInitParameter("tagSubject");
252 if (tag != null && tag.equals("false")) {
253 tagSubject = false;
254 }
255
256 initDb();
257
258 CorpusLoader corpusLoader = new CorpusLoader(this);
259 corpusLoader.setDaemon(true);
260 corpusLoader.start();
261
262 }
263
264 private void initDb() throws MessagingException {
265
266 try {
267 analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
268 } catch (Exception e) {
269 throw new MessagingException("Exception initializing queries", e);
270 }
271
272 try {
273 loadData(datasource.getConnection());
274 } catch (java.sql.SQLException se) {
275 throw new MessagingException("SQLException loading data", se);
276 }
277 }
278
279 /**
280 * Scans the mail and determines the spam probability.
281 *
282 * @param mail
283 * The Mail message to be scanned.
284 * @throws MessagingException
285 * if a problem arises
286 */
287 public void service(Mail mail) throws MessagingException {
288
289 try {
290 MimeMessage message = mail.getMessage();
291
292 if (ignoreLocalSender) {
293 // ignore the message if the sender is local
294 if (mail.getSender() != null && getMailetContext().isLocalServer(mail.getSender().getDomain())) {
295 return;
296 }
297 }
298
299 String[] headerArray = message.getHeader(headerName);
300 // ignore the message if already analyzed
301 if (headerArray != null && headerArray.length > 0) {
302 return;
303 }
304
305 ByteArrayOutputStream baos = new ByteArrayOutputStream();
306
307 double probability;
308
309 if (message.getSize() < getMaxSize()) {
310 message.writeTo(baos);
311 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
312 } else {
313 probability = 0.0;
314 }
315
316 mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
317 message.setHeader(headerName, Double.toString(probability));
318
319 DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
320 probabilityForm.applyPattern("##0.##%");
321 String probabilityString = probabilityForm.format(probability);
322
323 String senderString;
324 if (mail.getSender() == null) {
325 senderString = "null";
326 } else {
327 senderString = mail.getSender().toString();
328 }
329 if (probability > 0.1) {
330 @SuppressWarnings("unchecked")
331 final Collection<MailAddress> recipients = mail.getRecipients();
332 log(headerName + ": " + probabilityString + "; From: " + senderString + "; Recipient(s): " + getAddressesString(recipients));
333
334 // Check if we should tag the subject
335 if (tagSubject) {
336 appendToSubject(message, " [" + probabilityString + (probability > 0.9 ? " SPAM" : " spam") + "]");
337 }
338 }
339
340 saveChanges(message);
341
342 } catch (Exception e) {
343 log("Exception: " + e.getMessage(), e);
344 throw new MessagingException("Exception thrown", e);
345 }
346 }
347
348 private void loadData(Connection conn) throws java.sql.SQLException {
349
350 try {
351 // this is synchronized to avoid concurrent update of the corpus
352 synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
353 analyzer.tokenCountsClear();
354 analyzer.loadHamNSpam(conn);
355 analyzer.buildCorpus();
356 analyzer.tokenCountsClear();
357 }
358
359 log("BayesianAnalysis Corpus loaded");
360
361 touchLastCorpusLoadTime();
362
363 } finally {
364 if (conn != null) {
365 theJDBCUtil.closeJDBCConnection(conn);
366 }
367 }
368
369 }
370
371 private String getAddressesString(Collection<MailAddress> addresses) {
372 if (addresses == null) {
373 return "null";
374 }
375
376 Iterator<MailAddress> iter = addresses.iterator();
377 StringBuffer sb = new StringBuffer();
378 sb.append('[');
379 for (int i = 0; iter.hasNext(); i++) {
380 sb.append(iter.next());
381 if (i + 1 < addresses.size()) {
382 sb.append(", ");
383 }
384 }
385 sb.append(']');
386 return sb.toString();
387 }
388
389 private void appendToSubject(MimeMessage message, String toAppend) {
390 try {
391 String subject = message.getSubject();
392
393 if (subject == null) {
394 message.setSubject(toAppend, "iso-8859-1");
395 } else {
396 message.setSubject(toAppend + " " + subject, "iso-8859-1");
397 }
398 } catch (MessagingException ex) {
399 }
400 }
401
402 /**
403 * Saves changes resetting the original message id.
404 */
405 private void saveChanges(MimeMessage message) throws MessagingException {
406 String messageId = message.getMessageID();
407 message.saveChanges();
408 if (messageId != null) {
409 message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
410 }
411 }
412
413 private static class CorpusLoader extends Thread {
414
415 private BayesianAnalysis analysis;
416
417 private CorpusLoader(BayesianAnalysis analysis) {
418 super("BayesianAnalysis Corpus Loader");
419 this.analysis = analysis;
420 }
421
422 /**
423 * Thread entry point.
424 */
425 public void run() {
426 analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
427
428 try {
429 Thread.sleep(CORPUS_RELOAD_INTERVAL);
430
431 while (true) {
432 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
433 analysis.log("Reloading Corpus ...");
434 try {
435 analysis.loadData(analysis.datasource.getConnection());
436 analysis.log("Corpus reloaded");
437 } catch (java.sql.SQLException se) {
438 analysis.log("SQLException: ", se);
439 }
440
441 }
442
443 if (Thread.interrupted()) {
444 break;
445 }
446 Thread.sleep(CORPUS_RELOAD_INTERVAL);
447 }
448 } catch (InterruptedException ex) {
449 interrupt();
450 }
451 }
452
453 }
454
455 }