1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.nntpserver.repository;
21
22 import org.apache.avalon.framework.activity.Initializable;
23 import org.apache.avalon.framework.configuration.Configurable;
24 import org.apache.avalon.framework.configuration.Configuration;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26 import org.apache.avalon.framework.context.Context;
27 import org.apache.avalon.framework.context.ContextException;
28 import org.apache.avalon.framework.context.Contextualizable;
29 import org.apache.avalon.framework.logger.AbstractLogEnabled;
30 import org.apache.avalon.framework.logger.LogEnabled;
31 import org.apache.james.context.AvalonContextUtilities;
32 import org.apache.james.util.Lock;
33 import org.apache.james.util.io.IOUtil;
34
35 import javax.mail.internet.MimeMessage;
36 import java.io.BufferedReader;
37 import java.io.File;
38 import java.io.FileInputStream;
39 import java.io.FileOutputStream;
40 import java.io.InputStreamReader;
41 import java.util.Properties;
42 import java.util.StringTokenizer;
43
44 /***
45 * Processes entries and sends to appropriate groups.
46 * Eats up inappropriate entries.
47 *
48 */
49 class NNTPSpooler extends AbstractLogEnabled
50 implements Contextualizable, Configurable, Initializable {
51
52 /***
53 * The spooler context
54 */
55 private Context context;
56
57 /***
58 * The array of spooler runnables, each associated with a Worker thread
59 */
60 private SpoolerRunnable[] worker;
61
62 /***
63 * The directory containing entries to be spooled.
64 */
65 private File spoolPath;
66
67 /***
68 * The String form of the spool directory.
69 */
70 private String spoolPathString;
71
72 /***
73 * The time the spooler threads sleep between processing
74 */
75 private int threadIdleTime = 0;
76
77 /***
78 * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
79 */
80 public void contextualize(final Context context)
81 throws ContextException {
82 this.context = context;
83 }
84
85 /***
86 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
87 */
88 public void configure( Configuration configuration ) throws ConfigurationException {
89 int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
90 threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
91 spoolPathString = configuration.getChild("spoolPath").getValue();
92 worker = new SpoolerRunnable[threadCount];
93 }
94
95 /***
96 * @see org.apache.avalon.framework.activity.Initializable#initialize()
97 */
98 public void initialize() throws Exception {
99
100
101 try {
102 spoolPath = AvalonContextUtilities.getFile(context, spoolPathString);
103 if ( spoolPath.exists() == false ) {
104 spoolPath.mkdirs();
105 } else if (!(spoolPath.isDirectory())) {
106 StringBuffer errorBuffer =
107 new StringBuffer(128)
108 .append("Spool directory is improperly configured. The specified path ")
109 .append(spoolPathString)
110 .append(" is not a directory.");
111 throw new ConfigurationException(errorBuffer.toString());
112 }
113 } catch (Exception e) {
114 getLogger().fatalError(e.getMessage(), e);
115 throw e;
116 }
117
118 for ( int i = 0 ; i < worker.length ; i++ ) {
119 worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
120 if ( worker[i] instanceof LogEnabled ) {
121 ((LogEnabled)worker[i]).enableLogging(getLogger());
122 }
123 }
124
125
126 for ( int i = 0 ; i < worker.length ; i++ ) {
127 new Thread(worker[i],"NNTPSpool-"+i).start();
128 }
129 }
130
131 /***
132 * Sets the repository used by this spooler.
133 *
134 * @param repo the repository to be used
135 */
136 void setRepository(NNTPRepository repo) {
137 for ( int i = 0 ; i < worker.length ; i++ ) {
138 worker[i].setRepository(repo);
139 }
140 }
141
142 /***
143 * Sets the article id repository used by this spooler.
144 *
145 * @param articleIDRepo the article id repository to be used
146 */
147 void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
148 for ( int i = 0 ; i < worker.length ; i++ ) {
149 worker[i].setArticleIDRepository(articleIDRepo);
150 }
151 }
152
153 /***
154 * Returns (and creates, if the directory doesn't already exist) the
155 * spool directory
156 *
157 * @return the spool directory
158 */
159 File getSpoolPath() {
160 return spoolPath;
161 }
162
163 /***
164 * A static inner class that provides the body for the spool
165 * threads.
166 */
167 static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {
168
169 private static final Lock lock = new Lock();
170
171 /***
172 * The directory containing entries to be spooled.
173 */
174 private final File spoolPath;
175
176 /***
177 * The time the spooler thread sleeps between processing
178 */
179 private final int threadIdleTime;
180
181 /***
182 * The article ID repository used by this spooler thread
183 */
184 private ArticleIDRepository articleIDRepo;
185
186 /***
187 * The NNTP repository used by this spooler thread
188 */
189 private NNTPRepository repo;
190
191 SpoolerRunnable(int threadIdleTime,File spoolPath) {
192 this.threadIdleTime = threadIdleTime;
193 this.spoolPath = spoolPath;
194 }
195
196 /***
197 * Sets the article id repository used by this spooler thread.
198 *
199 * @param articleIDRepo the article id repository to be used
200 */
201 void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
202 this.articleIDRepo = articleIDRepo;
203 }
204
205 /***
206 * Sets the repository used by this spooler thread.
207 *
208 * @param repo the repository to be used
209 */
210 void setRepository(NNTPRepository repo) {
211 this.repo = repo;
212 }
213
214 /***
215 * The threads race to grab a lock. if a thread wins it processes the article,
216 * if it loses it tries to lock and process the next article.
217 */
218 public void run() {
219 getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
220 try {
221 while ( Thread.currentThread().interrupted() == false ) {
222 String[] list = spoolPath.list();
223 if (list.length > 0) getLogger().debug("Files to process: "+list.length);
224 for ( int i = 0 ; i < list.length ; i++ ) {
225 if ( lock.lock(list[i]) ) {
226 File f = new File(spoolPath,list[i]).getAbsoluteFile();
227 getLogger().debug("Processing file: "+f.getAbsolutePath());
228 try {
229 process(f);
230 } catch(Throwable ex) {
231 getLogger().debug("Exception occured while processing file: "+
232 f.getAbsolutePath(),ex);
233 } finally {
234 lock.unlock(list[i]);
235 }
236 }
237 list[i] = null;
238 }
239 list = null;
240
241 try {
242 Thread.currentThread().sleep(threadIdleTime);
243 } catch(InterruptedException ex) {
244
245 }
246 }
247 } finally {
248 Thread.currentThread().interrupted();
249 }
250 }
251
252 /***
253 * Process a file stored in the spool.
254 *
255 * @param f the spool file being processed
256 */
257 private void process(File spoolFile) throws Exception {
258 StringBuffer logBuffer =
259 new StringBuffer(160)
260 .append("process: ")
261 .append(spoolFile.getAbsolutePath())
262 .append(",")
263 .append(spoolFile.getCanonicalPath());
264 getLogger().debug(logBuffer.toString());
265 final MimeMessage msg;
266 String articleID;
267
268 {
269 FileInputStream fin = new FileInputStream(spoolFile);
270 try {
271 msg = new MimeMessage(null,fin);
272 } finally {
273 IOUtil.shutdownStream(fin);
274 }
275
276 String lineCount = null;
277 String[] lineCountHeader = msg.getHeader("Lines");
278 if (lineCountHeader == null || lineCountHeader.length == 0) {
279 BufferedReader rdr = new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
280 int lines = 0;
281 while (rdr.readLine() != null) {
282 lines++;
283 }
284
285 lineCount = Integer.toString(lines);
286 rdr.close();
287
288 msg.setHeader("Lines", lineCount);
289 }
290
291
292 String[] idheader = msg.getHeader("Message-Id");
293 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
294 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
295 getLogger().debug("Message already exists: "+articleID);
296 if (spoolFile.delete() == false)
297 getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
298 return;
299 }
300 if ( articleID == null || lineCount != null) {
301 if (articleID == null) {
302 articleID = articleIDRepo.generateArticleID();
303 msg.setHeader("Message-Id", articleID);
304 }
305 FileOutputStream fout = new FileOutputStream(spoolFile);
306 try {
307 msg.writeTo(fout);
308 } finally {
309 IOUtil.shutdownStream(fout);
310 }
311 }
312 }
313
314 String[] headers = msg.getHeader("Newsgroups");
315 Properties prop = new Properties();
316 if (headers != null) {
317 for ( int i = 0 ; i < headers.length ; i++ ) {
318 StringTokenizer tokenizer = new StringTokenizer(headers[i],",");
319 while ( tokenizer.hasMoreTokens() ) {
320 String groupName = tokenizer.nextToken().trim();
321 getLogger().debug("Copying message to group: "+groupName);
322 NNTPGroup group = repo.getGroup(groupName);
323 if ( group == null ) {
324 getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
325 continue;
326 }
327
328 FileInputStream newsStream = new FileInputStream(spoolFile);
329 try {
330 NNTPArticle article = group.addArticle(newsStream);
331 prop.setProperty(group.getName(),article.getArticleNumber() + "");
332 } finally {
333 IOUtil.shutdownStream(newsStream);
334 }
335 }
336 }
337 }
338 articleIDRepo.addArticle(articleID,prop);
339 boolean delSuccess = spoolFile.delete();
340 if ( delSuccess == false ) {
341 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
342 }
343 }
344 }
345 }