View Javadoc

1   /************************************************************************
2    * Copyright (c) 2000-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.nntpserver.repository;
19  
20  import org.apache.avalon.framework.activity.Initializable;
21  import org.apache.avalon.framework.configuration.Configurable;
22  import org.apache.avalon.framework.configuration.Configuration;
23  import org.apache.avalon.framework.configuration.ConfigurationException;
24  import org.apache.avalon.framework.context.Context;
25  import org.apache.avalon.framework.context.ContextException;
26  import org.apache.avalon.framework.context.Contextualizable;
27  import org.apache.avalon.framework.logger.AbstractLogEnabled;
28  import org.apache.avalon.framework.logger.LogEnabled;
29  import org.apache.james.context.AvalonContextUtilities;
30  import org.apache.james.util.Lock;
31  import org.apache.james.util.io.IOUtil;
32  
33  import javax.mail.internet.MimeMessage;
34  import java.io.BufferedReader;
35  import java.io.File;
36  import java.io.FileInputStream;
37  import java.io.FileOutputStream;
38  import java.io.InputStreamReader;
39  import java.util.Properties;
40  import java.util.StringTokenizer;
41  
42  /***
43   * Processes entries and sends to appropriate groups.
44   * Eats up inappropriate entries.
45   *
46   */
47  class NNTPSpooler extends AbstractLogEnabled
48          implements Contextualizable, Configurable, Initializable {
49  
50      /***
51       * The spooler context
52       */
53      private Context context;
54  
55      /***
56       * The array of spooler runnables, each associated with a Worker thread
57       */
58      private SpoolerRunnable[] worker;
59  
60      /***
61       * The directory containing entries to be spooled.
62       */
63      private File spoolPath;
64  
65      /***
66       * The String form of the spool directory.
67       */
68      private String spoolPathString;
69  
70      /***
71       * The time the spooler threads sleep between processing
72       */
73      private int threadIdleTime = 0;
74  
75      /***
76       * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
77       */
78      public void contextualize(final Context context)
79              throws ContextException {
80          this.context = context;
81      }
82  
83      /***
84       * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
85       */
86      public void configure( Configuration configuration ) throws ConfigurationException {
87          int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
88          threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
89          spoolPathString = configuration.getChild("spoolPath").getValue();
90          worker = new SpoolerRunnable[threadCount];
91      }
92  
93      /***
94       * @see org.apache.avalon.framework.activity.Initializable#initialize()
95       */
96      public void initialize() throws Exception {
97          //System.out.println(getClass().getName()+": init");
98  
99          try {
100             spoolPath = AvalonContextUtilities.getFile(context, spoolPathString);
101             if ( spoolPath.exists() == false ) {
102                 spoolPath.mkdirs();
103             } else if (!(spoolPath.isDirectory())) {
104                 StringBuffer errorBuffer =
105                     new StringBuffer(128)
106                         .append("Spool directory is improperly configured.  The specified path ")
107                         .append(spoolPathString)
108                         .append(" is not a directory.");
109                 throw new ConfigurationException(errorBuffer.toString());
110             }
111         } catch (Exception e) {
112             getLogger().fatalError(e.getMessage(), e);
113             throw e;
114         }
115 
116         for ( int i = 0 ; i < worker.length ; i++ ) {
117             worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
118             if ( worker[i] instanceof LogEnabled ) {
119                 ((LogEnabled)worker[i]).enableLogging(getLogger());
120             }
121         }
122 
123         // TODO: Replace this with a standard Avalon thread pool
124         for ( int i = 0 ; i < worker.length ; i++ ) {
125             new Thread(worker[i],"NNTPSpool-"+i).start();
126         }
127     }
128 
129     /***
130      * Sets the repository used by this spooler.
131      *
132      * @param repo the repository to be used
133      */
134     void setRepository(NNTPRepository repo) {
135         for ( int i = 0 ; i < worker.length ; i++ ) {
136             worker[i].setRepository(repo);
137         }
138     }
139 
140     /***
141      * Sets the article id repository used by this spooler.
142      *
143      * @param articleIDRepo the article id repository to be used
144      */
145     void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
146         for ( int i = 0 ; i < worker.length ; i++ ) {
147             worker[i].setArticleIDRepository(articleIDRepo);
148         }
149     }
150 
151     /***
152      * Returns (and creates, if the directory doesn't already exist) the
153      * spool directory
154      *
155      * @return the spool directory
156      */
157     File getSpoolPath() {
158         return spoolPath;
159     }
160 
161     /***
162      * A static inner class that provides the body for the spool
163      * threads.
164      */
165     static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {
166 
167         private static final Lock lock = new Lock();
168 
169         /***
170          * The directory containing entries to be spooled.
171          */
172         private final File spoolPath;
173 
174         /***
175          * The time the spooler thread sleeps between processing
176          */
177         private final int threadIdleTime;
178 
179         /***
180          * The article ID repository used by this spooler thread
181          */
182         private ArticleIDRepository articleIDRepo;
183 
184         /***
185          * The NNTP repository used by this spooler thread
186          */
187         private NNTPRepository repo;
188 
189         SpoolerRunnable(int threadIdleTime,File spoolPath) {
190             this.threadIdleTime = threadIdleTime;
191             this.spoolPath = spoolPath;
192         }
193 
194         /***
195          * Sets the article id repository used by this spooler thread.
196          *
197          * @param articleIDRepo the article id repository to be used
198          */
199         void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
200             this.articleIDRepo = articleIDRepo;
201         }
202 
203         /***
204          * Sets the repository used by this spooler thread.
205          *
206          * @param repo the repository to be used
207          */
208         void setRepository(NNTPRepository repo) {
209             this.repo = repo;
210         }
211 
212         /***
213          * The threads race to grab a lock. if a thread wins it processes the article,
214          * if it loses it tries to lock and process the next article.
215          */
216         public void run() {
217             getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
218             try {
219                 while ( Thread.currentThread().interrupted() == false ) {
220                     String[] list = spoolPath.list();
221                     if (list.length > 0) getLogger().debug("Files to process: "+list.length);
222                     for ( int i = 0 ; i < list.length ; i++ ) {
223                         if ( lock.lock(list[i]) ) {
224                             File f = new File(spoolPath,list[i]).getAbsoluteFile();
225                             getLogger().debug("Processing file: "+f.getAbsolutePath());
226                             try {
227                                 process(f);
228                             } catch(Throwable ex) {
229                                 getLogger().debug("Exception occured while processing file: "+
230                                                   f.getAbsolutePath(),ex);
231                             } finally {
232                                 lock.unlock(list[i]);
233                             }
234                         }
235                         list[i] = null; // release the string entry;
236                     }
237                     list = null; // release the array;
238                     // this is good for other non idle threads
239                     try {
240                         Thread.currentThread().sleep(threadIdleTime);
241                     } catch(InterruptedException ex) {
242                         // Ignore and continue
243                     }
244                 }
245             } finally {
246                 Thread.currentThread().interrupted();
247             }
248         }
249 
250         /***
251          * Process a file stored in the spool.
252          *
253          * @param f the spool file being processed
254          */
255         private void process(File spoolFile) throws Exception {
256             StringBuffer logBuffer =
257                 new StringBuffer(160)
258                         .append("process: ")
259                         .append(spoolFile.getAbsolutePath())
260                         .append(",")
261                         .append(spoolFile.getCanonicalPath());
262             getLogger().debug(logBuffer.toString());
263             final MimeMessage msg;
264             String articleID;
265             // TODO: Why is this a block?
266             {   // Get the message for copying to destination groups.
267                 FileInputStream fin = new FileInputStream(spoolFile);
268                 try {
269                     msg = new MimeMessage(null,fin);
270                 } finally {
271                     IOUtil.shutdownStream(fin);
272                 }
273 
274                 String lineCount = null;
275                 String[] lineCountHeader = msg.getHeader("Lines");
276                 if (lineCountHeader == null || lineCountHeader.length == 0) {
277                     BufferedReader rdr = new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
278                     int lines = 0;
279                     while (rdr.readLine() != null) {
280                         lines++;
281                     }
282 
283                     lineCount = Integer.toString(lines);
284                     rdr.close();
285 
286                     msg.setHeader("Lines", lineCount);
287                 }
288 
289                 // ensure no duplicates exist.
290                 String[] idheader = msg.getHeader("Message-Id");
291                 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
292                 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
293                     getLogger().debug("Message already exists: "+articleID);
294                     if (spoolFile.delete() == false)
295                         getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
296                     return;
297                 }
298                 if ( articleID == null || lineCount != null) {
299                     if (articleID == null) {
300                         articleID = articleIDRepo.generateArticleID();
301                         msg.setHeader("Message-Id", articleID);
302                     }
303                     FileOutputStream fout = new FileOutputStream(spoolFile);
304                     try {
305                         msg.writeTo(fout);
306                     } finally {
307                         IOUtil.shutdownStream(fout);
308                     }
309                 }
310             }
311 
312             String[] headers = msg.getHeader("Newsgroups");
313             Properties prop = new Properties();
314             if (headers != null) {
315                 for ( int i = 0 ; i < headers.length ; i++ ) {
316                     StringTokenizer tokenizer = new StringTokenizer(headers[i],",");
317                     while ( tokenizer.hasMoreTokens() ) {
318                         String groupName = tokenizer.nextToken().trim();
319                         getLogger().debug("Copying message to group: "+groupName);
320                         NNTPGroup group = repo.getGroup(groupName);
321                         if ( group == null ) {
322                             getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
323                             continue;
324                         }
325 
326                         FileInputStream newsStream = new FileInputStream(spoolFile);
327                         try {
328                             NNTPArticle article = group.addArticle(newsStream);
329                             prop.setProperty(group.getName(),article.getArticleNumber() + "");
330                         } finally {
331                             IOUtil.shutdownStream(newsStream);
332                         }
333                     }
334                 }
335             }
336             articleIDRepo.addArticle(articleID,prop);
337             boolean delSuccess = spoolFile.delete();
338             if ( delSuccess == false ) {
339                 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
340             }
341         }
342     } // class SpoolerRunnable
343 }