View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.transport;
21  
22  import org.apache.avalon.framework.activity.Disposable;
23  import org.apache.avalon.framework.activity.Initializable;
24  import org.apache.avalon.framework.configuration.Configurable;
25  import org.apache.avalon.framework.configuration.Configuration;
26  import org.apache.avalon.framework.configuration.ConfigurationException;
27  import org.apache.avalon.framework.container.ContainerUtil;
28  import org.apache.avalon.framework.logger.AbstractLogEnabled;
29  import org.apache.avalon.framework.service.DefaultServiceManager;
30  import org.apache.avalon.framework.service.ServiceException;
31  import org.apache.avalon.framework.service.ServiceManager;
32  import org.apache.avalon.framework.service.Serviceable;
33  import org.apache.james.services.MailetLoader;
34  import org.apache.james.services.MatcherLoader;
35  import org.apache.james.services.SpoolRepository;
36  import org.apache.mailet.Mail;
37  import org.apache.mailet.Mailet;
38  import org.apache.mailet.MailetException;
39  import org.apache.mailet.Matcher;
40  
41  import javax.mail.MessagingException;
42  
43  import java.util.Collection;
44  import java.util.HashMap;
45  import java.util.Iterator;
46  
47  /***
48   * Manages the mail spool.  This class is responsible for retrieving
49   * messages from the spool, directing messages to the appropriate
50   * processor, and removing them from the spool when processing is
51   * complete.
52   *
53   * @version CVS $Revision: 494012 $ $Date: 2007-01-08 10:23:58 +0000 (lun, 08 gen 2007) $
54   */
55  public class JamesSpoolManager
56      extends AbstractLogEnabled
57      implements Serviceable, Configurable, Initializable, Runnable, Disposable {
58  
59      /***
60       * System component manager
61       */
62      private DefaultServiceManager compMgr;
63  
64      /***
65       * The configuration object used by this spool manager.
66       */
67      private Configuration conf;
68  
69      /***
70       * The spool that this manager will process
71       */
72      private SpoolRepository spool;
73  
74      /***
75       * The map of processor names to processors
76       */
77      private HashMap processors;
78  
79      /***
80       * The number of threads used to move mail through the spool.
81       */
82      private int numThreads;
83  
84      /***
85       * The ThreadPool containing worker threads.
86       *
87       * This used to be used, but for threads that lived the entire
88       * lifespan of the application.  Currently commented out.  In
89       * the future, we could use a thread pool to run short-lived
90       * workers, so that we have a smaller number of readers that
91       * accept a message from the spool, and dispatch to a pool of
92       * worker threads that process the message.
93       */
94      // private ThreadPool workerPool;
95  
96      /***
97       * The ThreadManager from which the thread pool is obtained.
98       */
99      // private ThreadManager threadManager;
100 
101     /***
102      * Number of active threads
103      */
104     private int numActive;
105 
106     /***
107      * Spool threads are active
108      */
109     private boolean active;
110 
111     /***
112      * Spool threads
113      */
114     private Collection spoolThreads;
115 
116     /***
117      * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
118      */
119     public void service(ServiceManager comp) throws ServiceException {
120         // threadManager = (ThreadManager) comp.lookup(ThreadManager.ROLE);
121         compMgr = new DefaultServiceManager(comp);
122     }
123 
124     /***
125      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
126      */
127     public void configure(Configuration conf) throws ConfigurationException {
128         this.conf = conf;
129         numThreads = conf.getChild("threads").getValueAsInteger(1);
130     }
131 
132     /***
133      * @see org.apache.avalon.framework.activity.Initializable#initialize()
134      */
135     public void initialize() throws Exception {
136 
137         getLogger().info("JamesSpoolManager init...");
138         spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);
139 
140         MailetLoader mailetLoader
141         = (MailetLoader) compMgr.lookup(MailetLoader.ROLE);
142         MatcherLoader matchLoader
143         = (MatcherLoader) compMgr.lookup(MatcherLoader.ROLE);
144 
145         //A processor is a Collection of
146         processors = new HashMap();
147 
148         final Configuration[] processorConfs = conf.getChildren( "processor" );
149         for ( int i = 0; i < processorConfs.length; i++ )
150         {
151             Configuration processorConf = processorConfs[i];
152             String processorName = processorConf.getAttribute("name");
153             try {
154                 LinearProcessor processor = new LinearProcessor();
155                 setupLogger(processor, processorName);
156                 processor.setSpool(spool);
157                 processor.initialize();
158                 processors.put(processorName, processor);
159 
160                 final Configuration[] mailetConfs
161                     = processorConf.getChildren( "mailet" );
162                 // Loop through the mailet configuration, load
163                 // all of the matcher and mailets, and add
164                 // them to the processor.
165                 for ( int j = 0; j < mailetConfs.length; j++ )
166                 {
167                     Configuration c = mailetConfs[j];
168                     String mailetClassName = c.getAttribute("class");
169                     String matcherName = c.getAttribute("match");
170                     Mailet mailet = null;
171                     Matcher matcher = null;
172                     try {
173                         matcher = matchLoader.getMatcher(matcherName);
174                         //The matcher itself should log that it's been inited.
175                         if (getLogger().isInfoEnabled()) {
176                             StringBuffer infoBuffer =
177                                 new StringBuffer(64)
178                                         .append("Matcher ")
179                                         .append(matcherName)
180                                         .append(" instantiated.");
181                             getLogger().info(infoBuffer.toString());
182                         }
183                     } catch (MessagingException ex) {
184                         // **** Do better job printing out exception
185                         if (getLogger().isErrorEnabled()) {
186                             StringBuffer errorBuffer =
187                                 new StringBuffer(256)
188                                         .append("Unable to init matcher ")
189                                         .append(matcherName)
190                                         .append(": ")
191                                         .append(ex.toString());
192                             getLogger().error( errorBuffer.toString(), ex );
193                 if (ex.getNextException() != null) {
194                 getLogger().error( "Caused by nested exception: ", ex.getNextException());
195                 }
196                         }
197                         System.err.println("Unable to init matcher " + matcherName);
198                         System.err.println("Check spool manager logs for more details.");
199                         //System.exit(1);
200                         throw ex;
201                     }
202                     try {
203                         mailet = mailetLoader.getMailet(mailetClassName, c);
204                         if (getLogger().isInfoEnabled()) {
205                             StringBuffer infoBuffer =
206                                 new StringBuffer(64)
207                                         .append("Mailet ")
208                                         .append(mailetClassName)
209                                         .append(" instantiated.");
210                             getLogger().info(infoBuffer.toString());
211                         }
212                     } catch (MessagingException ex) {
213                         // **** Do better job printing out exception
214                         if (getLogger().isErrorEnabled()) {
215                             StringBuffer errorBuffer =
216                                 new StringBuffer(256)
217                                         .append("Unable to init mailet ")
218                                         .append(mailetClassName)
219                                         .append(": ")
220                                         .append(ex.toString());
221                             getLogger().error( errorBuffer.toString(), ex );
222                 if (ex.getNextException() != null) {
223                 getLogger().error( "Caused by nested exception: ", ex.getNextException());
224                 }
225                         }
226                         System.err.println("Unable to init mailet " + mailetClassName);
227                         System.err.println("Check spool manager logs for more details.");
228                         //System.exit(1);
229                         throw ex;
230                     }
231                     //Add this pair to the processor
232                     processor.add(matcher, mailet);
233                 }
234 
235                 // Close the processor matcher/mailet lists.
236                 //
237                 // Please note that this is critical to the proper operation
238                 // of the LinearProcessor code.  The processor will not be
239                 // able to service mails until this call is made.
240                 processor.closeProcessorLists();
241 
242                 if (getLogger().isInfoEnabled()) {
243                     StringBuffer infoBuffer =
244                         new StringBuffer(64)
245                                 .append("Processor ")
246                                 .append(processorName)
247                                 .append(" instantiated.");
248                     getLogger().info(infoBuffer.toString());
249                 }
250             } catch (Exception ex) {
251                 if (getLogger().isErrorEnabled()) {
252                     StringBuffer errorBuffer =
253                        new StringBuffer(256)
254                                .append("Unable to init processor ")
255                                .append(processorName)
256                                .append(": ")
257                                .append(ex.toString());
258                     getLogger().error( errorBuffer.toString(), ex );
259                 }
260                 throw ex;
261             }
262         }
263         if (getLogger().isInfoEnabled()) {
264             StringBuffer infoBuffer =
265                 new StringBuffer(64)
266                     .append("Spooler Manager uses ")
267                     .append(numThreads)
268                     .append(" Thread(s)");
269             getLogger().info(infoBuffer.toString());
270         }
271 
272         active = true;
273         numActive = 0;
274         spoolThreads = new java.util.ArrayList(numThreads);
275         for ( int i = 0 ; i < numThreads ; i++ ) {
276             Thread reader = new Thread(this, "Spool Thread #" + i);
277             spoolThreads.add(reader);
278             reader.start();
279         }
280     }
281 
282     /***
283      * This routinely checks the message spool for messages, and processes
284      * them as necessary
285      */
286     public void run() {
287 
288         if (getLogger().isInfoEnabled())
289         {
290             getLogger().info("Run JamesSpoolManager: "
291                              + Thread.currentThread().getName());
292             getLogger().info("Spool=" + spool.getClass().getName());
293         }
294 
295         numActive++;
296         while(active) {
297             String key = null;
298             try {
299                 Mail mail = (Mail)spool.accept();
300                 key = mail.getName();
301                 if (getLogger().isDebugEnabled()) {
302                     StringBuffer debugBuffer =
303                         new StringBuffer(64)
304                                 .append("==== Begin processing mail ")
305                                 .append(mail.getName())
306                                 .append("====");
307                     getLogger().debug(debugBuffer.toString());
308                 }
309                 process(mail);
310                 // Only remove an email from the spool is processing is
311                 // complete, or if it has no recipients
312                 if ((Mail.GHOST.equals(mail.getState())) ||
313                     (mail.getRecipients() == null) ||
314                     (mail.getRecipients().size() == 0)) {
315                     ContainerUtil.dispose(mail);
316                     spool.remove(key);
317                     if (getLogger().isDebugEnabled()) {
318                         StringBuffer debugBuffer =
319                             new StringBuffer(64)
320                                     .append("==== Removed from spool mail ")
321                                     .append(key)
322                                     .append("====");
323                         getLogger().debug(debugBuffer.toString());
324                     }
325                 }
326                 else {
327                     // spool.remove() has a side-effect!  It unlocks the
328                     // message so that other threads can work on it!  If
329                     // we don't remove it, we must unlock it!
330                     spool.store(mail);
331                     ContainerUtil.dispose(mail);
332                     spool.unlock(key);
333                     // Do not notify: we simply updated the current mail
334                     // and we are able to reprocess it now.
335                 }
336                 mail = null;
337             } catch (InterruptedException ie) {
338                 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
339             } catch (Throwable e) {
340                 if (getLogger().isErrorEnabled()) {
341                     getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
342                                       + e.getMessage(), e);
343                 }
344                 /* Move the mail to ERROR state?  If we do, it could be
345                  * deleted if an error occurs in the ERROR processor.
346                  * Perhaps the answer is to resolve that issue by
347                  * having a special state for messages that are not to
348                  * be processed, but aren't to be deleted?  The message
349                  * would already be in the spool, but would not be
350                  * touched again.
351                 if (mail != null) {
352                     try {
353                         mail.setState(Mail.ERROR);
354                         spool.store(mail);
355                     }
356                 }
357                 */
358             }
359         }
360         if (getLogger().isInfoEnabled())
361         {
362             getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
363         }
364         numActive--;
365     }
366 
367     /***
368      * Process this mail message by the appropriate processor as designated
369      * in the state of the Mail object.
370      *
371      * @param mail the mail message to be processed
372      */
373     protected void process(Mail mail) {
374         while (true) {
375             String processorName = mail.getState();
376             if (processorName.equals(Mail.GHOST)) {
377                 //This message should disappear
378                 return;
379             }
380             try {
381                 LinearProcessor processor
382                     = (LinearProcessor)processors.get(processorName);
383                 if (processor == null) {
384                     StringBuffer exceptionMessageBuffer =
385                         new StringBuffer(128)
386                             .append("Unable to find processor ")
387                             .append(processorName)
388                             .append(" requested for processing of ")
389                             .append(mail.getName());
390                     String exceptionMessage = exceptionMessageBuffer.toString();
391                     getLogger().debug(exceptionMessage);
392                     mail.setState(Mail.ERROR);
393                     throw new MailetException(exceptionMessage);
394                 }
395                 StringBuffer logMessageBuffer = null;
396                 if (getLogger().isDebugEnabled()) {
397                     logMessageBuffer =
398                         new StringBuffer(64)
399                                 .append("Processing ")
400                                 .append(mail.getName())
401                                 .append(" through ")
402                                 .append(processorName);
403                     getLogger().debug(logMessageBuffer.toString());
404                 }
405                 processor.service(mail);
406                 if (getLogger().isDebugEnabled()) {
407                     logMessageBuffer =
408                         new StringBuffer(128)
409                                 .append("Processed ")
410                                 .append(mail.getName())
411                                 .append(" through ")
412                                 .append(processorName);
413                     getLogger().debug(logMessageBuffer.toString());
414                     getLogger().debug("Result was " + mail.getState());
415                 }
416                 return;
417             } catch (Throwable e) {
418                 // This is a strange error situation that shouldn't ordinarily
419                 // happen
420                 StringBuffer exceptionBuffer = 
421                     new StringBuffer(64)
422                             .append("Exception in processor <")
423                             .append(processorName)
424                             .append(">");
425                 getLogger().error(exceptionBuffer.toString(), e);
426                 if (processorName.equals(Mail.ERROR)) {
427                     // We got an error on the error processor...
428                     // kill the message
429                     mail.setState(Mail.GHOST);
430                     mail.setErrorMessage(e.getMessage());
431                 } else {
432                     //We got an error... send it to the requested processor
433                     if (!(e instanceof MessagingException)) {
434                         //We got an error... send it to the error processor
435                         mail.setState(Mail.ERROR);
436                     }
437                     mail.setErrorMessage(e.getMessage());
438                 }
439             }
440             if (getLogger().isErrorEnabled()) {
441                 StringBuffer logMessageBuffer =
442                     new StringBuffer(128)
443                             .append("An error occurred processing ")
444                             .append(mail.getName())
445                             .append(" through ")
446                             .append(processorName);
447                 getLogger().error(logMessageBuffer.toString());
448                 getLogger().error("Result was " + mail.getState());
449             }
450         }
451     }
452 
453     /***
454      * The dispose operation is called at the end of a components lifecycle.
455      * Instances of this class use this method to release and destroy any
456      * resources that they own.
457      *
458      * This implementation shuts down the LinearProcessors managed by this
459      * JamesSpoolManager
460      *
461      * @throws Exception if an error is encountered during shutdown
462      */
463     public void dispose() {
464         getLogger().info("JamesSpoolManager dispose...");
465         active = false; // shutdown the threads
466         for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
467             ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
468         }
469 
470         long stop = System.currentTimeMillis() + 60000;
471         // give the spooler threads one minute to terminate gracefully
472         while (numActive != 0 && stop > System.currentTimeMillis()) {
473             try {
474                 Thread.sleep(1000);
475             } catch (Exception ignored) {}
476         }
477         getLogger().info("JamesSpoolManager thread shutdown completed.");
478 
479         Iterator it = processors.keySet().iterator();
480         while (it.hasNext()) {
481             String processorName = (String)it.next();
482             if (getLogger().isDebugEnabled()) {
483                 getLogger().debug("Processor " + processorName);
484             }
485             LinearProcessor processor = (LinearProcessor)processors.get(processorName);
486             processor.dispose();
487             processors.remove(processor);
488         }
489     }
490 
491 }