View Javadoc

1   /************************************************************************
2    * Copyright (c) 1999-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.transport;
19  
20  import org.apache.avalon.framework.activity.Disposable;
21  import org.apache.avalon.framework.activity.Initializable;
22  import org.apache.avalon.framework.configuration.Configurable;
23  import org.apache.avalon.framework.configuration.Configuration;
24  import org.apache.avalon.framework.configuration.ConfigurationException;
25  import org.apache.avalon.framework.container.ContainerUtil;
26  import org.apache.avalon.framework.logger.AbstractLogEnabled;
27  import org.apache.avalon.framework.service.DefaultServiceManager;
28  import org.apache.avalon.framework.service.ServiceException;
29  import org.apache.avalon.framework.service.ServiceManager;
30  import org.apache.avalon.framework.service.Serviceable;
31  import org.apache.james.services.MailetLoader;
32  import org.apache.james.services.MatcherLoader;
33  import org.apache.james.services.SpoolRepository;
34  import org.apache.mailet.Mail;
35  import org.apache.mailet.Mailet;
36  import org.apache.mailet.MailetException;
37  import org.apache.mailet.Matcher;
38  
39  import javax.mail.MessagingException;
40  
41  import java.util.Collection;
42  import java.util.HashMap;
43  import java.util.Iterator;
44  
45  /***
46   * Manages the mail spool.  This class is responsible for retrieving
47   * messages from the spool, directing messages to the appropriate
48   * processor, and removing them from the spool when processing is
49   * complete.
50   *
51   * @version CVS $Revision: 428557 $ $Date: 2006-08-03 22:56:47 +0000 (gio, 03 ago 2006) $
52   */
53  public class JamesSpoolManager
54      extends AbstractLogEnabled
55      implements Serviceable, Configurable, Initializable, Runnable, Disposable {
56  
57      /***
58       * System component manager
59       */
60      private DefaultServiceManager compMgr;
61  
62      /***
63       * The configuration object used by this spool manager.
64       */
65      private Configuration conf;
66  
67      /***
68       * The spool that this manager will process
69       */
70      private SpoolRepository spool;
71  
72      /***
73       * The map of processor names to processors
74       */
75      private HashMap processors;
76  
77      /***
78       * The number of threads used to move mail through the spool.
79       */
80      private int numThreads;
81  
82      /***
83       * The ThreadPool containing worker threads.
84       *
85       * This used to be used, but for threads that lived the entire
86       * lifespan of the application.  Currently commented out.  In
87       * the future, we could use a thread pool to run short-lived
88       * workers, so that we have a smaller number of readers that
89       * accept a message from the spool, and dispatch to a pool of
90       * worker threads that process the message.
91       */
92      // private ThreadPool workerPool;
93  
94      /***
95       * The ThreadManager from which the thread pool is obtained.
96       */
97      // private ThreadManager threadManager;
98  
99      /***
100      * Number of active threads
101      */
102     private int numActive;
103 
104     /***
105      * Spool threads are active
106      */
107     private boolean active;
108 
109     /***
110      * Spool threads
111      */
112     private Collection spoolThreads;
113 
114     /***
115      * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
116      */
117     public void service(ServiceManager comp) throws ServiceException {
118         // threadManager = (ThreadManager) comp.lookup(ThreadManager.ROLE);
119         compMgr = new DefaultServiceManager(comp);
120     }
121 
122     /***
123      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
124      */
125     public void configure(Configuration conf) throws ConfigurationException {
126         this.conf = conf;
127         numThreads = conf.getChild("threads").getValueAsInteger(1);
128     }
129 
130     /***
131      * @see org.apache.avalon.framework.activity.Initializable#initialize()
132      */
133     public void initialize() throws Exception {
134 
135         getLogger().info("JamesSpoolManager init...");
136         spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);
137 
138         MailetLoader mailetLoader
139         = (MailetLoader) compMgr.lookup(MailetLoader.ROLE);
140         MatcherLoader matchLoader
141         = (MatcherLoader) compMgr.lookup(MatcherLoader.ROLE);
142 
143         //A processor is a Collection of
144         processors = new HashMap();
145 
146         final Configuration[] processorConfs = conf.getChildren( "processor" );
147         for ( int i = 0; i < processorConfs.length; i++ )
148         {
149             Configuration processorConf = processorConfs[i];
150             String processorName = processorConf.getAttribute("name");
151             try {
152                 LinearProcessor processor = new LinearProcessor();
153                 setupLogger(processor, processorName);
154                 processor.setSpool(spool);
155                 processor.initialize();
156                 processors.put(processorName, processor);
157 
158                 final Configuration[] mailetConfs
159                     = processorConf.getChildren( "mailet" );
160                 // Loop through the mailet configuration, load
161                 // all of the matcher and mailets, and add
162                 // them to the processor.
163                 for ( int j = 0; j < mailetConfs.length; j++ )
164                 {
165                     Configuration c = mailetConfs[j];
166                     String mailetClassName = c.getAttribute("class");
167                     String matcherName = c.getAttribute("match");
168                     Mailet mailet = null;
169                     Matcher matcher = null;
170                     try {
171                         matcher = matchLoader.getMatcher(matcherName);
172                         //The matcher itself should log that it's been inited.
173                         if (getLogger().isInfoEnabled()) {
174                             StringBuffer infoBuffer =
175                                 new StringBuffer(64)
176                                         .append("Matcher ")
177                                         .append(matcherName)
178                                         .append(" instantiated.");
179                             getLogger().info(infoBuffer.toString());
180                         }
181                     } catch (MessagingException ex) {
182                         // **** Do better job printing out exception
183                         if (getLogger().isErrorEnabled()) {
184                             StringBuffer errorBuffer =
185                                 new StringBuffer(256)
186                                         .append("Unable to init matcher ")
187                                         .append(matcherName)
188                                         .append(": ")
189                                         .append(ex.toString());
190                             getLogger().error( errorBuffer.toString(), ex );
191                 if (ex.getNextException() != null) {
192                 getLogger().error( "Caused by nested exception: ", ex.getNextException());
193                 }
194                         }
195                         System.err.println("Unable to init matcher " + matcherName);
196                         System.err.println("Check spool manager logs for more details.");
197                         //System.exit(1);
198                         throw ex;
199                     }
200                     try {
201                         mailet = mailetLoader.getMailet(mailetClassName, c);
202                         if (getLogger().isInfoEnabled()) {
203                             StringBuffer infoBuffer =
204                                 new StringBuffer(64)
205                                         .append("Mailet ")
206                                         .append(mailetClassName)
207                                         .append(" instantiated.");
208                             getLogger().info(infoBuffer.toString());
209                         }
210                     } catch (MessagingException ex) {
211                         // **** Do better job printing out exception
212                         if (getLogger().isErrorEnabled()) {
213                             StringBuffer errorBuffer =
214                                 new StringBuffer(256)
215                                         .append("Unable to init mailet ")
216                                         .append(mailetClassName)
217                                         .append(": ")
218                                         .append(ex.toString());
219                             getLogger().error( errorBuffer.toString(), ex );
220                 if (ex.getNextException() != null) {
221                 getLogger().error( "Caused by nested exception: ", ex.getNextException());
222                 }
223                         }
224                         System.err.println("Unable to init mailet " + mailetClassName);
225                         System.err.println("Check spool manager logs for more details.");
226                         //System.exit(1);
227                         throw ex;
228                     }
229                     //Add this pair to the processor
230                     processor.add(matcher, mailet);
231                 }
232 
233                 // Close the processor matcher/mailet lists.
234                 //
235                 // Please note that this is critical to the proper operation
236                 // of the LinearProcessor code.  The processor will not be
237                 // able to service mails until this call is made.
238                 processor.closeProcessorLists();
239 
240                 if (getLogger().isInfoEnabled()) {
241                     StringBuffer infoBuffer =
242                         new StringBuffer(64)
243                                 .append("Processor ")
244                                 .append(processorName)
245                                 .append(" instantiated.");
246                     getLogger().info(infoBuffer.toString());
247                 }
248             } catch (Exception ex) {
249                 if (getLogger().isErrorEnabled()) {
250                     StringBuffer errorBuffer =
251                        new StringBuffer(256)
252                                .append("Unable to init processor ")
253                                .append(processorName)
254                                .append(": ")
255                                .append(ex.toString());
256                     getLogger().error( errorBuffer.toString(), ex );
257                 }
258                 throw ex;
259             }
260         }
261         if (getLogger().isInfoEnabled()) {
262             StringBuffer infoBuffer =
263                 new StringBuffer(64)
264                     .append("Spooler Manager uses ")
265                     .append(numThreads)
266                     .append(" Thread(s)");
267             getLogger().info(infoBuffer.toString());
268         }
269 
270         active = true;
271         numActive = 0;
272         spoolThreads = new java.util.ArrayList(numThreads);
273         for ( int i = 0 ; i < numThreads ; i++ ) {
274             Thread reader = new Thread(this, "Spool Thread #" + i);
275             spoolThreads.add(reader);
276             reader.start();
277         }
278     }
279 
280     /***
281      * This routinely checks the message spool for messages, and processes
282      * them as necessary
283      */
284     public void run() {
285 
286         if (getLogger().isInfoEnabled())
287         {
288             getLogger().info("Run JamesSpoolManager: "
289                              + Thread.currentThread().getName());
290             getLogger().info("Spool=" + spool.getClass().getName());
291         }
292 
293         numActive++;
294         while(active) {
295             String key = null;
296             try {
297                 Mail mail = (Mail)spool.accept();
298                 key = mail.getName();
299                 if (getLogger().isDebugEnabled()) {
300                     StringBuffer debugBuffer =
301                         new StringBuffer(64)
302                                 .append("==== Begin processing mail ")
303                                 .append(mail.getName())
304                                 .append("====");
305                     getLogger().debug(debugBuffer.toString());
306                 }
307                 process(mail);
308                 // Only remove an email from the spool is processing is
309                 // complete, or if it has no recipients
310                 if ((Mail.GHOST.equals(mail.getState())) ||
311                     (mail.getRecipients() == null) ||
312                     (mail.getRecipients().size() == 0)) {
313                     ContainerUtil.dispose(mail);
314                     spool.remove(key);
315                     if (getLogger().isDebugEnabled()) {
316                         StringBuffer debugBuffer =
317                             new StringBuffer(64)
318                                     .append("==== Removed from spool mail ")
319                                     .append(key)
320                                     .append("====");
321                         getLogger().debug(debugBuffer.toString());
322                     }
323                 }
324                 else {
325                     // spool.remove() has a side-effect!  It unlocks the
326                     // message so that other threads can work on it!  If
327                     // we don't remove it, we must unlock it!
328                     spool.store(mail);
329                     ContainerUtil.dispose(mail);
330                     spool.unlock(key);
331                     // Do not notify: we simply updated the current mail
332                     // and we are able to reprocess it now.
333                 }
334                 mail = null;
335             } catch (InterruptedException ie) {
336                 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
337             } catch (Throwable e) {
338                 if (getLogger().isErrorEnabled()) {
339                     getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
340                                       + e.getMessage(), e);
341                 }
342                 /* Move the mail to ERROR state?  If we do, it could be
343                  * deleted if an error occurs in the ERROR processor.
344                  * Perhaps the answer is to resolve that issue by
345                  * having a special state for messages that are not to
346                  * be processed, but aren't to be deleted?  The message
347                  * would already be in the spool, but would not be
348                  * touched again.
349                 if (mail != null) {
350                     try {
351                         mail.setState(Mail.ERROR);
352                         spool.store(mail);
353                     }
354                 }
355                 */
356             }
357         }
358         if (getLogger().isInfoEnabled())
359         {
360             getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
361         }
362         numActive--;
363     }
364 
365     /***
366      * Process this mail message by the appropriate processor as designated
367      * in the state of the Mail object.
368      *
369      * @param mail the mail message to be processed
370      */
371     protected void process(Mail mail) {
372         while (true) {
373             String processorName = mail.getState();
374             if (processorName.equals(Mail.GHOST)) {
375                 //This message should disappear
376                 return;
377             }
378             try {
379                 LinearProcessor processor
380                     = (LinearProcessor)processors.get(processorName);
381                 if (processor == null) {
382                     StringBuffer exceptionMessageBuffer =
383                         new StringBuffer(128)
384                             .append("Unable to find processor ")
385                             .append(processorName)
386                             .append(" requested for processing of ")
387                             .append(mail.getName());
388                     String exceptionMessage = exceptionMessageBuffer.toString();
389                     getLogger().debug(exceptionMessage);
390                     mail.setState(Mail.ERROR);
391                     throw new MailetException(exceptionMessage);
392                 }
393                 StringBuffer logMessageBuffer = null;
394                 if (getLogger().isDebugEnabled()) {
395                     logMessageBuffer =
396                         new StringBuffer(64)
397                                 .append("Processing ")
398                                 .append(mail.getName())
399                                 .append(" through ")
400                                 .append(processorName);
401                     getLogger().debug(logMessageBuffer.toString());
402                 }
403                 processor.service(mail);
404                 if (getLogger().isDebugEnabled()) {
405                     logMessageBuffer =
406                         new StringBuffer(128)
407                                 .append("Processed ")
408                                 .append(mail.getName())
409                                 .append(" through ")
410                                 .append(processorName);
411                     getLogger().debug(logMessageBuffer.toString());
412                     getLogger().debug("Result was " + mail.getState());
413                 }
414                 return;
415             } catch (Throwable e) {
416                 // This is a strange error situation that shouldn't ordinarily
417                 // happen
418                 StringBuffer exceptionBuffer = 
419                     new StringBuffer(64)
420                             .append("Exception in processor <")
421                             .append(processorName)
422                             .append(">");
423                 getLogger().error(exceptionBuffer.toString(), e);
424                 if (processorName.equals(Mail.ERROR)) {
425                     // We got an error on the error processor...
426                     // kill the message
427                     mail.setState(Mail.GHOST);
428                     mail.setErrorMessage(e.getMessage());
429                 } else {
430                     //We got an error... send it to the requested processor
431                     if (!(e instanceof MessagingException)) {
432                         //We got an error... send it to the error processor
433                         mail.setState(Mail.ERROR);
434                     }
435                     mail.setErrorMessage(e.getMessage());
436                 }
437             }
438             if (getLogger().isErrorEnabled()) {
439                 StringBuffer logMessageBuffer =
440                     new StringBuffer(128)
441                             .append("An error occurred processing ")
442                             .append(mail.getName())
443                             .append(" through ")
444                             .append(processorName);
445                 getLogger().error(logMessageBuffer.toString());
446                 getLogger().error("Result was " + mail.getState());
447             }
448         }
449     }
450 
451     /***
452      * The dispose operation is called at the end of a components lifecycle.
453      * Instances of this class use this method to release and destroy any
454      * resources that they own.
455      *
456      * This implementation shuts down the LinearProcessors managed by this
457      * JamesSpoolManager
458      *
459      * @throws Exception if an error is encountered during shutdown
460      */
461     public void dispose() {
462         getLogger().info("JamesSpoolManager dispose...");
463         active = false; // shutdown the threads
464         for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
465             ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
466         }
467 
468         long stop = System.currentTimeMillis() + 60000;
469         // give the spooler threads one minute to terminate gracefully
470         while (numActive != 0 && stop > System.currentTimeMillis()) {
471             try {
472                 Thread.sleep(1000);
473             } catch (Exception ignored) {}
474         }
475         getLogger().info("JamesSpoolManager thread shutdown completed.");
476 
477         Iterator it = processors.keySet().iterator();
478         while (it.hasNext()) {
479             String processorName = (String)it.next();
480             if (getLogger().isDebugEnabled()) {
481                 getLogger().debug("Processor " + processorName);
482             }
483             LinearProcessor processor = (LinearProcessor)processors.get(processorName);
484             processor.dispose();
485             processors.remove(processor);
486         }
487     }
488 
489 }