1 /****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20
21
22 package org.apache.james.transport.mailets;
23
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.StringReader;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;

import javax.mail.Header;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;

import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
import org.apache.avalon.excalibur.datasource.DataSourceComponent;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.james.Constants;
import org.apache.mailet.base.GenericMailet;
import org.apache.mailet.Mail;

import org.apache.james.util.bayesian.JDBCBayesianAnalyzer;
import org.apache.james.util.sql.JDBCUtil;
44
45 /**
46 * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
47 *
48 * <P>The new token frequencies will be stored in a JDBC database.</P>
49 *
50 * <P>Sample configuration:</P>
51 * <PRE><CODE>
52 * <processor name="root">
53 *
54 * <mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder">
55 * <repositoryPath> db://maildb </repositoryPath>
56 * <feedType>ham</feedType>
57 * <!--
 *          Set this to the maximum size (in bytes) a message may have and still
 *          be analyzed; larger messages are ignored (default is 100000).
60 * -->
61 * <maxSize>100000</maxSize>
62 * </mailet>
63 *
64 * <mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder">
65 * <repositoryPath> db://maildb </repositoryPath>
66 * <feedType>spam</feedType>
67 * <!--
 *          Set this to the maximum size (in bytes) a message may have and still
 *          be analyzed; larger messages are ignored (default is 100000).
70 * -->
71 * <maxSize>100000</maxSize>
72 * </mailet>
73 *
 * </processor>
75 * </CODE></PRE>
76 *
77 * <P>The previous example will allow the user to send messages to the server
78 * and use the recipient email address as the indicator for whether the message
79 * is ham or spam.</P>
80 *
81 * <P>Using the example above, send good messages (ham not spam) to the email
82 * address "not.spam@thisdomain.com" to pump good messages into the feeder,
83 * and send spam messages (spam not ham) to the email
84 * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
85 *
86 * <p>The bayesian database tables will be updated during the training reflecting
87 * the new data</p>
88 *
89 * <P>At the end the mail will be destroyed (ghosted).</P>
90 *
91 * <P><B>The correct approach is to send the original ham/spam message as an attachment
92 * to another message sent to the feeder; all the headers of the enveloping message
93 * will be removed and only the original message's tokens will be analyzed.</B></P>
94 *
95 * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
96 * must be rebuilt from the database, in order to take advantage of the new token frequencies.
97 * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
98 * change was made to the database, and rebuild the corpus if necessary.</p>
99 *
100 * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
101 * in order to avoid too much database locking,
102 * as thousands of rows may be updated just for one message fed.</p>
103 * @see BayesianAnalysis
104 * @see org.apache.james.util.bayesian.BayesianAnalyzer
105 * @see org.apache.james.util.bayesian.JDBCBayesianAnalyzer
106 * @version CVS $Revision: 717869 $ $Date: 2008-11-15 15:56:18 +0000 (Sat, 15 Nov 2008) $
107 * @since 2.3.0
108 */
109
110 public class BayesianAnalysisFeeder
111 extends GenericMailet {
112 /**
113 * The JDBCUtil helper class
114 */
115 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116 protected void delegatedLog(String logString) {
117 log("BayesianAnalysisFeeder: " + logString);
118 }
119 };
120
121 /**
122 * The JDBCBayesianAnalyzer class that does all the work.
123 */
124 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125 protected void delegatedLog(String logString) {
126 log("BayesianAnalysisFeeder: " + logString);
127 }
128 };
129
130 private DataSourceComponent datasource;
131 private String repositoryPath;
132
133 private String feedType;
134
135 /**
136 * Return a string describing this mailet.
137 *
138 * @return a string describing this mailet
139 */
140 public String getMailetInfo() {
141 return "BayesianAnalysisFeeder Mailet";
142 }
143
144 /**
145 * Holds value of property maxSize.
146 */
147 private int maxSize = 100000;
148
149 /**
150 * Getter for property maxSize.
151 * @return Value of property maxSize.
152 */
153 public int getMaxSize() {
154
155 return this.maxSize;
156 }
157
158 /**
159 * Setter for property maxSize.
160 * @param maxSize New value of property maxSize.
161 */
162 public void setMaxSize(int maxSize) {
163
164 this.maxSize = maxSize;
165 }
166
167 /**
168 * Mailet initialization routine.
169 * @throws MessagingException if a problem arises
170 */
171 public void init() throws MessagingException {
172 repositoryPath = getInitParameter("repositoryPath");
173
174 if (repositoryPath == null) {
175 throw new MessagingException("repositoryPath is null");
176 }
177
178 feedType = getInitParameter("feedType");
179 if (feedType == null) {
180 throw new MessagingException("feedType is null");
181 }
182
183 String maxSizeParam = getInitParameter("maxSize");
184 if (maxSizeParam != null) {
185 setMaxSize(Integer.parseInt(maxSizeParam));
186 }
187 log("maxSize: " + getMaxSize());
188
189 initDb();
190
191 }
192
193 private void initDb() throws MessagingException {
194
195 try {
196 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
197
198 // Get the DataSourceSelector block
199 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
200
201 // Get the data-source required.
202 int stindex = repositoryPath.indexOf("://") + 3;
203
204 String datasourceName = repositoryPath.substring(stindex);
205
206 datasource = (DataSourceComponent) datasources.select(datasourceName);
207 } catch (Exception e) {
208 throw new MessagingException("Can't get datasource", e);
209 }
210
211 try {
212 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext().getAttribute("confDir") + "/sqlResources.xml");
213 } catch (Exception e) {
214 throw new MessagingException("Exception initializing queries", e);
215 }
216
217 }
218
219 /**
220 * Scans the mail and updates the token frequencies in the database.
221 *
222 * The method is synchronized in order to avoid too much database locking,
223 * as thousands of rows may be updated just for one message fed.
224 *
225 * @param mail The Mail message to be scanned.
226 */
227 public void service(Mail mail) {
228 boolean dbUpdated = false;
229
230 mail.setState(Mail.GHOST);
231
232 ByteArrayOutputStream baos = new ByteArrayOutputStream();
233
234 Connection conn = null;
235
236 try {
237
238 MimeMessage message = mail.getMessage();
239
240 String messageId = message.getMessageID();
241
242 if (message.getSize() > getMaxSize()) {
243 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
244 return;
245 }
246
247 clearAllHeaders(message);
248
249 message.writeTo(baos);
250
251 BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
252
253 // this is synchronized to avoid concurrent update of the corpus
254 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
255
256 conn = datasource.getConnection();
257
258 if (conn.getAutoCommit()) {
259 conn.setAutoCommit(false);
260 }
261
262 dbUpdated = true;
263
264 //Clear out any existing word/counts etc..
265 analyzer.clear();
266
267 if ("ham".equalsIgnoreCase(feedType)) {
268 log(messageId + " Feeding HAM");
269 //Process the stream as ham (not spam).
270 analyzer.addHam(br);
271
272 //Update storage statistics.
273 analyzer.updateHamTokens(conn);
274 } else {
275 log(messageId + " Feeding SPAM");
276 //Process the stream as spam.
277 analyzer.addSpam(br);
278
279 //Update storage statistics.
280 analyzer.updateSpamTokens(conn);
281 }
282
283 //Commit our changes if necessary.
284 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
285 conn.commit();
286 dbUpdated = false;
287 log(messageId + " Training ended successfully");
288 JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
289 }
290
291 }
292
293 } catch (java.sql.SQLException se) {
294 log("SQLException: "
295 + se.getMessage());
296 } catch (java.io.IOException ioe) {
297 log("IOException: "
298 + ioe.getMessage());
299 } catch (javax.mail.MessagingException me) {
300 log("MessagingException: "
301 + me.getMessage());
302 } finally {
303 //Rollback our changes if necessary.
304 try {
305 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
306 conn.rollback();
307 dbUpdated = false;
308 }
309 } catch (Exception e) {}
310 theJDBCUtil.closeJDBCConnection(conn);
311 }
312 }
313
314 private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
315 Enumeration headers = message.getAllHeaders();
316
317 while (headers.hasMoreElements()) {
318 Header header = (Header) headers.nextElement();
319 try {
320 message.removeHeader(header.getName());
321 } catch (javax.mail.MessagingException me) {}
322 }
323 message.saveChanges();
324 }
325
326 }