1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.transport.mailets;
23
24 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
25 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
26 import org.apache.avalon.framework.service.ServiceManager;
27 import org.apache.james.Constants;
28 import org.apache.james.util.bayesian.JDBCBayesianAnalyzer;
29 import org.apache.james.util.sql.JDBCUtil;
30 import org.apache.mailet.base.GenericMailet;
31 import org.apache.mailet.Mail;
32 import org.apache.mailet.base.RFC2822Headers;
33
34 import javax.mail.MessagingException;
35 import javax.mail.internet.MimeMessage;
36
37
38 import java.io.BufferedReader;
39 import java.io.ByteArrayOutputStream;
40 import java.io.StringReader;
41 import java.sql.Connection;
42 import java.text.DecimalFormat;
43 import java.util.Collection;
44 import java.util.Iterator;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public class BayesianAnalysis
111 extends GenericMailet {
112
113
114
115 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116 protected void delegatedLog(String logString) {
117 log("BayesianAnalysis: " + logString);
118 }
119 };
120
121
122
123
124 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125 protected void delegatedLog(String logString) {
126 log("BayesianAnalysis: " + logString);
127 }
128 };
129
130 private DataSourceComponent datasource;
131 private String repositoryPath;
132
133 private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
134 private static final String HEADER_NAME = "X-MessageIsSpamProbability";
135 private static final long CORPUS_RELOAD_INTERVAL = 600000;
136 private String headerName;
137 private boolean ignoreLocalSender = false;
138 private boolean tagSubject = true;
139
140
141
142
143
144
145 public String getMailetInfo() {
146 return "BayesianAnalysis Mailet";
147 }
148
149
150
151
152 private int maxSize = 100000;
153
154
155
156
157 private long lastCorpusLoadTime;
158
159
160
161
162
163 public int getMaxSize() {
164
165 return this.maxSize;
166 }
167
168
169
170
171
172 public void setMaxSize(int maxSize) {
173
174 this.maxSize = maxSize;
175 }
176
177
178
179
180
181 public long getLastCorpusLoadTime() {
182
183 return this.lastCorpusLoadTime;
184 }
185
186
187
188
189 private void touchLastCorpusLoadTime() {
190
191 this.lastCorpusLoadTime = System.currentTimeMillis();
192 }
193
194
195
196
197
198 public void init() throws MessagingException {
199 repositoryPath = getInitParameter("repositoryPath");
200
201 if (repositoryPath == null) {
202 throw new MessagingException("repositoryPath is null");
203 }
204
205 headerName = getInitParameter("headerName",HEADER_NAME);
206
207 ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
208
209 if (ignoreLocalSender) {
210 log("Will ignore messages coming from local senders");
211 } else {
212 log("Will analyze messages coming from local senders");
213 }
214
215 String maxSizeParam = getInitParameter("maxSize");
216 if (maxSizeParam != null) {
217 setMaxSize(Integer.parseInt(maxSizeParam));
218 }
219 log("maxSize: " + getMaxSize());
220
221 String tag = getInitParameter("tagSubject");
222 if (tag != null && tag.equals("false")) {
223 tagSubject = false;
224 }
225
226 initDb();
227
228 CorpusLoader corpusLoader = new CorpusLoader(this);
229 corpusLoader.setDaemon(true);
230 corpusLoader.start();
231
232 }
233
234 private void initDb() throws MessagingException {
235
236 try {
237 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
238
239
240 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
241
242
243 int stindex = repositoryPath.indexOf("://") + 3;
244
245 String datasourceName = repositoryPath.substring(stindex);
246
247 datasource = (DataSourceComponent) datasources.select(datasourceName);
248 } catch (Exception e) {
249 throw new MessagingException("Can't get datasource", e);
250 }
251
252 try {
253 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext().getAttribute("confDir") + "/sqlResources.xml");
254 } catch (Exception e) {
255 throw new MessagingException("Exception initializing queries", e);
256 }
257
258 try {
259 loadData(datasource.getConnection());
260 } catch (java.sql.SQLException se) {
261 throw new MessagingException("SQLException loading data", se);
262 }
263 }
264
265
266
267
268
269
270
271 public void service(Mail mail) throws MessagingException {
272
273 try {
274 MimeMessage message = mail.getMessage();
275
276 if (ignoreLocalSender) {
277
278 if (mail.getSender() != null
279 && getMailetContext().isLocalServer(mail.getSender().getHost())) {
280 return;
281 }
282 }
283
284 String [] headerArray = message.getHeader(headerName);
285
286 if (headerArray != null && headerArray.length > 0) {
287 return;
288 }
289
290 ByteArrayOutputStream baos = new ByteArrayOutputStream();
291
292 double probability;
293
294 if (message.getSize() < getMaxSize()) {
295 message.writeTo(baos);
296 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
297 } else {
298 probability = 0.0;
299 }
300
301 mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
302 message.setHeader(headerName, Double.toString(probability));
303
304 DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
305 probabilityForm.applyPattern("##0.##%");
306 String probabilityString = probabilityForm.format(probability);
307
308 String senderString;
309 if (mail.getSender() == null) {
310 senderString = "null";
311 } else {
312 senderString = mail.getSender().toString();
313 }
314 if (probability > 0.1) {
315 log(headerName
316 + ": "
317 + probabilityString
318 + "; From: "
319 + senderString
320 + "; Recipient(s): "
321 + getAddressesString(mail.getRecipients()));
322
323
324
325 if (tagSubject) {
326 appendToSubject(message,
327 " [" + probabilityString
328 + (probability > 0.9 ? " SPAM" : " spam") + "]");
329 }
330 }
331
332 saveChanges(message);
333
334 } catch (Exception e) {
335 log("Exception: "
336 + e.getMessage(), e);
337 throw new MessagingException("Exception thrown", e);
338 }
339 }
340
341 private void loadData(Connection conn)
342 throws java.sql.SQLException {
343
344 try {
345
346 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
347 analyzer.tokenCountsClear();
348 analyzer.loadHamNSpam(conn);
349 analyzer.buildCorpus();
350 analyzer.tokenCountsClear();
351 }
352
353 log("BayesianAnalysis Corpus loaded");
354
355 touchLastCorpusLoadTime();
356
357 } finally {
358 if (conn != null) {
359 theJDBCUtil.closeJDBCConnection(conn);
360 }
361 }
362
363 }
364
365 private String getAddressesString(Collection addresses) {
366 if (addresses == null) {
367 return "null";
368 }
369
370 Iterator iter = addresses.iterator();
371 StringBuffer sb = new StringBuffer();
372 sb.append('[');
373 for (int i = 0; iter.hasNext(); i++) {
374 sb.append(iter.next());
375 if (i + 1 < addresses.size()) {
376 sb.append(", ");
377 }
378 }
379 sb.append(']');
380 return sb.toString();
381 }
382
383 private void appendToSubject(MimeMessage message, String toAppend) {
384 try {
385 String subject = message.getSubject();
386
387 if (subject == null) {
388 message.setSubject(toAppend, "iso-8859-1");
389 } else {
390 message.setSubject(toAppend + " " + subject, "iso-8859-1");
391 }
392 } catch (MessagingException ex) {}
393 }
394
395
396
397
398 private void saveChanges(MimeMessage message) throws MessagingException {
399 String messageId = message.getMessageID();
400 message.saveChanges();
401 if (messageId != null) {
402 message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
403 }
404 }
405
406 private static class CorpusLoader extends Thread {
407
408 private BayesianAnalysis analysis;
409
410 private CorpusLoader(BayesianAnalysis analysis) {
411 super("BayesianAnalysis Corpus Loader");
412 this.analysis = analysis;
413 }
414
415
416
417 public void run() {
418 analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
419
420 try {
421 Thread.sleep(CORPUS_RELOAD_INTERVAL);
422
423 while (true) {
424 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
425 analysis.log("Reloading Corpus ...");
426 try {
427 analysis.loadData(analysis.datasource.getConnection());
428 analysis.log("Corpus reloaded");
429 } catch (java.sql.SQLException se) {
430 analysis.log("SQLException: ", se);
431 }
432
433 }
434
435 if (Thread.interrupted()) {
436 break;
437 }
438 Thread.sleep(CORPUS_RELOAD_INTERVAL);
439 }
440 }
441 catch (InterruptedException ex) {
442 interrupt();
443 }
444 }
445
446 }
447
448 }