View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.nntpserver.repository;
23  
24  import org.apache.avalon.framework.activity.Initializable;
25  import org.apache.avalon.framework.configuration.Configurable;
26  import org.apache.avalon.framework.configuration.Configuration;
27  import org.apache.avalon.framework.configuration.ConfigurationException;
28  import org.apache.avalon.framework.container.ContainerUtil;
29  import org.apache.avalon.framework.logger.AbstractLogEnabled;
30  import org.apache.avalon.framework.service.ServiceException;
31  import org.apache.avalon.framework.service.ServiceManager;
32  import org.apache.avalon.framework.service.Serviceable;
33  import org.apache.james.services.FileSystem;
34  import org.apache.james.util.Lock;
35  import org.apache.james.util.io.IOUtil;
36  
37  import javax.mail.internet.MimeMessage;
38  
39  import java.io.BufferedReader;
40  import java.io.File;
41  import java.io.FileInputStream;
42  import java.io.FileOutputStream;
43  import java.io.InputStreamReader;
44  import java.util.Properties;
45  import java.util.StringTokenizer;
46  
47  /**
48   * Processes entries and sends to appropriate groups.
49   * Eats up inappropriate entries.
50   *
51   */
52  class NNTPSpooler extends AbstractLogEnabled
53          implements Serviceable, Configurable, Initializable {
54  
55      /**
56       * The array of spooler runnables, each associated with a Worker thread
57       */
58      private SpoolerRunnable[] worker;
59  
60      /**
61       * The directory containing entries to be spooled.
62       */
63      private File spoolPath;
64  
65      /**
66       * The String form of the spool directory.
67       */
68      private String spoolPathString;
69  
70      /**
71       * The time the spooler threads sleep between processing
72       */
73      private int threadIdleTime = 0;
74  
75      /**
76       * The filesystem service
77       */
78      private FileSystem fileSystem;
79  
80      /**
81       * @see org.apache.avalon.framework.service.Serviceable#service(org.apache.avalon.framework.service.ServiceManager)
82       */
83      public void service(final ServiceManager serviceManager)
84              throws ServiceException {
85          setFileSystem((FileSystem) serviceManager.lookup(FileSystem.ROLE));
86      }
87  
88      /**
89       * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
90       */
91      public void configure( Configuration configuration ) throws ConfigurationException {
92          int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
93          threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
94          spoolPathString = configuration.getChild("spoolPath").getValue();
95          worker = new SpoolerRunnable[threadCount];
96      }
97  
98      /**
99       * @see org.apache.avalon.framework.activity.Initializable#initialize()
100      */
101     public void initialize() throws Exception {
102         //System.out.println(getClass().getName()+": init");
103 
104         try {
105             spoolPath = fileSystem.getFile(spoolPathString);
106             if ( spoolPath.exists() == false ) {
107                 spoolPath.mkdirs();
108             } else if (!(spoolPath.isDirectory())) {
109                 StringBuffer errorBuffer =
110                     new StringBuffer(128)
111                         .append("Spool directory is improperly configured.  The specified path ")
112                         .append(spoolPathString)
113                         .append(" is not a directory.");
114                 throw new ConfigurationException(errorBuffer.toString());
115             }
116         } catch (Exception e) {
117             getLogger().fatalError(e.getMessage(), e);
118             throw e;
119         }
120 
121         for ( int i = 0 ; i < worker.length ; i++ ) {
122             worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
123             ContainerUtil.enableLogging(worker[i], getLogger());
124         }
125 
126         // TODO: Replace this with a standard Avalon thread pool
127         for ( int i = 0 ; i < worker.length ; i++ ) {
128             new Thread(worker[i],"NNTPSpool-"+i).start();
129         }
130     }
131 
132     /**
133      * Sets the repository used by this spooler.
134      *
135      * @param repo the repository to be used
136      */
137     void setRepository(NNTPRepository repo) {
138         for ( int i = 0 ; i < worker.length ; i++ ) {
139             worker[i].setRepository(repo);
140         }
141     }
142 
143     /**
144      * Sets the article id repository used by this spooler.
145      *
146      * @param articleIDRepo the article id repository to be used
147      */
148     void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
149         for ( int i = 0 ; i < worker.length ; i++ ) {
150             worker[i].setArticleIDRepository(articleIDRepo);
151         }
152     }
153 
154     /**
155      * Returns (and creates, if the directory doesn't already exist) the
156      * spool directory
157      *
158      * @return the spool directory
159      */
160     File getSpoolPath() {
161         return spoolPath;
162     }
163 
164     /**
165      * A static inner class that provides the body for the spool
166      * threads.
167      */
168     static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {
169 
170         private static final Lock lock = new Lock();
171 
172         /**
173          * The directory containing entries to be spooled.
174          */
175         private final File spoolPath;
176 
177         /**
178          * The time the spooler thread sleeps between processing
179          */
180         private final int threadIdleTime;
181 
182         /**
183          * The article ID repository used by this spooler thread
184          */
185         private ArticleIDRepository articleIDRepo;
186 
187         /**
188          * The NNTP repository used by this spooler thread
189          */
190         private NNTPRepository repo;
191 
192         SpoolerRunnable(int threadIdleTime,File spoolPath) {
193             this.threadIdleTime = threadIdleTime;
194             this.spoolPath = spoolPath;
195         }
196 
197         /**
198          * Sets the article id repository used by this spooler thread.
199          *
200          * @param articleIDRepo the article id repository to be used
201          */
202         void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
203             this.articleIDRepo = articleIDRepo;
204         }
205 
206         /**
207          * Sets the repository used by this spooler thread.
208          *
209          * @param repo the repository to be used
210          */
211         void setRepository(NNTPRepository repo) {
212             this.repo = repo;
213         }
214 
215         /**
216          * The threads race to grab a lock. if a thread wins it processes the article,
217          * if it loses it tries to lock and process the next article.
218          */
219         public void run() {
220             getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
221             try {
222                 while ( Thread.interrupted() == false ) {
223                     String[] list = spoolPath.list();
224                     if (list.length > 0) getLogger().debug("Files to process: "+list.length);
225                     for ( int i = 0 ; i < list.length ; i++ ) {
226                         if ( lock.lock(list[i]) ) {
227                             File f = new File(spoolPath,list[i]).getAbsoluteFile();
228                             getLogger().debug("Processing file: "+f.getAbsolutePath());
229                             try {
230                                 process(f);
231                             } catch(Throwable ex) {
232                                 getLogger().debug("Exception occured while processing file: "+
233                                                   f.getAbsolutePath(),ex);
234                             } finally {
235                                 lock.unlock(list[i]);
236                             }
237                         }
238                         list[i] = null; // release the string entry;
239                     }
240                     list = null; // release the array;
241                     // this is good for other non idle threads
242                     try {
243                         Thread.sleep(threadIdleTime);
244                     } catch(InterruptedException ex) {
245                         // Ignore and continue
246                     }
247                 }
248             } finally {
249                 Thread.interrupted();
250             }
251         }
252 
253         /**
254          * Process a file stored in the spool.
255          *
256          * @param spoolFile the spool file being processed
257          */
258         private void process(File spoolFile) throws Exception {
259             StringBuffer logBuffer =
260                 new StringBuffer(160)
261                         .append("process: ")
262                         .append(spoolFile.getAbsolutePath())
263                         .append(",")
264                         .append(spoolFile.getCanonicalPath());
265             getLogger().debug(logBuffer.toString());
266             final MimeMessage msg;
267             String articleID;
268             // TODO: Why is this a block?
269             {   // Get the message for copying to destination groups.
270                 FileInputStream fin = new FileInputStream(spoolFile);
271                 try {
272                     msg = new MimeMessage(null,fin);
273                 } finally {
274                     IOUtil.shutdownStream(fin);
275                 }
276 
277                 String lineCount = null;
278                 String[] lineCountHeader = msg.getHeader("Lines");
279                 if (lineCountHeader == null || lineCountHeader.length == 0) {
280                     BufferedReader rdr = new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
281                     int lines = 0;
282                     while (rdr.readLine() != null) {
283                         lines++;
284                     }
285 
286                     lineCount = Integer.toString(lines);
287                     rdr.close();
288 
289                     msg.setHeader("Lines", lineCount);
290                 }
291 
292                 // ensure no duplicates exist.
293                 String[] idheader = msg.getHeader("Message-Id");
294                 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
295                 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
296                     getLogger().debug("Message already exists: "+articleID);
297                     if (spoolFile.delete() == false)
298                         getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
299                     return;
300                 }
301                 if ( articleID == null || lineCount != null) {
302                     if (articleID == null) {
303                         articleID = articleIDRepo.generateArticleID();
304                         msg.setHeader("Message-Id", articleID);
305                     }
306                     FileOutputStream fout = new FileOutputStream(spoolFile);
307                     try {
308                         msg.writeTo(fout);
309                     } finally {
310                         IOUtil.shutdownStream(fout);
311                     }
312                 }
313             }
314 
315             String[] headers = msg.getHeader("Newsgroups");
316             Properties prop = new Properties();
317             if (headers != null) {
318                 for ( int i = 0 ; i < headers.length ; i++ ) {
319                     StringTokenizer tokenizer = new StringTokenizer(headers[i],",");
320                     while ( tokenizer.hasMoreTokens() ) {
321                         String groupName = tokenizer.nextToken().trim();
322                         getLogger().debug("Copying message to group: "+groupName);
323                         NNTPGroup group = repo.getGroup(groupName);
324                         if ( group == null ) {
325                             getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
326                             continue;
327                         }
328 
329                         FileInputStream newsStream = new FileInputStream(spoolFile);
330                         try {
331                             NNTPArticle article = group.addArticle(newsStream);
332                             prop.setProperty(group.getName(),article.getArticleNumber() + "");
333                         } finally {
334                             IOUtil.shutdownStream(newsStream);
335                         }
336                     }
337                 }
338             }
339             articleIDRepo.addArticle(articleID,prop);
340             boolean delSuccess = spoolFile.delete();
341             if ( delSuccess == false ) {
342                 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
343             }
344         }
345     } // class SpoolerRunnable
346 
347     /**
348      * Setter for the fileSystem service
349      * 
350      * @param fileSystem fs
351      */
352     public void setFileSystem(FileSystem fileSystem) {
353         this.fileSystem = fileSystem;
354     }
355 }