View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.transport;
23  
24  import org.apache.avalon.framework.activity.Disposable;
25  import org.apache.avalon.framework.activity.Initializable;
26  import org.apache.avalon.framework.configuration.Configurable;
27  import org.apache.avalon.framework.configuration.Configuration;
28  import org.apache.avalon.framework.configuration.ConfigurationException;
29  import org.apache.avalon.framework.container.ContainerUtil;
30  import org.apache.avalon.framework.logger.AbstractLogEnabled;
31  import org.apache.avalon.framework.service.ServiceException;
32  import org.apache.avalon.framework.service.ServiceManager;
33  import org.apache.avalon.framework.service.Serviceable;
34  import org.apache.james.services.SpoolRepository;
35  import org.apache.james.services.SpoolManager;
36  import org.apache.mailet.Mail;
37  
38  import java.util.Collection;
39  import java.util.Iterator;
40  import java.util.List;
41  import java.util.ArrayList;
42  
43  import javax.annotation.PostConstruct;
44  
45  /**
46   * Manages the mail spool.  This class is responsible for retrieving
47   * messages from the spool, directing messages to the appropriate
48   * processor, and removing them from the spool when processing is
49   * complete.
50   *
51   * @version CVS $Revision: 725962 $ $Date: 2008-12-12 11:02:01 +0000 (Fri, 12 Dec 2008) $
52   */
53  public class JamesSpoolManager
54      extends AbstractLogEnabled
55      implements Serviceable, Configurable, Runnable, Disposable, SpoolManager {
56  
57      /**
58       * System component manager
59       */
60      private ServiceManager compMgr;
61  
62      /**
63       * The spool that this manager will process
64       */
65      private SpoolRepository spool;
66  
67      /**
68       * The number of threads used to move mail through the spool.
69       */
70      private int numThreads;
71  
72      /**
73       * The ThreadPool containing worker threads.
74       *
75       * This used to be used, but for threads that lived the entire
76       * lifespan of the application.  Currently commented out.  In
77       * the future, we could use a thread pool to run short-lived
78       * workers, so that we have a smaller number of readers that
79       * accept a message from the spool, and dispatch to a pool of
80       * worker threads that process the message.
81       */
82      // private ThreadPool workerPool;
83  
84      /**
85       * The ThreadManager from which the thread pool is obtained.
86       */
87      // private ThreadManager threadManager;
88  
89      /**
90       * Number of active threads
91       */
92      private int numActive;
93  
94      /**
95       * Spool threads are active
96       */
97      private boolean active;
98  
99      /**
100      * Spool threads
101      */
102     private Collection spoolThreads;
103 
104     /**
105      * The mail processor 
106      */
107     private MailProcessor processorList;
108 
109     /**
110      * Set the SpoolRepository
111      * 
112      * @param spool the SpoolRepository
113      */
114     public void setSpool(SpoolRepository spool) {
115         this.spool = spool;
116     }
117 
118     /**
119      * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
120      */
121     public void service(ServiceManager comp) throws ServiceException {
122         compMgr = comp;
123         setSpool((SpoolRepository) compMgr.lookup(SpoolRepository.ROLE));
124     }
125 
126     /**
127      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
128      */
129     public void configure(Configuration conf) throws ConfigurationException {
130         numThreads = conf.getChild("threads").getValueAsInteger(1);
131 
132         String processorClass = conf.getChild("processorClass").getValue("org.apache.james.transport.StateAwareProcessorList");
133         try {
134             processorList = (MailProcessor) Thread.currentThread().getContextClassLoader().loadClass(processorClass).newInstance();
135         } catch (Exception e1) {
136             getLogger().error("Unable to instantiate spoolmanager processor: "+processorClass, e1);
137             throw new ConfigurationException("Instantiation exception: "+processorClass, e1);
138         }
139 
140         try {
141             ContainerUtil.enableLogging(processorList, getLogger());
142             ContainerUtil.service(processorList, compMgr);
143         } catch (ServiceException e) {
144             getLogger().error(e.getMessage(), e);
145             throw new ConfigurationException("Servicing failed with error: "+e.getMessage(),e);
146         }
147 
148         ContainerUtil.configure(processorList, conf);
149     }
150 
151     /**
152      * Initialises the spool manager.
153      */
154     @PostConstruct
155     public void initialize() throws Exception {
156 
157         getLogger().info("JamesSpoolManager init...");
158 
159         ContainerUtil.initialize(processorList);
160 
161         if (getLogger().isInfoEnabled()) {
162             StringBuffer infoBuffer =
163                 new StringBuffer(64)
164                     .append("Spooler Manager uses ")
165                     .append(numThreads)
166                     .append(" Thread(s)");
167             getLogger().info(infoBuffer.toString());
168         }
169 
170         active = true;
171         numActive = 0;
172         spoolThreads = new java.util.ArrayList(numThreads);
173         for ( int i = 0 ; i < numThreads ; i++ ) {
174             Thread reader = new Thread(this, "Spool Thread #" + i);
175             spoolThreads.add(reader);
176             reader.start();
177         }
178     }
179 
180     /**
181      * This routinely checks the message spool for messages, and processes
182      * them as necessary
183      */
184     public void run() {
185 
186         if (getLogger().isInfoEnabled())
187         {
188             getLogger().info("Run JamesSpoolManager: "
189                              + Thread.currentThread().getName());
190             getLogger().info("Spool=" + spool.getClass().getName());
191         }
192 
193         numActive++;
194         while(active) {
195             String key = null;
196             try {
197                 Mail mail = (Mail)spool.accept();
198                 key = mail.getName();
199                 if (getLogger().isDebugEnabled()) {
200                     StringBuffer debugBuffer =
201                         new StringBuffer(64)
202                                 .append("==== Begin processing mail ")
203                                 .append(mail.getName())
204                                 .append("====");
205                     getLogger().debug(debugBuffer.toString());
206                 }
207 
208                 processorList.service(mail);
209 
210                 // Only remove an email from the spool is processing is
211                 // complete, or if it has no recipients
212                 if ((Mail.GHOST.equals(mail.getState())) ||
213                     (mail.getRecipients() == null) ||
214                     (mail.getRecipients().size() == 0)) {
215                     ContainerUtil.dispose(mail);
216                     spool.remove(key);
217                     if (getLogger().isDebugEnabled()) {
218                         StringBuffer debugBuffer =
219                             new StringBuffer(64)
220                                     .append("==== Removed from spool mail ")
221                                     .append(key)
222                                     .append("====");
223                         getLogger().debug(debugBuffer.toString());
224                     }
225                 }
226                 else {
227                     // spool.remove() has a side-effect!  It unlocks the
228                     // message so that other threads can work on it!  If
229                     // we don't remove it, we must unlock it!
230                     spool.store(mail);
231                     ContainerUtil.dispose(mail);
232                     spool.unlock(key);
233                     // Do not notify: we simply updated the current mail
234                     // and we are able to reprocess it now.
235                 }
236                 mail = null;
237             } catch (InterruptedException ie) {
238                 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
239             } catch (Throwable e) {
240                 if (getLogger().isErrorEnabled()) {
241                     getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
242                                       + e.getMessage(), e);
243                 }
244                 /* Move the mail to ERROR state?  If we do, it could be
245                  * deleted if an error occurs in the ERROR processor.
246                  * Perhaps the answer is to resolve that issue by
247                  * having a special state for messages that are not to
248                  * be processed, but aren't to be deleted?  The message
249                  * would already be in the spool, but would not be
250                  * touched again.
251                 if (mail != null) {
252                     try {
253                         mail.setState(Mail.ERROR);
254                         spool.store(mail);
255                     }
256                 }
257                 */
258             }
259         }
260         if (getLogger().isInfoEnabled())
261         {
262             getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
263         }
264         numActive--;
265     }
266 
267     /**
268      * The dispose operation is called at the end of a components lifecycle.
269      * Instances of this class use this method to release and destroy any
270      * resources that they own.
271      *
272      * This implementation shuts down the LinearProcessors managed by this
273      * JamesSpoolManager
274      * 
275      * @see org.apache.avalon.framework.activity.Disposable#dispose()
276      */
277     public void dispose() {
278         getLogger().info("JamesSpoolManager dispose...");
279         active = false; // shutdown the threads
280         for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
281             ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
282         }
283 
284         long stop = System.currentTimeMillis() + 60000;
285         // give the spooler threads one minute to terminate gracefully
286         while (numActive != 0 && stop > System.currentTimeMillis()) {
287             try {
288                 Thread.sleep(1000);
289             } catch (Exception ignored) {}
290         }
291         getLogger().info("JamesSpoolManager thread shutdown completed.");
292 
293         ContainerUtil.dispose(processorList);
294     }
295 
296     public String[] getProcessorNames() {
297         if (!(processorList instanceof ProcessorList)) {
298             return new String[0];  
299         }
300         String[] processorNames = ((ProcessorList) processorList).getProcessorNames();
301         return processorNames;
302     }
303 
304     public List getMailetConfigs(String processorName) {
305         MailetContainer mailetContainer = getMailetContainerByName(processorName);
306         if (mailetContainer == null) return new ArrayList();
307         return mailetContainer.getMailetConfigs();
308     }
309 
310     public List getMatcherConfigs(String processorName) {
311         MailetContainer mailetContainer = getMailetContainerByName(processorName);
312         if (mailetContainer == null) return new ArrayList();
313         return mailetContainer.getMatcherConfigs();
314     }
315 
316     private MailetContainer getMailetContainerByName(String processorName) {
317         if (!(processorList instanceof ProcessorList)) return null;
318         
319         MailProcessor processor = ((ProcessorList) processorList).getProcessor(processorName);
320         if (!(processor instanceof MailetContainer)) return null;
321         // TODO: decide, if we have to visit all sub-processors for being ProcessorLists 
322         // on their very own and collecting the processor names deeply.
323         return (MailetContainer)processor;
324     }
325 }