1 /************************************************************************
2 * Copyright (c) 1999-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.transport;
19
20 import org.apache.avalon.framework.activity.Disposable;
21 import org.apache.avalon.framework.activity.Initializable;
22 import org.apache.avalon.framework.configuration.Configurable;
23 import org.apache.avalon.framework.configuration.Configuration;
24 import org.apache.avalon.framework.configuration.ConfigurationException;
25 import org.apache.avalon.framework.container.ContainerUtil;
26 import org.apache.avalon.framework.logger.AbstractLogEnabled;
27 import org.apache.avalon.framework.service.DefaultServiceManager;
28 import org.apache.avalon.framework.service.ServiceException;
29 import org.apache.avalon.framework.service.ServiceManager;
30 import org.apache.avalon.framework.service.Serviceable;
31 import org.apache.james.services.MailetLoader;
32 import org.apache.james.services.MatcherLoader;
33 import org.apache.james.services.SpoolRepository;
34 import org.apache.mailet.Mail;
35 import org.apache.mailet.Mailet;
36 import org.apache.mailet.MailetException;
37 import org.apache.mailet.Matcher;
38
39 import javax.mail.MessagingException;
40
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.Iterator;
44
45 /***
46 * Manages the mail spool. This class is responsible for retrieving
47 * messages from the spool, directing messages to the appropriate
48 * processor, and removing them from the spool when processing is
49 * complete.
50 *
51 * @version CVS $Revision: 428557 $ $Date: 2006-08-03 22:56:47 +0000 (gio, 03 ago 2006) $
52 */
53 public class JamesSpoolManager
54 extends AbstractLogEnabled
55 implements Serviceable, Configurable, Initializable, Runnable, Disposable {
56
57 /***
58 * System component manager
59 */
60 private DefaultServiceManager compMgr;
61
62 /***
63 * The configuration object used by this spool manager.
64 */
65 private Configuration conf;
66
67 /***
68 * The spool that this manager will process
69 */
70 private SpoolRepository spool;
71
72 /***
73 * The map of processor names to processors
74 */
75 private HashMap processors;
76
77 /***
78 * The number of threads used to move mail through the spool.
79 */
80 private int numThreads;
81
82 /***
83 * The ThreadPool containing worker threads.
84 *
85 * This used to be used, but for threads that lived the entire
86 * lifespan of the application. Currently commented out. In
87 * the future, we could use a thread pool to run short-lived
88 * workers, so that we have a smaller number of readers that
89 * accept a message from the spool, and dispatch to a pool of
90 * worker threads that process the message.
91 */
92
93
94 /***
95 * The ThreadManager from which the thread pool is obtained.
96 */
97
98
99 /***
100 * Number of active threads
101 */
102 private int numActive;
103
104 /***
105 * Spool threads are active
106 */
107 private boolean active;
108
109 /***
110 * Spool threads
111 */
112 private Collection spoolThreads;
113
114 /***
115 * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
116 */
117 public void service(ServiceManager comp) throws ServiceException {
118
119 compMgr = new DefaultServiceManager(comp);
120 }
121
122 /***
123 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
124 */
125 public void configure(Configuration conf) throws ConfigurationException {
126 this.conf = conf;
127 numThreads = conf.getChild("threads").getValueAsInteger(1);
128 }
129
130 /***
131 * @see org.apache.avalon.framework.activity.Initializable#initialize()
132 */
133 public void initialize() throws Exception {
134
135 getLogger().info("JamesSpoolManager init...");
136 spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);
137
138 MailetLoader mailetLoader
139 = (MailetLoader) compMgr.lookup(MailetLoader.ROLE);
140 MatcherLoader matchLoader
141 = (MatcherLoader) compMgr.lookup(MatcherLoader.ROLE);
142
143
144 processors = new HashMap();
145
146 final Configuration[] processorConfs = conf.getChildren( "processor" );
147 for ( int i = 0; i < processorConfs.length; i++ )
148 {
149 Configuration processorConf = processorConfs[i];
150 String processorName = processorConf.getAttribute("name");
151 try {
152 LinearProcessor processor = new LinearProcessor();
153 setupLogger(processor, processorName);
154 processor.setSpool(spool);
155 processor.initialize();
156 processors.put(processorName, processor);
157
158 final Configuration[] mailetConfs
159 = processorConf.getChildren( "mailet" );
160
161
162
163 for ( int j = 0; j < mailetConfs.length; j++ )
164 {
165 Configuration c = mailetConfs[j];
166 String mailetClassName = c.getAttribute("class");
167 String matcherName = c.getAttribute("match");
168 Mailet mailet = null;
169 Matcher matcher = null;
170 try {
171 matcher = matchLoader.getMatcher(matcherName);
172
173 if (getLogger().isInfoEnabled()) {
174 StringBuffer infoBuffer =
175 new StringBuffer(64)
176 .append("Matcher ")
177 .append(matcherName)
178 .append(" instantiated.");
179 getLogger().info(infoBuffer.toString());
180 }
181 } catch (MessagingException ex) {
182
183 if (getLogger().isErrorEnabled()) {
184 StringBuffer errorBuffer =
185 new StringBuffer(256)
186 .append("Unable to init matcher ")
187 .append(matcherName)
188 .append(": ")
189 .append(ex.toString());
190 getLogger().error( errorBuffer.toString(), ex );
191 if (ex.getNextException() != null) {
192 getLogger().error( "Caused by nested exception: ", ex.getNextException());
193 }
194 }
195 System.err.println("Unable to init matcher " + matcherName);
196 System.err.println("Check spool manager logs for more details.");
197
198 throw ex;
199 }
200 try {
201 mailet = mailetLoader.getMailet(mailetClassName, c);
202 if (getLogger().isInfoEnabled()) {
203 StringBuffer infoBuffer =
204 new StringBuffer(64)
205 .append("Mailet ")
206 .append(mailetClassName)
207 .append(" instantiated.");
208 getLogger().info(infoBuffer.toString());
209 }
210 } catch (MessagingException ex) {
211
212 if (getLogger().isErrorEnabled()) {
213 StringBuffer errorBuffer =
214 new StringBuffer(256)
215 .append("Unable to init mailet ")
216 .append(mailetClassName)
217 .append(": ")
218 .append(ex.toString());
219 getLogger().error( errorBuffer.toString(), ex );
220 if (ex.getNextException() != null) {
221 getLogger().error( "Caused by nested exception: ", ex.getNextException());
222 }
223 }
224 System.err.println("Unable to init mailet " + mailetClassName);
225 System.err.println("Check spool manager logs for more details.");
226
227 throw ex;
228 }
229
230 processor.add(matcher, mailet);
231 }
232
233
234
235
236
237
238 processor.closeProcessorLists();
239
240 if (getLogger().isInfoEnabled()) {
241 StringBuffer infoBuffer =
242 new StringBuffer(64)
243 .append("Processor ")
244 .append(processorName)
245 .append(" instantiated.");
246 getLogger().info(infoBuffer.toString());
247 }
248 } catch (Exception ex) {
249 if (getLogger().isErrorEnabled()) {
250 StringBuffer errorBuffer =
251 new StringBuffer(256)
252 .append("Unable to init processor ")
253 .append(processorName)
254 .append(": ")
255 .append(ex.toString());
256 getLogger().error( errorBuffer.toString(), ex );
257 }
258 throw ex;
259 }
260 }
261 if (getLogger().isInfoEnabled()) {
262 StringBuffer infoBuffer =
263 new StringBuffer(64)
264 .append("Spooler Manager uses ")
265 .append(numThreads)
266 .append(" Thread(s)");
267 getLogger().info(infoBuffer.toString());
268 }
269
270 active = true;
271 numActive = 0;
272 spoolThreads = new java.util.ArrayList(numThreads);
273 for ( int i = 0 ; i < numThreads ; i++ ) {
274 Thread reader = new Thread(this, "Spool Thread #" + i);
275 spoolThreads.add(reader);
276 reader.start();
277 }
278 }
279
280 /***
281 * This routinely checks the message spool for messages, and processes
282 * them as necessary
283 */
284 public void run() {
285
286 if (getLogger().isInfoEnabled())
287 {
288 getLogger().info("Run JamesSpoolManager: "
289 + Thread.currentThread().getName());
290 getLogger().info("Spool=" + spool.getClass().getName());
291 }
292
293 numActive++;
294 while(active) {
295 String key = null;
296 try {
297 Mail mail = (Mail)spool.accept();
298 key = mail.getName();
299 if (getLogger().isDebugEnabled()) {
300 StringBuffer debugBuffer =
301 new StringBuffer(64)
302 .append("==== Begin processing mail ")
303 .append(mail.getName())
304 .append("====");
305 getLogger().debug(debugBuffer.toString());
306 }
307 process(mail);
308
309
310 if ((Mail.GHOST.equals(mail.getState())) ||
311 (mail.getRecipients() == null) ||
312 (mail.getRecipients().size() == 0)) {
313 ContainerUtil.dispose(mail);
314 spool.remove(key);
315 if (getLogger().isDebugEnabled()) {
316 StringBuffer debugBuffer =
317 new StringBuffer(64)
318 .append("==== Removed from spool mail ")
319 .append(key)
320 .append("====");
321 getLogger().debug(debugBuffer.toString());
322 }
323 }
324 else {
325
326
327
328 spool.store(mail);
329 ContainerUtil.dispose(mail);
330 spool.unlock(key);
331
332
333 }
334 mail = null;
335 } catch (InterruptedException ie) {
336 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
337 } catch (Throwable e) {
338 if (getLogger().isErrorEnabled()) {
339 getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
340 + e.getMessage(), e);
341 }
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356 }
357 }
358 if (getLogger().isInfoEnabled())
359 {
360 getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
361 }
362 numActive--;
363 }
364
365 /***
366 * Process this mail message by the appropriate processor as designated
367 * in the state of the Mail object.
368 *
369 * @param mail the mail message to be processed
370 */
371 protected void process(Mail mail) {
372 while (true) {
373 String processorName = mail.getState();
374 if (processorName.equals(Mail.GHOST)) {
375
376 return;
377 }
378 try {
379 LinearProcessor processor
380 = (LinearProcessor)processors.get(processorName);
381 if (processor == null) {
382 StringBuffer exceptionMessageBuffer =
383 new StringBuffer(128)
384 .append("Unable to find processor ")
385 .append(processorName)
386 .append(" requested for processing of ")
387 .append(mail.getName());
388 String exceptionMessage = exceptionMessageBuffer.toString();
389 getLogger().debug(exceptionMessage);
390 mail.setState(Mail.ERROR);
391 throw new MailetException(exceptionMessage);
392 }
393 StringBuffer logMessageBuffer = null;
394 if (getLogger().isDebugEnabled()) {
395 logMessageBuffer =
396 new StringBuffer(64)
397 .append("Processing ")
398 .append(mail.getName())
399 .append(" through ")
400 .append(processorName);
401 getLogger().debug(logMessageBuffer.toString());
402 }
403 processor.service(mail);
404 if (getLogger().isDebugEnabled()) {
405 logMessageBuffer =
406 new StringBuffer(128)
407 .append("Processed ")
408 .append(mail.getName())
409 .append(" through ")
410 .append(processorName);
411 getLogger().debug(logMessageBuffer.toString());
412 getLogger().debug("Result was " + mail.getState());
413 }
414 return;
415 } catch (Throwable e) {
416
417
418 StringBuffer exceptionBuffer =
419 new StringBuffer(64)
420 .append("Exception in processor <")
421 .append(processorName)
422 .append(">");
423 getLogger().error(exceptionBuffer.toString(), e);
424 if (processorName.equals(Mail.ERROR)) {
425
426
427 mail.setState(Mail.GHOST);
428 mail.setErrorMessage(e.getMessage());
429 } else {
430
431 if (!(e instanceof MessagingException)) {
432
433 mail.setState(Mail.ERROR);
434 }
435 mail.setErrorMessage(e.getMessage());
436 }
437 }
438 if (getLogger().isErrorEnabled()) {
439 StringBuffer logMessageBuffer =
440 new StringBuffer(128)
441 .append("An error occurred processing ")
442 .append(mail.getName())
443 .append(" through ")
444 .append(processorName);
445 getLogger().error(logMessageBuffer.toString());
446 getLogger().error("Result was " + mail.getState());
447 }
448 }
449 }
450
451 /***
452 * The dispose operation is called at the end of a components lifecycle.
453 * Instances of this class use this method to release and destroy any
454 * resources that they own.
455 *
456 * This implementation shuts down the LinearProcessors managed by this
457 * JamesSpoolManager
458 *
459 * @throws Exception if an error is encountered during shutdown
460 */
461 public void dispose() {
462 getLogger().info("JamesSpoolManager dispose...");
463 active = false;
464 for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
465 ((Thread) it.next()).interrupt();
466 }
467
468 long stop = System.currentTimeMillis() + 60000;
469
470 while (numActive != 0 && stop > System.currentTimeMillis()) {
471 try {
472 Thread.sleep(1000);
473 } catch (Exception ignored) {}
474 }
475 getLogger().info("JamesSpoolManager thread shutdown completed.");
476
477 Iterator it = processors.keySet().iterator();
478 while (it.hasNext()) {
479 String processorName = (String)it.next();
480 if (getLogger().isDebugEnabled()) {
481 getLogger().debug("Processor " + processorName);
482 }
483 LinearProcessor processor = (LinearProcessor)processors.get(processorName);
484 processor.dispose();
485 processors.remove(processor);
486 }
487 }
488
489 }