1 /************************************************************************
2 * Copyright (c) 2000-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.transport.mailets;
19
20 import java.io.BufferedReader;
21 import java.io.StringReader;
22 import java.io.ByteArrayOutputStream;
23
24 import java.sql.Connection;
25 import java.util.Enumeration;
26
27 import javax.mail.internet.MimeMessage;
28 import javax.mail.Header;
29 import javax.mail.MessagingException;
30
31 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
32 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
33 import org.apache.avalon.framework.service.ServiceManager;
34 import org.apache.james.Constants;
35 import org.apache.mailet.GenericMailet;
36 import org.apache.mailet.Mail;
37 import org.apache.james.util.JDBCUtil;
38
39 import org.apache.james.util.JDBCBayesianAnalyzer;
40
41 /***
42 * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
43 *
44 * <P>The new token frequencies will be stored in a JDBC database.</P>
45 *
46 * <P>Sample configuration:</P>
47 * <PRE><CODE>
48 * <processor name="root">
49 *
50 * <mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder">
51 * <repositoryPath> db://maildb </repositoryPath>
52 * <feedType>ham</feedType>
53 * <!--
54 * Set this to the maximum message size (in bytes) that a message may have
55 * to be analyzed (default is 100000).
56 * -->
57 * <maxSize>100000</maxSize>
58 * </mailet>
59 *
60 * <mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder">
61 * <repositoryPath> db://maildb </repositoryPath>
62 * <feedType>spam</feedType>
63 * <!--
64 * Set this to the maximum message size (in bytes) that a message may have
65 * to be analyzed (default is 100000).
66 * -->
67 * <maxSize>100000</maxSize>
68 * </mailet>
69 *
70 * <processor>
71 * </CODE></PRE>
72 *
73 * <P>The previous example will allow the user to send messages to the server
74 * and use the recipient email address as the indicator for whether the message
75 * is ham or spam.</P>
76 *
77 * <P>Using the example above, send good messages (ham not spam) to the email
78 * address "not.spam@thisdomain.com" to pump good messages into the feeder,
79 * and send spam messages (spam not ham) to the email
80 * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
81 *
82 * <p>The bayesian database tables will be updated during the training reflecting
83 * the new data</p>
84 *
85 * <P>At the end the mail will be destroyed (ghosted).</P>
86 *
87 * <P><B>The correct approach is to send the original ham/spam message as an attachment
88 * to another message sent to the feeder; all the headers of the enveloping message
89 * will be removed and only the original message's tokens will be analyzed.</B></P>
90 *
91 * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
92 * must be rebuilt from the database, in order to take advantage of the new token frequencies.
93 * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
94 * change was made to the database, and rebuild the corpus if necessary.</p>
95 *
96 * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
97 * in order to avoid too much database locking,
98 * as thousands of rows may be updated just for one message fed.</p>
99 * @see BayesianAnalysis
100 * @see org.apache.james.util.BayesianAnalyzer
101 * @see org.apache.james.util.JDBCBayesianAnalyzer
102 * @version CVS $Revision: $ $Date: $
103 * @since 2.3.0
104 */
105
106 public class BayesianAnalysisFeeder
107 extends GenericMailet {
108 /***
109 * The JDBCUtil helper class
110 */
111 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
112 protected void delegatedLog(String logString) {
113 log("BayesianAnalysisFeeder: " + logString);
114 }
115 };
116
117 /***
118 * The JDBCBayesianAnalyzer class that does all the work.
119 */
120 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
121 protected void delegatedLog(String logString) {
122 log("BayesianAnalysisFeeder: " + logString);
123 }
124 };
125
126 private DataSourceComponent datasource;
127 private String repositoryPath;
128
129 private String feedType;
130
131 /***
132 * Return a string describing this mailet.
133 *
134 * @return a string describing this mailet
135 */
136 public String getMailetInfo() {
137 return "BayesianAnalysisFeeder Mailet";
138 }
139
140 /***
141 * Holds value of property maxSize.
142 */
143 private int maxSize = 100000;
144
145 /***
146 * Getter for property maxSize.
147 * @return Value of property maxSize.
148 */
149 public int getMaxSize() {
150
151 return this.maxSize;
152 }
153
154 /***
155 * Setter for property maxSize.
156 * @param maxSize New value of property maxSize.
157 */
158 public void setMaxSize(int maxSize) {
159
160 this.maxSize = maxSize;
161 }
162
163 /***
164 * Mailet initialization routine.
165 * @throws MessagingException if a problem arises
166 */
167 public void init() throws MessagingException {
168 repositoryPath = getInitParameter("repositoryPath");
169
170 if (repositoryPath == null) {
171 throw new MessagingException("repositoryPath is null");
172 }
173
174 feedType = getInitParameter("feedType");
175 if (feedType == null) {
176 throw new MessagingException("feedType is null");
177 }
178
179 String maxSizeParam = getInitParameter("maxSize");
180 if (maxSizeParam != null) {
181 setMaxSize(Integer.parseInt(maxSizeParam));
182 }
183 log("maxSize: " + getMaxSize());
184
185 initDb();
186
187 }
188
189 private void initDb() throws MessagingException {
190
191 try {
192 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
193
194
195 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
196
197
198 int stindex = repositoryPath.indexOf("://") + 3;
199
200 String datasourceName = repositoryPath.substring(stindex);
201
202 datasource = (DataSourceComponent) datasources.select(datasourceName);
203 } catch (Exception e) {
204 throw new MessagingException("Can't get datasource", e);
205 }
206
207 try {
208 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
209 } catch (Exception e) {
210 throw new MessagingException("Exception initializing queries", e);
211 }
212
213 }
214
215 /***
216 * Scans the mail and updates the token frequencies in the database.
217 *
218 * The method is synchronized in order to avoid too much database locking,
219 * as thousands of rows may be updated just for one message fed.
220 *
221 * @param mail The Mail message to be scanned.
222 */
223 public void service(Mail mail) {
224 boolean dbUpdated = false;
225
226 mail.setState(Mail.GHOST);
227
228 ByteArrayOutputStream baos = new ByteArrayOutputStream();
229
230 Connection conn = null;
231
232 try {
233
234 MimeMessage message = mail.getMessage();
235
236 String messageId = message.getMessageID();
237
238 if (message.getSize() > getMaxSize()) {
239 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
240 return;
241 }
242
243 clearAllHeaders(message);
244
245 message.writeTo(baos);
246
247 BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
248
249
250 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
251
252 conn = datasource.getConnection();
253
254 if (conn.getAutoCommit()) {
255 conn.setAutoCommit(false);
256 }
257
258 dbUpdated = true;
259
260
261 analyzer.clear();
262
263 if ("ham".equalsIgnoreCase(feedType)) {
264 log(messageId + " Feeding HAM");
265
266 analyzer.addHam(br);
267
268
269 analyzer.updateHamTokens(conn);
270 } else {
271 log(messageId + " Feeding SPAM");
272
273 analyzer.addSpam(br);
274
275
276 analyzer.updateSpamTokens(conn);
277 }
278
279
280 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
281 conn.commit();
282 dbUpdated = false;
283 log(messageId + " Training ended successfully");
284 JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
285 }
286
287 }
288
289 } catch (java.sql.SQLException se) {
290 log("SQLException: "
291 + se.getMessage());
292 } catch (java.io.IOException ioe) {
293 log("IOException: "
294 + ioe.getMessage());
295 } catch (javax.mail.MessagingException me) {
296 log("MessagingException: "
297 + me.getMessage());
298 } finally {
299
300 try {
301 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
302 conn.rollback();
303 dbUpdated = false;
304 }
305 } catch (Exception e) {}
306 theJDBCUtil.closeJDBCConnection(conn);
307 }
308 }
309
310 private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
311 Enumeration headers = message.getAllHeaders();
312
313 while (headers.hasMoreElements()) {
314 Header header = (Header) headers.nextElement();
315 try {
316 message.removeHeader(header.getName());
317 } catch (javax.mail.MessagingException me) {}
318 }
319 message.saveChanges();
320 }
321
322 }