001 /**************************************************************** 002 * Licensed to the Apache Software Foundation (ASF) under one * 003 * or more contributor license agreements. See the NOTICE file * 004 * distributed with this work for additional information * 005 * regarding copyright ownership. The ASF licenses this file * 006 * to you under the Apache License, Version 2.0 (the * 007 * "License"); you may not use this file except in compliance * 008 * with the License. You may obtain a copy of the License at * 009 * * 010 * http://www.apache.org/licenses/LICENSE-2.0 * 011 * * 012 * Unless required by applicable law or agreed to in writing, * 013 * software distributed under the License is distributed on an * 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * 015 * KIND, either express or implied. See the License for the * 016 * specific language governing permissions and limitations * 017 * under the License. * 018 ****************************************************************/ 019 020 package org.apache.james.ai.classic; 021 022 import java.io.BufferedReader; 023 import java.io.ByteArrayOutputStream; 024 import java.io.StringReader; 025 import java.sql.Connection; 026 import java.util.Enumeration; 027 028 import javax.annotation.Resource; 029 import javax.mail.Header; 030 import javax.mail.MessagingException; 031 import javax.mail.internet.MimeMessage; 032 import javax.sql.DataSource; 033 034 import org.apache.mailet.Mail; 035 import org.apache.mailet.base.GenericMailet; 036 037 /** 038 * <p> 039 * Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet. 040 * </p> 041 * 042 * <p> 043 * The new token frequencies will be stored in a JDBC database. 
 * </p>
 *
 * <p>
 * Sample configuration:
 * </p>
 *
 * <pre>
 * <code>
 * <processor name="root">
 *
 * <mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder">
 * <repositoryPath> db://maildb </repositoryPath>
 * <feedType>ham</feedType>
 * <!--
 * Set this to the maximum message size (in bytes) that a message may have
 * to be analyzed (default is 100000).
 * -->
 * <maxSize>100000</maxSize>
 * </mailet>
 *
 * <mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder">
 * <repositoryPath> db://maildb </repositoryPath>
 * <feedType>spam</feedType>
 * <!--
 * Set this to the maximum message size (in bytes) that a message may have
 * to be analyzed (default is 100000).
 * -->
 * <maxSize>100000</maxSize>
 * </mailet>
 *
 * </processor>
 * </code>
 * </pre>
 *
 * <p>
 * The previous example will allow the user to send messages to the server and
 * use the recipient email address as the indicator for whether the message is
 * ham or spam.
 * </p>
 *
 * <p>
 * Using the example above, send good messages (ham not spam) to the email
 * address "not.spam@thisdomain.com" to pump good messages into the feeder, and
 * send spam messages (spam not ham) to the email address "spam@thisdomain.com"
 * to pump spam messages into the feeder.
 * </p>
 *
 * <p>
 * The bayesian database tables will be updated during the training reflecting
 * the new data
 * </p>
 *
 * <p>
 * At the end the mail will be destroyed (ghosted).
098 * </p> 099 * 100 * <p> 101 * <b>The correct approach is to send the original ham/spam message as an 102 * attachment to another message sent to the feeder; all the headers of the 103 * enveloping message will be removed and only the original message's tokens 104 * will be analyzed.</b> 105 * </p> 106 * 107 * <p> 108 * After a training session, the frequency <i>Corpus</i> used by 109 * <code>BayesianAnalysis</code> must be rebuilt from the database, in order to 110 * take advantage of the new token frequencies. Every 10 minutes a special 111 * thread in the <code>BayesianAnalysis</code> mailet will check if any change 112 * was made to the database, and rebuild the corpus if necessary. 113 * </p> 114 * 115 * <p> 116 * Only one message at a time is scanned (the database update activity is 117 * <i>synchronized</i>) in order to avoid too much database locking, as 118 * thousands of rows may be updated just for one message fed. 119 * </p> 120 * 121 * @see BayesianAnalysis 122 * @see BayesianAnalyzer 123 * @see JDBCBayesianAnalyzer 124 * @since 2.3.0 125 */ 126 127 public class BayesianAnalysisFeeder extends GenericMailet { 128 /** 129 * The JDBCUtil helper class 130 */ 131 private final JDBCUtil theJDBCUtil = new JDBCUtil() { 132 protected void delegatedLog(String logString) { 133 log("BayesianAnalysisFeeder: " + logString); 134 } 135 }; 136 137 /** 138 * The JDBCBayesianAnalyzer class that does all the work. 139 */ 140 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() { 141 protected void delegatedLog(String logString) { 142 log("BayesianAnalysisFeeder: " + logString); 143 } 144 }; 145 146 private DataSource datasource; 147 private String repositoryPath; 148 149 private String feedType; 150 151 /** 152 * Return a string describing this mailet. 153 * 154 * @return a string describing this mailet 155 */ 156 public String getMailetInfo() { 157 return "BayesianAnalysisFeeder Mailet"; 158 } 159 160 /** 161 * Holds value of property maxSize. 
162 */ 163 private int maxSize = 100000; 164 165 private SystemContext fs; 166 167 /** 168 * Getter for property maxSize. 169 * 170 * @return Value of property maxSize. 171 */ 172 public int getMaxSize() { 173 174 return this.maxSize; 175 } 176 177 @Resource(name = "datasource") 178 public void setDataSource(DataSource datasource) { 179 this.datasource = datasource; 180 } 181 182 /** 183 * Setter for property maxSize. 184 * 185 * @param maxSize 186 * New value of property maxSize. 187 */ 188 public void setMaxSize(int maxSize) { 189 190 this.maxSize = maxSize; 191 } 192 193 @Resource(name = "filesystem") 194 public void setFileSystem(SystemContext fs) { 195 this.fs = fs; 196 } 197 198 /** 199 * Mailet initialization routine. 200 * 201 * @throws MessagingException 202 * if a problem arises 203 */ 204 public void init() throws MessagingException { 205 repositoryPath = getInitParameter("repositoryPath"); 206 207 if (repositoryPath == null) { 208 throw new MessagingException("repositoryPath is null"); 209 } 210 211 feedType = getInitParameter("feedType"); 212 if (feedType == null) { 213 throw new MessagingException("feedType is null"); 214 } 215 216 String maxSizeParam = getInitParameter("maxSize"); 217 if (maxSizeParam != null) { 218 setMaxSize(Integer.parseInt(maxSizeParam)); 219 } 220 log("maxSize: " + getMaxSize()); 221 222 initDb(); 223 224 } 225 226 private void initDb() throws MessagingException { 227 228 try { 229 analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml")); 230 } catch (Exception e) { 231 throw new MessagingException("Exception initializing queries", e); 232 } 233 234 } 235 236 /** 237 * Scans the mail and updates the token frequencies in the database. 238 * 239 * The method is synchronized in order to avoid too much database locking, 240 * as thousands of rows may be updated just for one message fed. 241 * 242 * @param mail 243 * The Mail message to be scanned. 
244 */ 245 public void service(Mail mail) { 246 boolean dbUpdated = false; 247 248 mail.setState(Mail.GHOST); 249 250 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 251 252 Connection conn = null; 253 254 try { 255 256 MimeMessage message = mail.getMessage(); 257 258 String messageId = message.getMessageID(); 259 260 if (message.getSize() > getMaxSize()) { 261 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize()); 262 return; 263 } 264 265 clearAllHeaders(message); 266 267 message.writeTo(baos); 268 269 BufferedReader br = new BufferedReader(new StringReader(baos.toString())); 270 271 // this is synchronized to avoid concurrent update of the corpus 272 synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) { 273 274 conn = datasource.getConnection(); 275 276 if (conn.getAutoCommit()) { 277 conn.setAutoCommit(false); 278 } 279 280 dbUpdated = true; 281 282 // Clear out any existing word/counts etc.. 283 analyzer.clear(); 284 285 if ("ham".equalsIgnoreCase(feedType)) { 286 log(messageId + " Feeding HAM"); 287 // Process the stream as ham (not spam). 288 analyzer.addHam(br); 289 290 // Update storage statistics. 291 analyzer.updateHamTokens(conn); 292 } else { 293 log(messageId + " Feeding SPAM"); 294 // Process the stream as spam. 295 analyzer.addSpam(br); 296 297 // Update storage statistics. 298 analyzer.updateSpamTokens(conn); 299 } 300 301 // Commit our changes if necessary. 
302 if (conn != null && dbUpdated && !conn.getAutoCommit()) { 303 conn.commit(); 304 dbUpdated = false; 305 log(messageId + " Training ended successfully"); 306 JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime(); 307 } 308 309 } 310 311 } catch (java.sql.SQLException se) { 312 log("SQLException: " + se.getMessage()); 313 } catch (java.io.IOException ioe) { 314 log("IOException: " + ioe.getMessage()); 315 } catch (javax.mail.MessagingException me) { 316 log("MessagingException: " + me.getMessage()); 317 } finally { 318 // Rollback our changes if necessary. 319 try { 320 if (conn != null && dbUpdated && !conn.getAutoCommit()) { 321 conn.rollback(); 322 dbUpdated = false; 323 } 324 } catch (Exception e) { 325 } 326 theJDBCUtil.closeJDBCConnection(conn); 327 } 328 } 329 330 private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException { 331 @SuppressWarnings("rawtypes") 332 Enumeration headers = message.getAllHeaders(); 333 334 while (headers.hasMoreElements()) { 335 Header header = (Header) headers.nextElement(); 336 try { 337 message.removeHeader(header.getName()); 338 } catch (javax.mail.MessagingException me) { 339 } 340 } 341 message.saveChanges(); 342 } 343 344 }