View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.nntpserver.repository;
21  
22  import org.apache.avalon.framework.activity.Initializable;
23  import org.apache.avalon.framework.configuration.Configurable;
24  import org.apache.avalon.framework.configuration.Configuration;
25  import org.apache.avalon.framework.configuration.ConfigurationException;
26  import org.apache.avalon.framework.context.Context;
27  import org.apache.avalon.framework.context.ContextException;
28  import org.apache.avalon.framework.context.Contextualizable;
29  import org.apache.avalon.framework.logger.AbstractLogEnabled;
30  import org.apache.avalon.framework.logger.LogEnabled;
31  import org.apache.james.context.AvalonContextUtilities;
32  import org.apache.james.util.Lock;
33  import org.apache.james.util.io.IOUtil;
34  
35  import javax.mail.internet.MimeMessage;
36  import java.io.BufferedReader;
37  import java.io.File;
38  import java.io.FileInputStream;
39  import java.io.FileOutputStream;
40  import java.io.InputStreamReader;
41  import java.util.Properties;
42  import java.util.StringTokenizer;
43  
44  /***
45   * Processes entries and sends to appropriate groups.
46   * Eats up inappropriate entries.
47   *
48   */
49  class NNTPSpooler extends AbstractLogEnabled
50          implements Contextualizable, Configurable, Initializable {
51  
52      /***
53       * The spooler context
54       */
55      private Context context;
56  
57      /***
58       * The array of spooler runnables, each associated with a Worker thread
59       */
60      private SpoolerRunnable[] worker;
61  
62      /***
63       * The directory containing entries to be spooled.
64       */
65      private File spoolPath;
66  
67      /***
68       * The String form of the spool directory.
69       */
70      private String spoolPathString;
71  
72      /***
73       * The time the spooler threads sleep between processing
74       */
75      private int threadIdleTime = 0;
76  
77      /***
78       * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
79       */
80      public void contextualize(final Context context)
81              throws ContextException {
82          this.context = context;
83      }
84  
85      /***
86       * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
87       */
88      public void configure( Configuration configuration ) throws ConfigurationException {
89          int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
90          threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
91          spoolPathString = configuration.getChild("spoolPath").getValue();
92          worker = new SpoolerRunnable[threadCount];
93      }
94  
95      /***
96       * @see org.apache.avalon.framework.activity.Initializable#initialize()
97       */
98      public void initialize() throws Exception {
99          //System.out.println(getClass().getName()+": init");
100 
101         try {
102             spoolPath = AvalonContextUtilities.getFile(context, spoolPathString);
103             if ( spoolPath.exists() == false ) {
104                 spoolPath.mkdirs();
105             } else if (!(spoolPath.isDirectory())) {
106                 StringBuffer errorBuffer =
107                     new StringBuffer(128)
108                         .append("Spool directory is improperly configured.  The specified path ")
109                         .append(spoolPathString)
110                         .append(" is not a directory.");
111                 throw new ConfigurationException(errorBuffer.toString());
112             }
113         } catch (Exception e) {
114             getLogger().fatalError(e.getMessage(), e);
115             throw e;
116         }
117 
118         for ( int i = 0 ; i < worker.length ; i++ ) {
119             worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
120             if ( worker[i] instanceof LogEnabled ) {
121                 ((LogEnabled)worker[i]).enableLogging(getLogger());
122             }
123         }
124 
125         // TODO: Replace this with a standard Avalon thread pool
126         for ( int i = 0 ; i < worker.length ; i++ ) {
127             new Thread(worker[i],"NNTPSpool-"+i).start();
128         }
129     }
130 
131     /***
132      * Sets the repository used by this spooler.
133      *
134      * @param repo the repository to be used
135      */
136     void setRepository(NNTPRepository repo) {
137         for ( int i = 0 ; i < worker.length ; i++ ) {
138             worker[i].setRepository(repo);
139         }
140     }
141 
142     /***
143      * Sets the article id repository used by this spooler.
144      *
145      * @param articleIDRepo the article id repository to be used
146      */
147     void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
148         for ( int i = 0 ; i < worker.length ; i++ ) {
149             worker[i].setArticleIDRepository(articleIDRepo);
150         }
151     }
152 
153     /***
154      * Returns (and creates, if the directory doesn't already exist) the
155      * spool directory
156      *
157      * @return the spool directory
158      */
159     File getSpoolPath() {
160         return spoolPath;
161     }
162 
163     /***
164      * A static inner class that provides the body for the spool
165      * threads.
166      */
167     static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {
168 
169         private static final Lock lock = new Lock();
170 
171         /***
172          * The directory containing entries to be spooled.
173          */
174         private final File spoolPath;
175 
176         /***
177          * The time the spooler thread sleeps between processing
178          */
179         private final int threadIdleTime;
180 
181         /***
182          * The article ID repository used by this spooler thread
183          */
184         private ArticleIDRepository articleIDRepo;
185 
186         /***
187          * The NNTP repository used by this spooler thread
188          */
189         private NNTPRepository repo;
190 
191         SpoolerRunnable(int threadIdleTime,File spoolPath) {
192             this.threadIdleTime = threadIdleTime;
193             this.spoolPath = spoolPath;
194         }
195 
196         /***
197          * Sets the article id repository used by this spooler thread.
198          *
199          * @param articleIDRepo the article id repository to be used
200          */
201         void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
202             this.articleIDRepo = articleIDRepo;
203         }
204 
205         /***
206          * Sets the repository used by this spooler thread.
207          *
208          * @param repo the repository to be used
209          */
210         void setRepository(NNTPRepository repo) {
211             this.repo = repo;
212         }
213 
214         /***
215          * The threads race to grab a lock. if a thread wins it processes the article,
216          * if it loses it tries to lock and process the next article.
217          */
218         public void run() {
219             getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
220             try {
221                 while ( Thread.currentThread().interrupted() == false ) {
222                     String[] list = spoolPath.list();
223                     if (list.length > 0) getLogger().debug("Files to process: "+list.length);
224                     for ( int i = 0 ; i < list.length ; i++ ) {
225                         if ( lock.lock(list[i]) ) {
226                             File f = new File(spoolPath,list[i]).getAbsoluteFile();
227                             getLogger().debug("Processing file: "+f.getAbsolutePath());
228                             try {
229                                 process(f);
230                             } catch(Throwable ex) {
231                                 getLogger().debug("Exception occured while processing file: "+
232                                                   f.getAbsolutePath(),ex);
233                             } finally {
234                                 lock.unlock(list[i]);
235                             }
236                         }
237                         list[i] = null; // release the string entry;
238                     }
239                     list = null; // release the array;
240                     // this is good for other non idle threads
241                     try {
242                         Thread.currentThread().sleep(threadIdleTime);
243                     } catch(InterruptedException ex) {
244                         // Ignore and continue
245                     }
246                 }
247             } finally {
248                 Thread.currentThread().interrupted();
249             }
250         }
251 
252         /***
253          * Process a file stored in the spool.
254          *
255          * @param f the spool file being processed
256          */
257         private void process(File spoolFile) throws Exception {
258             StringBuffer logBuffer =
259                 new StringBuffer(160)
260                         .append("process: ")
261                         .append(spoolFile.getAbsolutePath())
262                         .append(",")
263                         .append(spoolFile.getCanonicalPath());
264             getLogger().debug(logBuffer.toString());
265             final MimeMessage msg;
266             String articleID;
267             // TODO: Why is this a block?
268             {   // Get the message for copying to destination groups.
269                 FileInputStream fin = new FileInputStream(spoolFile);
270                 try {
271                     msg = new MimeMessage(null,fin);
272                 } finally {
273                     IOUtil.shutdownStream(fin);
274                 }
275 
276                 String lineCount = null;
277                 String[] lineCountHeader = msg.getHeader("Lines");
278                 if (lineCountHeader == null || lineCountHeader.length == 0) {
279                     BufferedReader rdr = new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
280                     int lines = 0;
281                     while (rdr.readLine() != null) {
282                         lines++;
283                     }
284 
285                     lineCount = Integer.toString(lines);
286                     rdr.close();
287 
288                     msg.setHeader("Lines", lineCount);
289                 }
290 
291                 // ensure no duplicates exist.
292                 String[] idheader = msg.getHeader("Message-Id");
293                 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
294                 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
295                     getLogger().debug("Message already exists: "+articleID);
296                     if (spoolFile.delete() == false)
297                         getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
298                     return;
299                 }
300                 if ( articleID == null || lineCount != null) {
301                     if (articleID == null) {
302                         articleID = articleIDRepo.generateArticleID();
303                         msg.setHeader("Message-Id", articleID);
304                     }
305                     FileOutputStream fout = new FileOutputStream(spoolFile);
306                     try {
307                         msg.writeTo(fout);
308                     } finally {
309                         IOUtil.shutdownStream(fout);
310                     }
311                 }
312             }
313 
314             String[] headers = msg.getHeader("Newsgroups");
315             Properties prop = new Properties();
316             if (headers != null) {
317                 for ( int i = 0 ; i < headers.length ; i++ ) {
318                     StringTokenizer tokenizer = new StringTokenizer(headers[i],",");
319                     while ( tokenizer.hasMoreTokens() ) {
320                         String groupName = tokenizer.nextToken().trim();
321                         getLogger().debug("Copying message to group: "+groupName);
322                         NNTPGroup group = repo.getGroup(groupName);
323                         if ( group == null ) {
324                             getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
325                             continue;
326                         }
327 
328                         FileInputStream newsStream = new FileInputStream(spoolFile);
329                         try {
330                             NNTPArticle article = group.addArticle(newsStream);
331                             prop.setProperty(group.getName(),article.getArticleNumber() + "");
332                         } finally {
333                             IOUtil.shutdownStream(newsStream);
334                         }
335                     }
336                 }
337             }
338             articleIDRepo.addArticle(articleID,prop);
339             boolean delSuccess = spoolFile.delete();
340             if ( delSuccess == false ) {
341                 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
342             }
343         }
344     } // class SpoolerRunnable
345 }