1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.transport.mailets;
23
24 import java.io.BufferedReader;
25 import java.io.StringReader;
26 import java.io.ByteArrayOutputStream;
27
28 import java.sql.Connection;
29 import java.util.Enumeration;
30
31 import javax.mail.internet.MimeMessage;
32 import javax.mail.Header;
33 import javax.mail.MessagingException;
34
35 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
36 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
37 import org.apache.avalon.framework.service.ServiceManager;
38 import org.apache.james.Constants;
39 import org.apache.mailet.base.GenericMailet;
40 import org.apache.mailet.Mail;
41
42 import org.apache.james.util.bayesian.JDBCBayesianAnalyzer;
43 import org.apache.james.util.sql.JDBCUtil;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public class BayesianAnalysisFeeder
111 extends GenericMailet {
112
113
114
115 private final JDBCUtil theJDBCUtil = new JDBCUtil() {
116 protected void delegatedLog(String logString) {
117 log("BayesianAnalysisFeeder: " + logString);
118 }
119 };
120
121
122
123
124 private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
125 protected void delegatedLog(String logString) {
126 log("BayesianAnalysisFeeder: " + logString);
127 }
128 };
129
130 private DataSourceComponent datasource;
131 private String repositoryPath;
132
133 private String feedType;
134
135
136
137
138
139
140 public String getMailetInfo() {
141 return "BayesianAnalysisFeeder Mailet";
142 }
143
144
145
146
147 private int maxSize = 100000;
148
149
150
151
152
153 public int getMaxSize() {
154
155 return this.maxSize;
156 }
157
158
159
160
161
162 public void setMaxSize(int maxSize) {
163
164 this.maxSize = maxSize;
165 }
166
167
168
169
170
171 public void init() throws MessagingException {
172 repositoryPath = getInitParameter("repositoryPath");
173
174 if (repositoryPath == null) {
175 throw new MessagingException("repositoryPath is null");
176 }
177
178 feedType = getInitParameter("feedType");
179 if (feedType == null) {
180 throw new MessagingException("feedType is null");
181 }
182
183 String maxSizeParam = getInitParameter("maxSize");
184 if (maxSizeParam != null) {
185 setMaxSize(Integer.parseInt(maxSizeParam));
186 }
187 log("maxSize: " + getMaxSize());
188
189 initDb();
190
191 }
192
193 private void initDb() throws MessagingException {
194
195 try {
196 ServiceManager serviceManager = (ServiceManager) getMailetContext().getAttribute(Constants.AVALON_COMPONENT_MANAGER);
197
198
199 DataSourceSelector datasources = (DataSourceSelector) serviceManager.lookup(DataSourceSelector.ROLE);
200
201
202 int stindex = repositoryPath.indexOf("://") + 3;
203
204 String datasourceName = repositoryPath.substring(stindex);
205
206 datasource = (DataSourceComponent) datasources.select(datasourceName);
207 } catch (Exception e) {
208 throw new MessagingException("Can't get datasource", e);
209 }
210
211 try {
212 analyzer.initSqlQueries(datasource.getConnection(), getMailetContext().getAttribute("confDir") + "/sqlResources.xml");
213 } catch (Exception e) {
214 throw new MessagingException("Exception initializing queries", e);
215 }
216
217 }
218
219
220
221
222
223
224
225
226
227 public void service(Mail mail) {
228 boolean dbUpdated = false;
229
230 mail.setState(Mail.GHOST);
231
232 ByteArrayOutputStream baos = new ByteArrayOutputStream();
233
234 Connection conn = null;
235
236 try {
237
238 MimeMessage message = mail.getMessage();
239
240 String messageId = message.getMessageID();
241
242 if (message.getSize() > getMaxSize()) {
243 log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
244 return;
245 }
246
247 clearAllHeaders(message);
248
249 message.writeTo(baos);
250
251 BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
252
253
254 synchronized(JDBCBayesianAnalyzer.DATABASE_LOCK) {
255
256 conn = datasource.getConnection();
257
258 if (conn.getAutoCommit()) {
259 conn.setAutoCommit(false);
260 }
261
262 dbUpdated = true;
263
264
265 analyzer.clear();
266
267 if ("ham".equalsIgnoreCase(feedType)) {
268 log(messageId + " Feeding HAM");
269
270 analyzer.addHam(br);
271
272
273 analyzer.updateHamTokens(conn);
274 } else {
275 log(messageId + " Feeding SPAM");
276
277 analyzer.addSpam(br);
278
279
280 analyzer.updateSpamTokens(conn);
281 }
282
283
284 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
285 conn.commit();
286 dbUpdated = false;
287 log(messageId + " Training ended successfully");
288 JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
289 }
290
291 }
292
293 } catch (java.sql.SQLException se) {
294 log("SQLException: "
295 + se.getMessage());
296 } catch (java.io.IOException ioe) {
297 log("IOException: "
298 + ioe.getMessage());
299 } catch (javax.mail.MessagingException me) {
300 log("MessagingException: "
301 + me.getMessage());
302 } finally {
303
304 try {
305 if (conn != null && dbUpdated && !conn.getAutoCommit()) {
306 conn.rollback();
307 dbUpdated = false;
308 }
309 } catch (Exception e) {}
310 theJDBCUtil.closeJDBCConnection(conn);
311 }
312 }
313
314 private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
315 Enumeration headers = message.getAllHeaders();
316
317 while (headers.hasMoreElements()) {
318 Header header = (Header) headers.nextElement();
319 try {
320 message.removeHeader(header.getName());
321 } catch (javax.mail.MessagingException me) {}
322 }
323 message.saveChanges();
324 }
325
326 }