1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.ai.classic;
21
22 import java.io.BufferedReader;
23 import java.io.ByteArrayOutputStream;
24 import java.io.StringReader;
25 import java.sql.Connection;
26 import java.text.DecimalFormat;
27 import java.util.Collection;
28 import java.util.Iterator;
29
30 import javax.annotation.Resource;
31 import javax.mail.MessagingException;
32 import javax.mail.internet.MimeMessage;
33 import javax.sql.DataSource;
34
35 import org.apache.mailet.Mail;
36 import org.apache.mailet.MailAddress;
37 import org.apache.mailet.base.GenericMailet;
38 import org.apache.mailet.base.RFC2822Headers;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public class BayesianAnalysis extends GenericMailet {
124
125
126
127 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
128 protected void delegatedLog(String logString) {
129 log("BayesianAnalysis: " + logString);
130 }
131 };
132
133
134
135
136 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
137 protected void delegatedLog(String logString) {
138 log("BayesianAnalysis: " + logString);
139 }
140 };
141
142 private DataSource datasource;
143 private String repositoryPath;
144
145 private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
146 private static final String HEADER_NAME = "X-MessageIsSpamProbability";
147 private static final long CORPUS_RELOAD_INTERVAL = 600000;
148 private String headerName;
149 private boolean ignoreLocalSender = false;
150 private boolean tagSubject = true;
151
152
153
154
155
156
157 public String getMailetInfo() {
158 return "BayesianAnalysis Mailet";
159 }
160
161
162
163
164 private int maxSize = 100000;
165
166
167
168
169 private long lastCorpusLoadTime;
170
171 private SystemContext fs;
172
173
174
175
176
177
178 public int getMaxSize() {
179
180 return this.maxSize;
181 }
182
183
184
185
186
187
188
189 public void setMaxSize(int maxSize) {
190
191 this.maxSize = maxSize;
192 }
193
194
195
196
197
198
199 public long getLastCorpusLoadTime() {
200
201 return this.lastCorpusLoadTime;
202 }
203
204 @Resource(name = "datasource")
205 public void setDataSource(DataSource datasource) {
206 this.datasource = datasource;
207 }
208
209 @Resource(name = "filesystem")
210 public void setFileSystem(SystemContext fs) {
211 this.fs = fs;
212 }
213
214
215
216
217 private void touchLastCorpusLoadTime() {
218
219 this.lastCorpusLoadTime = System.currentTimeMillis();
220 }
221
222
223
224
225
226
227
228 public void init() throws MessagingException {
229 repositoryPath = getInitParameter("repositoryPath");
230
231 if (repositoryPath == null) {
232 throw new MessagingException("repositoryPath is null");
233 }
234
235 headerName = getInitParameter("headerName", HEADER_NAME);
236
237 ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
238
239 if (ignoreLocalSender) {
240 log("Will ignore messages coming from local senders");
241 } else {
242 log("Will analyze messages coming from local senders");
243 }
244
245 String maxSizeParam = getInitParameter("maxSize");
246 if (maxSizeParam != null) {
247 setMaxSize(Integer.parseInt(maxSizeParam));
248 }
249 log("maxSize: " + getMaxSize());
250
251 String tag = getInitParameter("tagSubject");
252 if (tag != null && tag.equals("false")) {
253 tagSubject = false;
254 }
255
256 initDb();
257
258 CorpusLoader corpusLoader = new CorpusLoader(this);
259 corpusLoader.setDaemon(true);
260 corpusLoader.start();
261
262 }
263
264 private void initDb() throws MessagingException {
265
266 try {
267 analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
268 } catch (Exception e) {
269 throw new MessagingException("Exception initializing queries", e);
270 }
271
272 try {
273 loadData(datasource.getConnection());
274 } catch (java.sql.SQLException se) {
275 throw new MessagingException("SQLException loading data", se);
276 }
277 }
278
279
280
281
282
283
284
285
286
287 public void service(Mail mail) throws MessagingException {
288
289 try {
290 MimeMessage message = mail.getMessage();
291
292 if (ignoreLocalSender) {
293
294 if (mail.getSender() != null && getMailetContext().isLocalServer(mail.getSender().getDomain())) {
295 return;
296 }
297 }
298
299 String[] headerArray = message.getHeader(headerName);
300
301 if (headerArray != null && headerArray.length > 0) {
302 return;
303 }
304
305 ByteArrayOutputStream baos = new ByteArrayOutputStream();
306
307 double probability;
308
309 if (message.getSize() < getMaxSize()) {
310 message.writeTo(baos);
311 probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
312 } else {
313 probability = 0.0;
314 }
315
316 mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
317 message.setHeader(headerName, Double.toString(probability));
318
319 DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
320 probabilityForm.applyPattern("##0.##%");
321 String probabilityString = probabilityForm.format(probability);
322
323 String senderString;
324 if (mail.getSender() == null) {
325 senderString = "null";
326 } else {
327 senderString = mail.getSender().toString();
328 }
329 if (probability > 0.1) {
330 @SuppressWarnings("unchecked")
331 final Collection<MailAddress> recipients = mail.getRecipients();
332 log(headerName + ": " + probabilityString + "; From: " + senderString + "; Recipient(s): " + getAddressesString(recipients));
333
334
335 if (tagSubject) {
336 appendToSubject(message, " [" + probabilityString + (probability > 0.9 ? " SPAM" : " spam") + "]");
337 }
338 }
339
340 saveChanges(message);
341
342 } catch (Exception e) {
343 log("Exception: " + e.getMessage(), e);
344 throw new MessagingException("Exception thrown", e);
345 }
346 }
347
348 private void loadData(Connection conn) throws java.sql.SQLException {
349
350 try {
351
352 synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
353 analyzer.tokenCountsClear();
354 analyzer.loadHamNSpam(conn);
355 analyzer.buildCorpus();
356 analyzer.tokenCountsClear();
357 }
358
359 log("BayesianAnalysis Corpus loaded");
360
361 touchLastCorpusLoadTime();
362
363 } finally {
364 if (conn != null) {
365 theJDBCUtil.closeJDBCConnection(conn);
366 }
367 }
368
369 }
370
371 private String getAddressesString(Collection<MailAddress> addresses) {
372 if (addresses == null) {
373 return "null";
374 }
375
376 Iterator<MailAddress> iter = addresses.iterator();
377 StringBuffer sb = new StringBuffer();
378 sb.append('[');
379 for (int i = 0; iter.hasNext(); i++) {
380 sb.append(iter.next());
381 if (i + 1 < addresses.size()) {
382 sb.append(", ");
383 }
384 }
385 sb.append(']');
386 return sb.toString();
387 }
388
389 private void appendToSubject(MimeMessage message, String toAppend) {
390 try {
391 String subject = message.getSubject();
392
393 if (subject == null) {
394 message.setSubject(toAppend, "iso-8859-1");
395 } else {
396 message.setSubject(toAppend + " " + subject, "iso-8859-1");
397 }
398 } catch (MessagingException ex) {
399 }
400 }
401
402
403
404
405 private void saveChanges(MimeMessage message) throws MessagingException {
406 String messageId = message.getMessageID();
407 message.saveChanges();
408 if (messageId != null) {
409 message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
410 }
411 }
412
413 private static class CorpusLoader extends Thread {
414
415 private BayesianAnalysis analysis;
416
417 private CorpusLoader(BayesianAnalysis analysis) {
418 super("BayesianAnalysis Corpus Loader");
419 this.analysis = analysis;
420 }
421
422
423
424
425 public void run() {
426 analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
427
428 try {
429 Thread.sleep(CORPUS_RELOAD_INTERVAL);
430
431 while (true) {
432 if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
433 analysis.log("Reloading Corpus ...");
434 try {
435 analysis.loadData(analysis.datasource.getConnection());
436 analysis.log("Corpus reloaded");
437 } catch (java.sql.SQLException se) {
438 analysis.log("SQLException: ", se);
439 }
440
441 }
442
443 if (Thread.interrupted()) {
444 break;
445 }
446 Thread.sleep(CORPUS_RELOAD_INTERVAL);
447 }
448 } catch (InterruptedException ex) {
449 interrupt();
450 }
451 }
452
453 }
454
455 }