/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.ai.classic;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.StringReader;
import java.sql.Connection;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.Iterator;

import javax.annotation.Resource;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import javax.sql.DataSource;

import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
import org.apache.mailet.base.GenericMailet;
import org.apache.mailet.base.RFC2822Headers;

/**
 * <p>
 * Spam detection mailet using bayesian analysis techniques.
 * </p>
 *
 * <p>
 * Sets an email message header indicating the probability that an email message
 * is SPAM.
 * </p>
 *
 * <p>
 * Based upon the principles described in: <a
 * href="http://www.paulgraham.com/spam.html">A Plan For Spam</a> by Paul
 * Graham. Extended to Paul Graham's <a
 * href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.
 * </p>
 *
 * <p>
 * The analysis capabilities are based on token frequencies (the <i>Corpus</i>)
 * learned through a training process (see {@link BayesianAnalysisFeeder}) and
 * stored in a JDBC database. After a training session, the Corpus must be
 * rebuilt from the database in order to acquire the new frequencies. Every 10
 * minutes a special thread in this mailet will check if any change was made to
 * the database by the feeder, and rebuild the corpus if necessary.
 * </p>
 *
 * <p>
 * A <code>org.apache.james.spam.probability</code> mail attribute will be
 * created containing the computed spam probability as a
 * {@link java.lang.Double}. The <code>headerName</code> message header string
 * will be created containing such probability in floating point representation.
 * </p>
 *
 * <p>
 * Sample configuration:
 * </p>
 *
 * <pre>
 * <code>
 * &lt;mailet match="All" class="BayesianAnalysis"&gt;
 *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
 *   &lt;!--
 *     Set this to the header name to add with the spam probability
 *     (default is "X-MessageIsSpamProbability").
 *   --&gt;
 *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
 *   &lt;!--
 *     Set this to true if you want to ignore messages coming from local senders
 *     (default is false).
 *     By local sender we mean a return-path with a local server part (server listed
 *     in &lt;servernames&gt; in config.xml).
 *   --&gt;
 *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
 *   &lt;!--
 *     Set this to the maximum message size (in bytes) that a message may have
 *     to be considered spam (default is 100000).
 *   --&gt;
 *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
 *   &lt;!--
 *     Set this to false if you do not want to tag the message if spam is detected (Default is true).
 *   --&gt;
 *   &lt;tagSubject&gt;true&lt;/tagSubject&gt;
 * &lt;/mailet&gt;
 * </code>
 * </pre>
 *
 * <p>
 * The probability of being spam is pre-pended to the subject if it is &gt; 0.1
 * (10%).
 * </p>
 *
 * <p>
 * The required tables are automatically created if not already there (see
 * sqlResources.xml). The token field in both the ham and spam tables is <b>case
 * sensitive</b>.
 * </p>
 *
 * @see BayesianAnalysisFeeder
 * @see BayesianAnalyzer
 * @see JDBCBayesianAnalyzer
 * @since 2.3.0
 */
public class BayesianAnalysis extends GenericMailet {

    /** Name of the mail attribute that receives the probability as a {@link Double}. */
    private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";

    /** Header name used when no <code>headerName</code> init parameter is configured. */
    private static final String HEADER_NAME = "X-MessageIsSpamProbability";

    /** Pause, in milliseconds, between two checks of the corpus loader thread (10 minutes). */
    private static final long CORPUS_RELOAD_INTERVAL = 600000;

    /**
     * The JDBCUtil helper class; its log output is routed to this mailet's log.
     */
    private final JDBCUtil theJDBCUtil = new JDBCUtil() {
        protected void delegatedLog(String logString) {
            log("BayesianAnalysis: " + logString);
        }
    };

