1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.transport.mailets;
21
22 import java.io.BufferedReader;
23 import java.io.StringReader;
24 import java.io.ByteArrayOutputStream;
25
26 import java.sql.Connection;
27 import java.util.Enumeration;
28
29 import javax.mail.internet.MimeMessage;
30 import javax.mail.Header;
31 import javax.mail.MessagingException;
32
33 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
34 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
35 import org.apache.avalon.framework.service.ServiceManager;
36 import org.apache.james.Constants;
37 import org.apache.mailet.GenericMailet;
38 import org.apache.mailet.Mail;
39 import org.apache.james.util.JDBCUtil;
40
41 import org.apache.james.util.JDBCBayesianAnalyzer;
42
43 /***
44 * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
45 *
46 * <P>The new token frequencies will be stored in a JDBC database.</P>
47 *
48 * <P>Sample configuration:</P>
49 * <PRE><CODE>
50 * &lt;processor name="root"&gt;
51 *
52 * &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
53 * &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
54 * &lt;feedType&gt;ham&lt;/feedType&gt;
55 * &lt;!--
56 * Set this to the maximum message size (in bytes) that a message may have
57 * to be analyzed (default is 100000).
58 * --&gt;
59 * &lt;maxSize&gt;100000&lt;/maxSize&gt;
60 * &lt;/mailet&gt;
61 *
62 * &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
63 * &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
64 * &lt;feedType&gt;spam&lt;/feedType&gt;
65 * &lt;!--
66 * Set this to the maximum message size (in bytes) that a message may have
67 * to be analyzed (default is 100000).
68 * --&gt;
69 * &lt;maxSize&gt;100000&lt;/maxSize&gt;
70 * &lt;/mailet&gt;
71 *
72 * &lt;/processor&gt;
73 * </CODE></PRE>
74 *
75 * <P>The previous example will allow the user to send messages to the server
76 * and use the recipient email address as the indicator for whether the message
77 * is ham or spam.</P>
78 *
79 * <P>Using the example above, send good messages (ham not spam) to the email
80 * address "not.spam@thisdomain.com" to pump good messages into the feeder,
81 * and send spam messages (spam not ham) to the email
82 * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
83 *
84 * <p>The Bayesian database tables are updated during training to reflect
85 * the new data.</p>
86 *
87 * <P>At the end the mail will be destroyed (ghosted).</P>
88 *
89 * <P><B>The correct approach is to send the original ham/spam message as an attachment
90 * to another message sent to the feeder; all the headers of the enveloping message
91 * will be removed and only the original message's tokens will be analyzed.</B></P>
92 *
93 * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
94 * must be rebuilt from the database, in order to take advantage of the new token frequencies.
95 * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
96 * change was made to the database, and rebuild the corpus if necessary.</p>
97 *
98 * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
99 * in order to avoid too much database locking,
100 * as thousands of rows may be updated just for one message fed.</p>
101 * @see BayesianAnalysis
102 * @see org.apache.james.util.BayesianAnalyzer
103 * @see org.apache.james.util.JDBCBayesianAnalyzer
104 * @version CVS $Revision: $ $Date: $
105 * @since 2.3.0
106 */
107
108 public class BayesianAnalysisFeeder
109 extends GenericMailet {
110 /***
111 * The JDBCUtil helper class
112 */
113 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
114 protected void delegatedLog(String logString) {
115 log("BayesianAnalysisFeeder: " + logString);
116 }
117 };
118
119 /***
120 * The JDBCBayesianAnalyzer class that does all the work.
121 */
122 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
123 protected void delegatedLog(String logString) {
124 log("BayesianAnalysisFeeder: " + logString);
125 }
126 };
127
128 private DataSourceComponent datasource;
129 private String repositoryPath;
130
131 private String feedType;
132
133 /***
134 * Return a string describing this mailet.
135 *
136 * @return a string describing this mailet
137 */
138 public String getMailetInfo() {
139 return "BayesianAnalysisFeeder Mailet";
140 }
141
142 /***
143 * Holds value of property maxSize.
144 */
145 private int maxSize = 100000;
146
147 /***
148 * Getter for property maxSize.
149 * @return Value of property maxSize.
150 */
151 public int getMaxSize() {
152
153 return this.maxSize;
154 }
155
156 /***
157 * Setter for property maxSize.
158 * @param maxSize New value of property maxSize.
159 */
160 public void setMaxSize(int maxSize) {
161
162 this.maxSize = maxSize;
163 }
164
165 /***
166 * Mailet initialization routine.
167 * @throws MessagingException if a problem arises
168 */
169 public void init() throws MessagingException {
170 repositoryPath = getInitParameter("repositoryPath");
171
172 if (repositoryPath == null) {
173 throw new MessagingException("repositoryPath is null");
174 }
175
176 feedType = getInitParameter("feedType");
177 if (feedType == null) {
178 throw new MessagingException("feedType is null");
179 }
180
181 String maxSizeParam = getInitParameter("maxSize");
182 if (maxSizeParam != null) {
183 setMaxSize(Integer.parseInt(maxSizeParam));
184 }
185 log("maxSize: " + getMaxSize());
186
187 initDb();
188
189 }
190
191 private void initDb() throws MessagingException {
192
193 try {
194 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
195
196
197 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
198
199
200 int stindex = repositoryPath.indexOf("://") + 3;
201
202 String datasourceName = repositoryPath.substring(stindex);
203
204 datasource = (DataSourceComponent) datasources.select(datasourceName);
205 } catch (Exception e) {
206 throw new MessagingException("Can't get datasource", e);
207 }
208
209 try {
210 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext());
211 } catch (Exception e) {
212 throw new MessagingException("Exception initializing queries", e);
213 }
214
215 }
216
217 /***
218 * Scans the mail and updates the token frequencies in the database.
219 *
220 * The method is synchronized in order to avoid too much database locking,
221 * as thousands of rows may be updated just for one message fed.
222 *
223 * @param mail The Mail message to be scanned.
224 */
225 public void service(Mail mail) {
226 boolean dbUpdated = false;
227
228 mail.setState(Mail.GHOST);
229
230 ByteArrayOutputStream baos = new ByteArrayOutputStream();
231
232 Connection conn = null;
233
234 try {
235
236 MimeMessage message = mail.getMessage();
237
238 String messageId = message.getMessageID();
239
240 if (message.getSize() > getMaxSize()) {
241 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
242 return;
243 }
244
245 clearAllHeaders(message);
246
247 message.writeTo(baos);
248
249 BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
250
251
252 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
253
254 conn = datasource.getConnection();
255
256 if (conn.getAutoCommit()) {
257 conn.setAutoCommit(false);
258 }
259
260 dbUpdated = true;
261
262
263 analyzer.clear();
264
265 if ("ham".equalsIgnoreCase(feedType)) {
266 log(messageId + " Feeding HAM");
267
268 analyzer.addHam(br);
269
270
271 analyzer.updateHamTokens(conn);
272 } else {
273 log(messageId + " Feeding SPAM");
274
275 analyzer.addSpam(br);
276
277
278 analyzer.updateSpamTokens(conn);
279 }
280
281
282 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
283 conn.commit();
284 dbUpdated = false;
285 log(messageId + " Training ended successfully");
286 JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
287 }
288
289 }
290
291 } catch (java.sql.SQLException se) {
292 log("SQLException: "
293 + se.getMessage());
294 } catch (java.io.IOException ioe) {
295 log("IOException: "
296 + ioe.getMessage());
297 } catch (javax.mail.MessagingException me) {
298 log("MessagingException: "
299 + me.getMessage());
300 } finally {
301
302 try {
303 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
304 conn.rollback();
305 dbUpdated = false;
306 }
307 } catch (Exception e) {}
308 theJDBCUtil.closeJDBCConnection(conn);
309 }
310 }
311
312 private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
313 Enumeration headers = message.getAllHeaders();
314
315 while (headers.hasMoreElements()) {
316 Header header = (Header) headers.nextElement();
317 try {
318 message.removeHeader(header.getName());
319 } catch (javax.mail.MessagingException me) {}
320 }
321 message.saveChanges();
322 }
323
324 }