1 /************************************************************************
2 * Copyright (c) 2000-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.nntpserver.repository;
19
20 import org.apache.avalon.framework.activity.Initializable;
21 import org.apache.avalon.framework.configuration.Configurable;
22 import org.apache.avalon.framework.configuration.Configuration;
23 import org.apache.avalon.framework.configuration.ConfigurationException;
24 import org.apache.avalon.framework.context.Context;
25 import org.apache.avalon.framework.context.ContextException;
26 import org.apache.avalon.framework.context.Contextualizable;
27 import org.apache.avalon.framework.logger.AbstractLogEnabled;
28 import org.apache.avalon.framework.logger.LogEnabled;
29 import org.apache.james.context.AvalonContextUtilities;
30 import org.apache.james.util.Lock;
31 import org.apache.james.util.io.IOUtil;
32
33 import javax.mail.internet.MimeMessage;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.FileOutputStream;
38 import java.io.InputStreamReader;
39 import java.util.Properties;
40 import java.util.StringTokenizer;
41
42 /***
43 * Processes entries and sends to appropriate groups.
44 * Eats up inappropriate entries.
45 *
46 */
47 class NNTPSpooler extends AbstractLogEnabled
48 implements Contextualizable, Configurable, Initializable {
49
50 /***
51 * The spooler context
52 */
53 private Context context;
54
55 /***
56 * The array of spooler runnables, each associated with a Worker thread
57 */
58 private SpoolerRunnable[] worker;
59
60 /***
61 * The directory containing entries to be spooled.
62 */
63 private File spoolPath;
64
65 /***
66 * The String form of the spool directory.
67 */
68 private String spoolPathString;
69
70 /***
71 * The time the spooler threads sleep between processing
72 */
73 private int threadIdleTime = 0;
74
75 /***
76 * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
77 */
78 public void contextualize(final Context context)
79 throws ContextException {
80 this.context = context;
81 }
82
83 /***
84 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
85 */
86 public void configure( Configuration configuration ) throws ConfigurationException {
87 int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
88 threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
89 spoolPathString = configuration.getChild("spoolPath").getValue();
90 worker = new SpoolerRunnable[threadCount];
91 }
92
93 /***
94 * @see org.apache.avalon.framework.activity.Initializable#initialize()
95 */
96 public void initialize() throws Exception {
97
98
99 try {
100 spoolPath = AvalonContextUtilities.getFile(context, spoolPathString);
101 if ( spoolPath.exists() == false ) {
102 spoolPath.mkdirs();
103 } else if (!(spoolPath.isDirectory())) {
104 StringBuffer errorBuffer =
105 new StringBuffer(128)
106 .append("Spool directory is improperly configured. The specified path ")
107 .append(spoolPathString)
108 .append(" is not a directory.");
109 throw new ConfigurationException(errorBuffer.toString());
110 }
111 } catch (Exception e) {
112 getLogger().fatalError(e.getMessage(), e);
113 throw e;
114 }
115
116 for ( int i = 0 ; i < worker.length ; i++ ) {
117 worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
118 if ( worker[i] instanceof LogEnabled ) {
119 ((LogEnabled)worker[i]).enableLogging(getLogger());
120 }
121 }
122
123
124 for ( int i = 0 ; i < worker.length ; i++ ) {
125 new Thread(worker[i],"NNTPSpool-"+i).start();
126 }
127 }
128
129 /***
130 * Sets the repository used by this spooler.
131 *
132 * @param repo the repository to be used
133 */
134 void setRepository(NNTPRepository repo) {
135 for ( int i = 0 ; i < worker.length ; i++ ) {
136 worker[i].setRepository(repo);
137 }
138 }
139
140 /***
141 * Sets the article id repository used by this spooler.
142 *
143 * @param articleIDRepo the article id repository to be used
144 */
145 void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
146 for ( int i = 0 ; i < worker.length ; i++ ) {
147 worker[i].setArticleIDRepository(articleIDRepo);
148 }
149 }
150
151 /***
152 * Returns (and creates, if the directory doesn't already exist) the
153 * spool directory
154 *
155 * @return the spool directory
156 */
157 File getSpoolPath() {
158 return spoolPath;
159 }
160
161 /***
162 * A static inner class that provides the body for the spool
163 * threads.
164 */
165 static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {
166
167 private static final Lock lock = new Lock();
168
169 /***
170 * The directory containing entries to be spooled.
171 */
172 private final File spoolPath;
173
174 /***
175 * The time the spooler thread sleeps between processing
176 */
177 private final int threadIdleTime;
178
179 /***
180 * The article ID repository used by this spooler thread
181 */
182 private ArticleIDRepository articleIDRepo;
183
184 /***
185 * The NNTP repository used by this spooler thread
186 */
187 private NNTPRepository repo;
188
189 SpoolerRunnable(int threadIdleTime,File spoolPath) {
190 this.threadIdleTime = threadIdleTime;
191 this.spoolPath = spoolPath;
192 }
193
194 /***
195 * Sets the article id repository used by this spooler thread.
196 *
197 * @param articleIDRepo the article id repository to be used
198 */
199 void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
200 this.articleIDRepo = articleIDRepo;
201 }
202
203 /***
204 * Sets the repository used by this spooler thread.
205 *
206 * @param repo the repository to be used
207 */
208 void setRepository(NNTPRepository repo) {
209 this.repo = repo;
210 }
211
212 /***
213 * The threads race to grab a lock. if a thread wins it processes the article,
214 * if it loses it tries to lock and process the next article.
215 */
216 public void run() {
217 getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
218 try {
219 while ( Thread.currentThread().interrupted() == false ) {
220 String[] list = spoolPath.list();
221 if (list.length > 0) getLogger().debug("Files to process: "+list.length);
222 for ( int i = 0 ; i < list.length ; i++ ) {
223 if ( lock.lock(list[i]) ) {
224 File f = new File(spoolPath,list[i]).getAbsoluteFile();
225 getLogger().debug("Processing file: "+f.getAbsolutePath());
226 try {
227 process(f);
228 } catch(Throwable ex) {
229 getLogger().debug("Exception occured while processing file: "+
230 f.getAbsolutePath(),ex);
231 } finally {
232 lock.unlock(list[i]);
233 }
234 }
235 list[i] = null;
236 }
237 list = null;
238
239 try {
240 Thread.currentThread().sleep(threadIdleTime);
241 } catch(InterruptedException ex) {
242
243 }
244 }
245 } finally {
246 Thread.currentThread().interrupted();
247 }
248 }
249
250 /***
251 * Process a file stored in the spool.
252 *
253 * @param f the spool file being processed
254 */
255 private void process(File spoolFile) throws Exception {
256 StringBuffer logBuffer =
257 new StringBuffer(160)
258 .append("process: ")
259 .append(spoolFile.getAbsolutePath())
260 .append(",")
261 .append(spoolFile.getCanonicalPath());
262 getLogger().debug(logBuffer.toString());
263 final MimeMessage msg;
264 String articleID;
265
266 {
267 FileInputStream fin = new FileInputStream(spoolFile);
268 try {
269 msg = new MimeMessage(null,fin);
270 } finally {
271 IOUtil.shutdownStream(fin);
272 }
273
274 String lineCount = null;
275 String[] lineCountHeader = msg.getHeader("Lines");
276 if (lineCountHeader == null || lineCountHeader.length == 0) {
277 BufferedReader rdr = new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
278 int lines = 0;
279 while (rdr.readLine() != null) {
280 lines++;
281 }
282
283 lineCount = Integer.toString(lines);
284 rdr.close();
285
286 msg.setHeader("Lines", lineCount);
287 }
288
289
290 String[] idheader = msg.getHeader("Message-Id");
291 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
292 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
293 getLogger().debug("Message already exists: "+articleID);
294 if (spoolFile.delete() == false)
295 getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
296 return;
297 }
298 if ( articleID == null || lineCount != null) {
299 if (articleID == null) {
300 articleID = articleIDRepo.generateArticleID();
301 msg.setHeader("Message-Id", articleID);
302 }
303 FileOutputStream fout = new FileOutputStream(spoolFile);
304 try {
305 msg.writeTo(fout);
306 } finally {
307 IOUtil.shutdownStream(fout);
308 }
309 }
310 }
311
312 String[] headers = msg.getHeader("Newsgroups");
313 Properties prop = new Properties();
314 if (headers != null) {
315 for ( int i = 0 ; i < headers.length ; i++ ) {
316 StringTokenizer tokenizer = new StringTokenizer(headers[i],",");
317 while ( tokenizer.hasMoreTokens() ) {
318 String groupName = tokenizer.nextToken().trim();
319 getLogger().debug("Copying message to group: "+groupName);
320 NNTPGroup group = repo.getGroup(groupName);
321 if ( group == null ) {
322 getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
323 continue;
324 }
325
326 FileInputStream newsStream = new FileInputStream(spoolFile);
327 try {
328 NNTPArticle article = group.addArticle(newsStream);
329 prop.setProperty(group.getName(),article.getArticleNumber() + "");
330 } finally {
331 IOUtil.shutdownStream(newsStream);
332 }
333 }
334 }
335 }
336 articleIDRepo.addArticle(articleID,prop);
337 boolean delSuccess = spoolFile.delete();
338 if ( delSuccess == false ) {
339 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
340 }
341 }
342 }
343 }