    /**
     * The JDBCBayesianAnalyzer class that does all the work; its log output is
     * routed to this mailet's log.
     */
    private final JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
        protected void delegatedLog(String logString) {
            log("BayesianAnalysis: " + logString);
        }
    };

    /** Source of the JDBC connections, injected through {@link #setDataSource(DataSource)}. */
    private DataSource datasource;

    /** Value of the mandatory <code>repositoryPath</code> init parameter. */
    private String repositoryPath;

    /** Name of the message header that receives the probability. */
    private String headerName;

    /** Whether messages whose sender domain is local are skipped. */
    private boolean ignoreLocalSender = false;

    /** Whether the subject is tagged when the probability exceeds 0.1. */
    private boolean tagSubject = true;

    /**
     * Holds value of property maxSize.
     */
    private int maxSize = 100000;

    /**
     * Holds value of property lastCorpusLoadTime.
     */
    private long lastCorpusLoadTime;

    /** Provides the SQL resource file, injected through {@link #setFileSystem(SystemContext)}. */
    private SystemContext fs;

    /**
     * Return a string describing this mailet.
     *
     * @return a string describing this mailet
     */
    @Override
    public String getMailetInfo() {
        return "BayesianAnalysis Mailet";
    }

    /**
     * Getter for property maxSize.
     *
     * @return Value of property maxSize.
     */
    public int getMaxSize() {
        return this.maxSize;
    }

    /**
     * Setter for property maxSize.
     *
     * @param maxSize
     *            New value of property maxSize.
     */
    public void setMaxSize(int maxSize) {
        this.maxSize = maxSize;
    }

    /**
     * Getter for property lastCorpusLoadTime.
     *
     * @return Value of property lastCorpusLoadTime.
     */
    public long getLastCorpusLoadTime() {
        return this.lastCorpusLoadTime;
    }

    /**
     * Injects the data source used to obtain JDBC connections.
     *
     * @param datasource
     *            the data source to use
     */
    @Resource(name = "datasource")
    public void setDataSource(DataSource datasource) {
        this.datasource = datasource;
    }

    /**
     * Injects the context used to read <code>sqlResources.xml</code>.
     *
     * @param fs
     *            the system context to use
     */
    @Resource(name = "filesystem")
    public void setFileSystem(SystemContext fs) {
        this.fs = fs;
    }

    /**
     * Sets lastCorpusLoadTime to System.currentTimeMillis().
     */
    private void touchLastCorpusLoadTime() {
        this.lastCorpusLoadTime = System.currentTimeMillis();
    }

    /**
     * Mailet initialization routine: reads the init parameters, initializes the
     * database access, loads the corpus and starts the daemon thread that
     * reloads it.
     *
     * @throws MessagingException
     *             if <code>repositoryPath</code> is missing, <code>maxSize</code>
     *             is not an integer, or the database cannot be initialized
     */
    @Override
    public void init() throws MessagingException {
        repositoryPath = getInitParameter("repositoryPath");
        if (repositoryPath == null) {
            throw new MessagingException("repositoryPath is null");
        }

        headerName = getInitParameter("headerName", HEADER_NAME);

        ignoreLocalSender = Boolean.parseBoolean(getInitParameter("ignoreLocalSender"));
        if (ignoreLocalSender) {
            log("Will ignore messages coming from local senders");
        } else {
            log("Will analyze messages coming from local senders");
        }

        String maxSizeParam = getInitParameter("maxSize");
        if (maxSizeParam != null) {
            try {
                setMaxSize(Integer.parseInt(maxSizeParam.trim()));
            } catch (NumberFormatException e) {
                // Report a configuration mistake through the declared exception type
                // instead of letting an unchecked exception escape.
                throw new MessagingException("Invalid maxSize: " + maxSizeParam, e);
            }
        }
        log("maxSize: " + getMaxSize());

        // Case-insensitive, like the parsing of ignoreLocalSender above.
        if ("false".equalsIgnoreCase(getInitParameter("tagSubject"))) {
            tagSubject = false;
        }

        initDb();

        CorpusLoader corpusLoader = new CorpusLoader(this);
        corpusLoader.setDaemon(true);
        corpusLoader.start();
    }

    /**
     * Initializes the analyzer's SQL queries from <code>sqlResources.xml</code>
     * and performs the first corpus load.
     *
     * @throws MessagingException
     *             if the queries cannot be initialized or the corpus cannot be
     *             loaded
     */
    private void initDb() throws MessagingException {
        try {
            // NOTE(review): this connection is not closed here; presumably
            // initSqlQueries releases it - confirm against JDBCBayesianAnalyzer.
            analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
        } catch (Exception e) {
            throw new MessagingException("Exception initializing queries", e);
        }

        try {
            loadData(datasource.getConnection());
        } catch (java.sql.SQLException se) {
            throw new MessagingException("SQLException loading data", se);
        }
    }

    /**
     * Scans the mail and determines the spam probability.
     *
     * <p>
     * Messages from local senders (when <code>ignoreLocalSender</code> is set)
     * and messages that already carry the probability header are left
     * untouched. Messages whose size is not below <code>maxSize</code> are not
     * analyzed and get a probability of 0.0.
     * </p>
     *
     * @param mail
     *            The Mail message to be scanned.
     * @throws MessagingException
     *             if a problem arises
     */
    @Override
    public void service(Mail mail) throws MessagingException {
        try {
            MimeMessage message = mail.getMessage();

            // ignore the message if the sender is local
            if (ignoreLocalSender && mail.getSender() != null
                    && getMailetContext().isLocalServer(mail.getSender().getDomain())) {
                return;
            }

            // ignore the message if already analyzed
            String[] headerArray = message.getHeader(headerName);
            if (headerArray != null && headerArray.length > 0) {
                return;
            }

            double probability = 0.0;
            if (message.getSize() < getMaxSize()) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                message.writeTo(baos);
                // NOTE(review): toString() decodes with the platform default charset;
                // kept as is because changing it could alter the extracted tokens.
                probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
            }

            mail.setAttribute(MAIL_ATTRIBUTE_NAME, Double.valueOf(probability));
            message.setHeader(headerName, Double.toString(probability));

            if (probability > 0.1) {
                DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
                probabilityForm.applyPattern("##0.##%");
                String probabilityString = probabilityForm.format(probability);

                String senderString = (mail.getSender() == null) ? "null" : mail.getSender().toString();

                @SuppressWarnings("unchecked")
                final Collection<MailAddress> recipients = mail.getRecipients();
                log(headerName + ": " + probabilityString + "; From: " + senderString + "; Recipient(s): " + getAddressesString(recipients));

                // Check if we should tag the subject
                if (tagSubject) {
                    appendToSubject(message, " [" + probabilityString + (probability > 0.9 ? " SPAM" : " spam") + "]");
                }
            }

            saveChanges(message);

        } catch (Exception e) {
            log("Exception: " + e.getMessage(), e);
            throw new MessagingException("Exception thrown", e);
        }
    }

    /**
     * Rebuilds the in-memory corpus from the database and records the load
     * time. The connection is always closed before returning.
     *
     * @param conn
     *            the connection to read the token tables from; closed by this
     *            method
     * @throws java.sql.SQLException
     *             if the tokens cannot be read
     */
    private void loadData(Connection conn) throws java.sql.SQLException {
        try {
            // this is synchronized to avoid concurrent update of the corpus
            synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
                analyzer.tokenCountsClear();
                analyzer.loadHamNSpam(conn);
                analyzer.buildCorpus();
                analyzer.tokenCountsClear();
            }

            log("BayesianAnalysis Corpus loaded");

            touchLastCorpusLoadTime();

        } finally {
            if (conn != null) {
                theJDBCUtil.closeJDBCConnection(conn);
            }
        }
    }

    /**
     * Formats the addresses as <code>[a, b, c]</code> for logging.
     *
     * @param addresses
     *            the addresses to format, may be <code>null</code>
     * @return the formatted list, or the string <code>"null"</code> if
     *         <code>addresses</code> is <code>null</code>
     */
    private String getAddressesString(Collection<MailAddress> addresses) {
        if (addresses == null) {
            return "null";
        }

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (Iterator<MailAddress> iter = addresses.iterator(); iter.hasNext();) {
            sb.append(iter.next());
            if (iter.hasNext()) {
                sb.append(", ");
            }
        }
        sb.append(']');
        return sb.toString();
    }

    /**
     * Tags the subject of the message. Despite the method name, the text is
     * placed <em>in front of</em> the existing subject, separated by a space;
     * if there is no subject the text becomes the subject.
     *
     * <p>
     * Tagging is best effort: a failure is logged and does not interrupt the
     * processing of the message.
     * </p>
     *
     * @param message
     *            the message whose subject is modified
     * @param toAppend
     *            the text to put in front of the subject
     */
    private void appendToSubject(MimeMessage message, String toAppend) {
        try {
            String subject = message.getSubject();
            String tagged = (subject == null) ? toAppend : toAppend + " " + subject;
            message.setSubject(tagged, "iso-8859-1");
        } catch (MessagingException ex) {
            // Do not fail the whole analysis because the subject could not be
            // tagged, but leave a trace instead of silently dropping the error.
            log("Unable to tag the subject: " + ex.getMessage(), ex);
        }
    }

    /**
     * Saves changes resetting the original message id.
     *
     * @param message
     *            the message to save
     * @throws MessagingException
     *             if the message cannot be saved
     */
    private void saveChanges(MimeMessage message) throws MessagingException {
        String messageId = message.getMessageID();
        message.saveChanges();
        if (messageId != null) {
            message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
        }
    }

    /**
     * Daemon thread that periodically reloads the corpus when the database was
     * updated after the last corpus load.
     */
    private static class CorpusLoader extends Thread {

        private final BayesianAnalysis analysis;

        private CorpusLoader(BayesianAnalysis analysis) {
            super("BayesianAnalysis Corpus Loader");
            this.analysis = analysis;
        }

        /**
         * Thread entry point: sleeps for the reload interval, reloads the
         * corpus if it is stale, and repeats until the thread is interrupted.
         */
        @Override
        public void run() {
            analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");

            try {
                do {
                    Thread.sleep(CORPUS_RELOAD_INTERVAL);
                    reloadIfStale();
                } while (!Thread.interrupted());
            } catch (InterruptedException ex) {
                // Restore the interrupt status before the thread terminates.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Reloads the corpus if the database was updated after the last load.
         * Failures are logged so that the loader keeps running and retries on
         * its next wake-up.
         */
        private void reloadIfStale() {
            if (analysis.getLastCorpusLoadTime() >= JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
                return;
            }

            analysis.log("Reloading Corpus ...");
            try {
                analysis.loadData(analysis.datasource.getConnection());
                analysis.log("Corpus reloaded");
            } catch (java.sql.SQLException se) {
                analysis.log("SQLException: ", se);
            } catch (RuntimeException re) {
                // An unexpected failure must not kill the loader thread, otherwise
                // the corpus would never be refreshed again.
                analysis.log("Exception reloading Corpus: ", re);
            }
        }

    }

}