1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.transport;
21
22 import org.apache.avalon.framework.activity.Disposable;
23 import org.apache.avalon.framework.activity.Initializable;
24 import org.apache.avalon.framework.configuration.Configurable;
25 import org.apache.avalon.framework.configuration.Configuration;
26 import org.apache.avalon.framework.configuration.ConfigurationException;
27 import org.apache.avalon.framework.container.ContainerUtil;
28 import org.apache.avalon.framework.logger.AbstractLogEnabled;
29 import org.apache.avalon.framework.service.DefaultServiceManager;
30 import org.apache.avalon.framework.service.ServiceException;
31 import org.apache.avalon.framework.service.ServiceManager;
32 import org.apache.avalon.framework.service.Serviceable;
33 import org.apache.james.services.MailetLoader;
34 import org.apache.james.services.MatcherLoader;
35 import org.apache.james.services.SpoolRepository;
36 import org.apache.mailet.Mail;
37 import org.apache.mailet.Mailet;
38 import org.apache.mailet.MailetException;
39 import org.apache.mailet.Matcher;
40
41 import javax.mail.MessagingException;
42
43 import java.util.Collection;
44 import java.util.HashMap;
45 import java.util.Iterator;
46
47 /***
48 * Manages the mail spool. This class is responsible for retrieving
49 * messages from the spool, directing messages to the appropriate
50 * processor, and removing them from the spool when processing is
51 * complete.
52 *
53 * @version CVS $Revision: 494012 $ $Date: 2007-01-08 10:23:58 +0000 (lun, 08 gen 2007) $
54 */
55 public class JamesSpoolManager
56 extends AbstractLogEnabled
57 implements Serviceable, Configurable, Initializable, Runnable, Disposable {
58
59 /***
60 * System component manager
61 */
62 private DefaultServiceManager compMgr;
63
64 /***
65 * The configuration object used by this spool manager.
66 */
67 private Configuration conf;
68
69 /***
70 * The spool that this manager will process
71 */
72 private SpoolRepository spool;
73
74 /***
75 * The map of processor names to processors
76 */
77 private HashMap processors;
78
79 /***
80 * The number of threads used to move mail through the spool.
81 */
82 private int numThreads;
83
84 /***
85 * The ThreadPool containing worker threads.
86 *
87 * This used to be used, but for threads that lived the entire
88 * lifespan of the application. Currently commented out. In
89 * the future, we could use a thread pool to run short-lived
90 * workers, so that we have a smaller number of readers that
91 * accept a message from the spool, and dispatch to a pool of
92 * worker threads that process the message.
93 */
94
95
96 /***
97 * The ThreadManager from which the thread pool is obtained.
98 */
99
100
101 /***
102 * Number of active threads
103 */
104 private int numActive;
105
106 /***
107 * Spool threads are active
108 */
109 private boolean active;
110
111 /***
112 * Spool threads
113 */
114 private Collection spoolThreads;
115
116 /***
117 * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
118 */
119 public void service(ServiceManager comp) throws ServiceException {
120
121 compMgr = new DefaultServiceManager(comp);
122 }
123
124 /***
125 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
126 */
127 public void configure(Configuration conf) throws ConfigurationException {
128 this.conf = conf;
129 numThreads = conf.getChild("threads").getValueAsInteger(1);
130 }
131
132 /***
133 * @see org.apache.avalon.framework.activity.Initializable#initialize()
134 */
135 public void initialize() throws Exception {
136
137 getLogger().info("JamesSpoolManager init...");
138 spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);
139
140 MailetLoader mailetLoader
141 = (MailetLoader) compMgr.lookup(MailetLoader.ROLE);
142 MatcherLoader matchLoader
143 = (MatcherLoader) compMgr.lookup(MatcherLoader.ROLE);
144
145
146 processors = new HashMap();
147
148 final Configuration[] processorConfs = conf.getChildren( "processor" );
149 for ( int i = 0; i < processorConfs.length; i++ )
150 {
151 Configuration processorConf = processorConfs[i];
152 String processorName = processorConf.getAttribute("name");
153 try {
154 LinearProcessor processor = new LinearProcessor();
155 setupLogger(processor, processorName);
156 processor.setSpool(spool);
157 processor.initialize();
158 processors.put(processorName, processor);
159
160 final Configuration[] mailetConfs
161 = processorConf.getChildren( "mailet" );
162
163
164
165 for ( int j = 0; j < mailetConfs.length; j++ )
166 {
167 Configuration c = mailetConfs[j];
168 String mailetClassName = c.getAttribute("class");
169 String matcherName = c.getAttribute("match");
170 Mailet mailet = null;
171 Matcher matcher = null;
172 try {
173 matcher = matchLoader.getMatcher(matcherName);
174
175 if (getLogger().isInfoEnabled()) {
176 StringBuffer infoBuffer =
177 new StringBuffer(64)
178 .append("Matcher ")
179 .append(matcherName)
180 .append(" instantiated.");
181 getLogger().info(infoBuffer.toString());
182 }
183 } catch (MessagingException ex) {
184
185 if (getLogger().isErrorEnabled()) {
186 StringBuffer errorBuffer =
187 new StringBuffer(256)
188 .append("Unable to init matcher ")
189 .append(matcherName)
190 .append(": ")
191 .append(ex.toString());
192 getLogger().error( errorBuffer.toString(), ex );
193 if (ex.getNextException() != null) {
194 getLogger().error( "Caused by nested exception: ", ex.getNextException());
195 }
196 }
197 System.err.println("Unable to init matcher " + matcherName);
198 System.err.println("Check spool manager logs for more details.");
199
200 throw ex;
201 }
202 try {
203 mailet = mailetLoader.getMailet(mailetClassName, c);
204 if (getLogger().isInfoEnabled()) {
205 StringBuffer infoBuffer =
206 new StringBuffer(64)
207 .append("Mailet ")
208 .append(mailetClassName)
209 .append(" instantiated.");
210 getLogger().info(infoBuffer.toString());
211 }
212 } catch (MessagingException ex) {
213
214 if (getLogger().isErrorEnabled()) {
215 StringBuffer errorBuffer =
216 new StringBuffer(256)
217 .append("Unable to init mailet ")
218 .append(mailetClassName)
219 .append(": ")
220 .append(ex.toString());
221 getLogger().error( errorBuffer.toString(), ex );
222 if (ex.getNextException() != null) {
223 getLogger().error( "Caused by nested exception: ", ex.getNextException());
224 }
225 }
226 System.err.println("Unable to init mailet " + mailetClassName);
227 System.err.println("Check spool manager logs for more details.");
228
229 throw ex;
230 }
231
232 processor.add(matcher, mailet);
233 }
234
235
236
237
238
239
240 processor.closeProcessorLists();
241
242 if (getLogger().isInfoEnabled()) {
243 StringBuffer infoBuffer =
244 new StringBuffer(64)
245 .append("Processor ")
246 .append(processorName)
247 .append(" instantiated.");
248 getLogger().info(infoBuffer.toString());
249 }
250 } catch (Exception ex) {
251 if (getLogger().isErrorEnabled()) {
252 StringBuffer errorBuffer =
253 new StringBuffer(256)
254 .append("Unable to init processor ")
255 .append(processorName)
256 .append(": ")
257 .append(ex.toString());
258 getLogger().error( errorBuffer.toString(), ex );
259 }
260 throw ex;
261 }
262 }
263 if (getLogger().isInfoEnabled()) {
264 StringBuffer infoBuffer =
265 new StringBuffer(64)
266 .append("Spooler Manager uses ")
267 .append(numThreads)
268 .append(" Thread(s)");
269 getLogger().info(infoBuffer.toString());
270 }
271
272 active = true;
273 numActive = 0;
274 spoolThreads = new java.util.ArrayList(numThreads);
275 for ( int i = 0 ; i < numThreads ; i++ ) {
276 Thread reader = new Thread(this, "Spool Thread #" + i);
277 spoolThreads.add(reader);
278 reader.start();
279 }
280 }
281
282 /***
283 * This routinely checks the message spool for messages, and processes
284 * them as necessary
285 */
286 public void run() {
287
288 if (getLogger().isInfoEnabled())
289 {
290 getLogger().info("Run JamesSpoolManager: "
291 + Thread.currentThread().getName());
292 getLogger().info("Spool=" + spool.getClass().getName());
293 }
294
295 numActive++;
296 while(active) {
297 String key = null;
298 try {
299 Mail mail = (Mail)spool.accept();
300 key = mail.getName();
301 if (getLogger().isDebugEnabled()) {
302 StringBuffer debugBuffer =
303 new StringBuffer(64)
304 .append("==== Begin processing mail ")
305 .append(mail.getName())
306 .append("====");
307 getLogger().debug(debugBuffer.toString());
308 }
309 process(mail);
310
311
312 if ((Mail.GHOST.equals(mail.getState())) ||
313 (mail.getRecipients() == null) ||
314 (mail.getRecipients().size() == 0)) {
315 ContainerUtil.dispose(mail);
316 spool.remove(key);
317 if (getLogger().isDebugEnabled()) {
318 StringBuffer debugBuffer =
319 new StringBuffer(64)
320 .append("==== Removed from spool mail ")
321 .append(key)
322 .append("====");
323 getLogger().debug(debugBuffer.toString());
324 }
325 }
326 else {
327
328
329
330 spool.store(mail);
331 ContainerUtil.dispose(mail);
332 spool.unlock(key);
333
334
335 }
336 mail = null;
337 } catch (InterruptedException ie) {
338 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
339 } catch (Throwable e) {
340 if (getLogger().isErrorEnabled()) {
341 getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
342 + e.getMessage(), e);
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 }
359 }
360 if (getLogger().isInfoEnabled())
361 {
362 getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
363 }
364 numActive--;
365 }
366
367 /***
368 * Process this mail message by the appropriate processor as designated
369 * in the state of the Mail object.
370 *
371 * @param mail the mail message to be processed
372 */
373 protected void process(Mail mail) {
374 while (true) {
375 String processorName = mail.getState();
376 if (processorName.equals(Mail.GHOST)) {
377
378 return;
379 }
380 try {
381 LinearProcessor processor
382 = (LinearProcessor)processors.get(processorName);
383 if (processor == null) {
384 StringBuffer exceptionMessageBuffer =
385 new StringBuffer(128)
386 .append("Unable to find processor ")
387 .append(processorName)
388 .append(" requested for processing of ")
389 .append(mail.getName());
390 String exceptionMessage = exceptionMessageBuffer.toString();
391 getLogger().debug(exceptionMessage);
392 mail.setState(Mail.ERROR);
393 throw new MailetException(exceptionMessage);
394 }
395 StringBuffer logMessageBuffer = null;
396 if (getLogger().isDebugEnabled()) {
397 logMessageBuffer =
398 new StringBuffer(64)
399 .append("Processing ")
400 .append(mail.getName())
401 .append(" through ")
402 .append(processorName);
403 getLogger().debug(logMessageBuffer.toString());
404 }
405 processor.service(mail);
406 if (getLogger().isDebugEnabled()) {
407 logMessageBuffer =
408 new StringBuffer(128)
409 .append("Processed ")
410 .append(mail.getName())
411 .append(" through ")
412 .append(processorName);
413 getLogger().debug(logMessageBuffer.toString());
414 getLogger().debug("Result was " + mail.getState());
415 }
416 return;
417 } catch (Throwable e) {
418
419
420 StringBuffer exceptionBuffer =
421 new StringBuffer(64)
422 .append("Exception in processor <")
423 .append(processorName)
424 .append(">");
425 getLogger().error(exceptionBuffer.toString(), e);
426 if (processorName.equals(Mail.ERROR)) {
427
428
429 mail.setState(Mail.GHOST);
430 mail.setErrorMessage(e.getMessage());
431 } else {
432
433 if (!(e instanceof MessagingException)) {
434
435 mail.setState(Mail.ERROR);
436 }
437 mail.setErrorMessage(e.getMessage());
438 }
439 }
440 if (getLogger().isErrorEnabled()) {
441 StringBuffer logMessageBuffer =
442 new StringBuffer(128)
443 .append("An error occurred processing ")
444 .append(mail.getName())
445 .append(" through ")
446 .append(processorName);
447 getLogger().error(logMessageBuffer.toString());
448 getLogger().error("Result was " + mail.getState());
449 }
450 }
451 }
452
453 /***
454 * The dispose operation is called at the end of a components lifecycle.
455 * Instances of this class use this method to release and destroy any
456 * resources that they own.
457 *
458 * This implementation shuts down the LinearProcessors managed by this
459 * JamesSpoolManager
460 *
461 * @throws Exception if an error is encountered during shutdown
462 */
463 public void dispose() {
464 getLogger().info("JamesSpoolManager dispose...");
465 active = false;
466 for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
467 ((Thread) it.next()).interrupt();
468 }
469
470 long stop = System.currentTimeMillis() + 60000;
471
472 while (numActive != 0 && stop > System.currentTimeMillis()) {
473 try {
474 Thread.sleep(1000);
475 } catch (Exception ignored) {}
476 }
477 getLogger().info("JamesSpoolManager thread shutdown completed.");
478
479 Iterator it = processors.keySet().iterator();
480 while (it.hasNext()) {
481 String processorName = (String)it.next();
482 if (getLogger().isDebugEnabled()) {
483 getLogger().debug("Processor " + processorName);
484 }
485 LinearProcessor processor = (LinearProcessor)processors.get(processorName);
486 processor.dispose();
487 processors.remove(processor);
488 }
489 }
490
491 }