1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.nntpserver.repository;
23
24 import org.apache.avalon.framework.activity.Initializable;
25 import org.apache.avalon.framework.configuration.Configurable;
26 import org.apache.avalon.framework.configuration.Configuration;
27 import org.apache.avalon.framework.configuration.ConfigurationException;
28 import org.apache.avalon.framework.container.ContainerUtil;
29 import org.apache.avalon.framework.logger.AbstractLogEnabled;
30 import org.apache.avalon.framework.service.ServiceException;
31 import org.apache.avalon.framework.service.ServiceManager;
32 import org.apache.avalon.framework.service.Serviceable;
33 import org.apache.james.services.FileSystem;
34 import org.apache.james.util.Lock;
35 import org.apache.james.util.io.IOUtil;
36
37 import javax.mail.internet.MimeMessage;
38
39 import java.io.BufferedReader;
40 import java.io.File;
41 import java.io.FileInputStream;
42 import java.io.FileOutputStream;
43 import java.io.InputStreamReader;
44 import java.util.Properties;
45 import java.util.StringTokenizer;
46
47
48
49
50
51
52 class NNTPSpooler extends AbstractLogEnabled
53 implements Serviceable, Configurable, Initializable {
54
55
56
57
58 private SpoolerRunnable[] worker;
59
60
61
62
63 private File spoolPath;
64
65
66
67
68 private String spoolPathString;
69
70
71
72
73 private int threadIdleTime = 0;
74
75
76
77
78 private FileSystem fileSystem;
79
80
81
82
83 public void service(final ServiceManager serviceManager)
84 throws ServiceException {
85 setFileSystem((FileSystem) serviceManager.lookup(FileSystem.ROLE));
86 }
87
88
89
90
91 public void configure( Configuration configuration ) throws ConfigurationException {
92 int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
93 threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
94 spoolPathString = configuration.getChild("spoolPath").getValue();
95 worker = new SpoolerRunnable[threadCount];
96 }
97
98
99
100
101 public void initialize() throws Exception {
102
103
104 try {
105 spoolPath = fileSystem.getFile(spoolPathString);
106 if ( spoolPath.exists() == false ) {
107 spoolPath.mkdirs();
108 } else if (!(spoolPath.isDirectory())) {
109 StringBuffer errorBuffer =
110 new StringBuffer(128)
111 .append("Spool directory is improperly configured. The specified path ")
112 .append(spoolPathString)
113 .append(" is not a directory.");
114 throw new ConfigurationException(errorBuffer.toString());
115 }
116 } catch (Exception e) {
117 getLogger().fatalError(e.getMessage(), e);
118 throw e;
119 }
120
121 for ( int i = 0 ; i < worker.length ; i++ ) {
122 worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
123 ContainerUtil.enableLogging(worker[i], getLogger());
124 }
125
126
127 for ( int i = 0 ; i < worker.length ; i++ ) {
128 new Thread(worker[i],"NNTPSpool-"+i).start();
129 }
130 }
131
132
133
134
135
136
137 void setRepository(NNTPRepository repo) {
138 for ( int i = 0 ; i < worker.length ; i++ ) {
139 worker[i].setRepository(repo);
140 }
141 }
142
143
144
145
146
147
148 void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
149 for ( int i = 0 ; i < worker.length ; i++ ) {
150 worker[i].setArticleIDRepository(articleIDRepo);
151 }
152 }
153
154
155
156
157
158
159
160 File getSpoolPath() {
161 return spoolPath;
162 }
163
164
165
166
167
168 static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {
169
170 private static final Lock lock = new Lock();
171
172
173
174
175 private final File spoolPath;
176
177
178
179
180 private final int threadIdleTime;
181
182
183
184
185 private ArticleIDRepository articleIDRepo;
186
187
188
189
190 private NNTPRepository repo;
191
192 SpoolerRunnable(int threadIdleTime,File spoolPath) {
193 this.threadIdleTime = threadIdleTime;
194 this.spoolPath = spoolPath;
195 }
196
197
198
199
200
201
202 void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
203 this.articleIDRepo = articleIDRepo;
204 }
205
206
207
208
209
210
211 void setRepository(NNTPRepository repo) {
212 this.repo = repo;
213 }
214
215
216
217
218
219 public void run() {
220 getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
221 try {
222 while ( Thread.interrupted() == false ) {
223 String[] list = spoolPath.list();
224 if (list.length > 0) getLogger().debug("Files to process: "+list.length);
225 for ( int i = 0 ; i < list.length ; i++ ) {
226 if ( lock.lock(list[i]) ) {
227 File f = new File(spoolPath,list[i]).getAbsoluteFile();
228 getLogger().debug("Processing file: "+f.getAbsolutePath());
229 try {
230 process(f);
231 } catch(Throwable ex) {
232 getLogger().debug("Exception occured while processing file: "+
233 f.getAbsolutePath(),ex);
234 } finally {
235 lock.unlock(list[i]);
236 }
237 }
238 list[i] = null;
239 }
240 list = null;
241
242 try {
243 Thread.sleep(threadIdleTime);
244 } catch(InterruptedException ex) {
245
246 }
247 }
248 } finally {
249 Thread.interrupted();
250 }
251 }
252
253
254
255
256
257
258 private void process(File spoolFile) throws Exception {
259 StringBuffer logBuffer =
260 new StringBuffer(160)
261 .append("process: ")
262 .append(spoolFile.getAbsolutePath())
263 .append(",")
264 .append(spoolFile.getCanonicalPath());
265 getLogger().debug(logBuffer.toString());
266 final MimeMessage msg;
267 String articleID;
268
269 {
270 FileInputStream fin = new FileInputStream(spoolFile);
271 try {
272 msg = new MimeMessage(null,fin);
273 } finally {
274 IOUtil.shutdownStream(fin);
275 }
276
277 String lineCount = null;
278 String[] lineCountHeader = msg.getHeader("Lines");
279 if (lineCountHeader == null || lineCountHeader.length == 0) {
280 BufferedReader rdr = new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
281 int lines = 0;
282 while (rdr.readLine() != null) {
283 lines++;
284 }
285
286 lineCount = Integer.toString(lines);
287 rdr.close();
288
289 msg.setHeader("Lines", lineCount);
290 }
291
292
293 String[] idheader = msg.getHeader("Message-Id");
294 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
295 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
296 getLogger().debug("Message already exists: "+articleID);
297 if (spoolFile.delete() == false)
298 getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
299 return;
300 }
301 if ( articleID == null || lineCount != null) {
302 if (articleID == null) {
303 articleID = articleIDRepo.generateArticleID();
304 msg.setHeader("Message-Id", articleID);
305 }
306 FileOutputStream fout = new FileOutputStream(spoolFile);
307 try {
308 msg.writeTo(fout);
309 } finally {
310 IOUtil.shutdownStream(fout);
311 }
312 }
313 }
314
315 String[] headers = msg.getHeader("Newsgroups");
316 Properties prop = new Properties();
317 if (headers != null) {
318 for ( int i = 0 ; i < headers.length ; i++ ) {
319 StringTokenizer tokenizer = new StringTokenizer(headers[i],",");
320 while ( tokenizer.hasMoreTokens() ) {
321 String groupName = tokenizer.nextToken().trim();
322 getLogger().debug("Copying message to group: "+groupName);
323 NNTPGroup group = repo.getGroup(groupName);
324 if ( group == null ) {
325 getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
326 continue;
327 }
328
329 FileInputStream newsStream = new FileInputStream(spoolFile);
330 try {
331 NNTPArticle article = group.addArticle(newsStream);
332 prop.setProperty(group.getName(),article.getArticleNumber() + "");
333 } finally {
334 IOUtil.shutdownStream(newsStream);
335 }
336 }
337 }
338 }
339 articleIDRepo.addArticle(articleID,prop);
340 boolean delSuccess = spoolFile.delete();
341 if ( delSuccess == false ) {
342 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
343 }
344 }
345 }
346
347
348
349
350
351
352 public void setFileSystem(FileSystem fileSystem) {
353 this.fileSystem = fileSystem;
354 }
355 }