1 /****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.ai.classic;
21
22 import java.io.BufferedReader;
23 import java.io.ByteArrayOutputStream;
24 import java.io.StringReader;
25 import java.sql.Connection;
26 import java.util.Enumeration;
27
28 import javax.annotation.Resource;
29 import javax.mail.Header;
30 import javax.mail.MessagingException;
31 import javax.mail.internet.MimeMessage;
32 import javax.sql.DataSource;
33
34 import org.apache.mailet.Mail;
35 import org.apache.mailet.base.GenericMailet;
36
37 /**
38 * <p>
39 * Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.
40 * </p>
41 *
42 * <p>
43 * The new token frequencies will be stored in a JDBC database.
44 * </p>
45 *
46 * <p>
47 * Sample configuration:
48 * </p>
49 *
50 * <pre>
51 * <code>
52 * <processor name="root">
53 *
54 * <mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder">
55 * <repositoryPath> db://maildb </repositoryPath>
56 * <feedType>ham</feedType>
57 * <!--
58 * Set this to the maximum message size (in bytes) that a message may have
59 * to be analyzed (default is 100000).
60 * -->
61 * <maxSize>100000</maxSize>
62 * </mailet>
63 *
64 * <mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder">
65 * <repositoryPath> db://maildb </repositoryPath>
66 * <feedType>spam</feedType>
67 * <!--
68 * Set this to the maximum message size (in bytes) that a message may have
69 * to be analyzed (default is 100000).
70 * -->
71 * <maxSize>100000</maxSize>
72 * </mailet>
73 *
 * </processor>
75 * </code>
76 * </pre>
77 *
78 * <p>
79 * The previous example will allow the user to send messages to the server and
80 * use the recipient email address as the indicator for whether the message is
81 * ham or spam.
82 * </p>
83 *
84 * <p>
85 * Using the example above, send good messages (ham not spam) to the email
86 * address "not.spam@thisdomain.com" to pump good messages into the feeder, and
87 * send spam messages (spam not ham) to the email address "spam@thisdomain.com"
88 * to pump spam messages into the feeder.
89 * </p>
90 *
91 * <p>
92 * The bayesian database tables will be updated during the training reflecting
93 * the new data
94 * </p>
95 *
96 * <p>
97 * At the end the mail will be destroyed (ghosted).
98 * </p>
99 *
100 * <p>
101 * <b>The correct approach is to send the original ham/spam message as an
102 * attachment to another message sent to the feeder; all the headers of the
103 * enveloping message will be removed and only the original message's tokens
104 * will be analyzed.</b>
105 * </p>
106 *
107 * <p>
108 * After a training session, the frequency <i>Corpus</i> used by
109 * <code>BayesianAnalysis</code> must be rebuilt from the database, in order to
110 * take advantage of the new token frequencies. Every 10 minutes a special
111 * thread in the <code>BayesianAnalysis</code> mailet will check if any change
112 * was made to the database, and rebuild the corpus if necessary.
113 * </p>
114 *
115 * <p>
116 * Only one message at a time is scanned (the database update activity is
117 * <i>synchronized</i>) in order to avoid too much database locking, as
118 * thousands of rows may be updated just for one message fed.
119 * </p>
120 *
121 * @see BayesianAnalysis
122 * @see BayesianAnalyzer
123 * @see JDBCBayesianAnalyzer
124 * @since 2.3.0
125 */
126
127 public class BayesianAnalysisFeeder extends GenericMailet {
128 /**
129 * The JDBCUtil helper class
130 */
131 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
132 protected void delegatedLog(String logString) {
133 log("BayesianAnalysisFeeder: " + logString);
134 }
135 };
136
137 /**
138 * The JDBCBayesianAnalyzer class that does all the work.
139 */
140 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
141 protected void delegatedLog(String logString) {
142 log("BayesianAnalysisFeeder: " + logString);
143 }
144 };
145
146 private DataSource datasource;
147 private String repositoryPath;
148
149 private String feedType;
150
151 /**
152 * Return a string describing this mailet.
153 *
154 * @return a string describing this mailet
155 */
156 public String getMailetInfo() {
157 return "BayesianAnalysisFeeder Mailet";
158 }
159
160 /**
161 * Holds value of property maxSize.
162 */
163 private int maxSize = 100000;
164
165 private SystemContext fs;
166
167 /**
168 * Getter for property maxSize.
169 *
170 * @return Value of property maxSize.
171 */
172 public int getMaxSize() {
173
174 return this.maxSize;
175 }
176
177 @Resource(name = "datasource")
178 public void setDataSource(DataSource datasource) {
179 this.datasource = datasource;
180 }
181
182 /**
183 * Setter for property maxSize.
184 *
185 * @param maxSize
186 * New value of property maxSize.
187 */
188 public void setMaxSize(int maxSize) {
189
190 this.maxSize = maxSize;
191 }
192
193 @Resource(name = "filesystem")
194 public void setFileSystem(SystemContext fs) {
195 this.fs = fs;
196 }
197
198 /**
199 * Mailet initialization routine.
200 *
201 * @throws MessagingException
202 * if a problem arises
203 */
204 public void init() throws MessagingException {
205 repositoryPath = getInitParameter("repositoryPath");
206
207 if (repositoryPath == null) {
208 throw new MessagingException("repositoryPath is null");
209 }
210
211 feedType = getInitParameter("feedType");
212 if (feedType == null) {
213 throw new MessagingException("feedType is null");
214 }
215
216 String maxSizeParam = getInitParameter("maxSize");
217 if (maxSizeParam != null) {
218 setMaxSize(Integer.parseInt(maxSizeParam));
219 }
220 log("maxSize: " + getMaxSize());
221
222 initDb();
223
224 }
225
226 private void initDb() throws MessagingException {
227
228 try {
229 analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
230 } catch (Exception e) {
231 throw new MessagingException("Exception initializing queries", e);
232 }
233
234 }
235
236 /**
237 * Scans the mail and updates the token frequencies in the database.
238 *
239 * The method is synchronized in order to avoid too much database locking,
240 * as thousands of rows may be updated just for one message fed.
241 *
242 * @param mail
243 * The Mail message to be scanned.
244 */
245 public void service(Mail mail) {
246 boolean dbUpdated = false;
247
248 mail.setState(Mail.GHOST);
249
250 ByteArrayOutputStream baos = new ByteArrayOutputStream();
251
252 Connection conn = null;
253
254 try {
255
256 MimeMessage message = mail.getMessage();
257
258 String messageId = message.getMessageID();
259
260 if (message.getSize() > getMaxSize()) {
261 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
262 return;
263 }
264
265 clearAllHeaders(message);
266
267 message.writeTo(baos);
268
269 BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
270
271 // this is synchronized to avoid concurrent update of the corpus
272 synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
273
274 conn = datasource.getConnection();
275
276 if (conn.getAutoCommit()) {
277 conn.setAutoCommit(false);
278 }
279
280 dbUpdated = true;
281
282 // Clear out any existing word/counts etc..
283 analyzer.clear();
284
285 if ("ham".equalsIgnoreCase(feedType)) {
286 log(messageId + " Feeding HAM");
287 // Process the stream as ham (not spam).
288 analyzer.addHam(br);
289
290 // Update storage statistics.
291 analyzer.updateHamTokens(conn);
292 } else {
293 log(messageId + " Feeding SPAM");
294 // Process the stream as spam.
295 analyzer.addSpam(br);
296
297 // Update storage statistics.
298 analyzer.updateSpamTokens(conn);
299 }
300
301 // Commit our changes if necessary.
302 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
303 conn.commit();
304 dbUpdated = false;
305 log(messageId + " Training ended successfully");
306 JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
307 }
308
309 }
310
311 } catch (java.sql.SQLException se) {
312 log("SQLException: " + se.getMessage());
313 } catch (java.io.IOException ioe) {
314 log("IOException: " + ioe.getMessage());
315 } catch (javax.mail.MessagingException me) {
316 log("MessagingException: " + me.getMessage());
317 } finally {
318 // Rollback our changes if necessary.
319 try {
320 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
321 conn.rollback();
322 dbUpdated = false;
323 }
324 } catch (Exception e) {
325 }
326 theJDBCUtil.closeJDBCConnection(conn);
327 }
328 }
329
330 private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
331 @SuppressWarnings("rawtypes")
332 Enumeration headers = message.getAllHeaders();
333
334 while (headers.hasMoreElements()) {
335 Header header = (Header) headers.nextElement();
336 try {
337 message.removeHeader(header.getName());
338 } catch (javax.mail.MessagingException me) {
339 }
340 }
341 message.saveChanges();
342 }
343
344 }