View Javadoc

1   /************************************************************************
2    * Copyright (c) 2000-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.transport;
19  
20  import org.apache.avalon.framework.activity.Initializable;
21  import org.apache.avalon.framework.activity.Disposable;
22  import org.apache.avalon.framework.container.ContainerUtil;
23  import org.apache.avalon.framework.logger.AbstractLogEnabled;
24  import org.apache.james.core.MailImpl;
25  import org.apache.james.core.MailetConfigImpl;
26  import org.apache.james.services.SpoolRepository;
27  import org.apache.mailet.GenericMailet;
28  import org.apache.mailet.GenericMatcher;
29  import org.apache.mailet.Mail;
30  import org.apache.mailet.MailAddress;
31  import org.apache.mailet.Mailet;
32  import org.apache.mailet.MailetConfig;
33  import org.apache.mailet.MailetException;
34  import org.apache.mailet.Matcher;
35  
36  import javax.mail.MessagingException;
37  import java.io.PrintWriter;
38  import java.io.StringWriter;
39  import java.util.ArrayList;
40  import java.util.Collection;
41  import java.util.LinkedList;
42  import java.util.List;
43  import java.util.Random;
44  import java.util.Iterator;
45  import java.util.Locale;
46  
47  /***
48   * Implements a processor for mails, directing the mail down
49   * the chain of matchers/mailets.
50   *
51   *  SAMPLE CONFIGURATION
52   *  <processor name="try" onerror="return,log">
53   *      <mailet match="RecipientIsLocal" class="LocalDelivery">
54   *      </mailet>
55   *      <mailet match="All" class="RemoteDelivery">
56   *          <delayTime>21600000</delayTime>
57   *          <maxRetries>5</maxRetries>
58   *      </mailet>
59   *  </processor>
60   *
61   * Note that the 'onerror' attribute on the <processor> tag is not yet supported.
62   *
63   * As of James v2.2.0a5, 'onerror'-style functionality is implemented, but it is
64   * configured per mailet through attributes of the <mailet> tag.  The specification is:
65   *
66   *   <mailet match="..." class="..."
67   *       [onMatchException="{noMatch|matchAll|error|<aProcessorName>}"] 
68   *       [onMailetException="{ignore|error|<aProcessorName>}"]>
69   *
70   * noMatch:   no addresses are considered to match
71   * matchAll:  all addresses are considered to match
72   * error:     as before, send the message to the ERROR processor
73   *
74   * Otherwise, a processor name can be specified, and the message will
75   * be sent there.
76   *
77   * <P>CVS $Id: LinearProcessor.java 428745 2006-08-04 15:06:02 +0000 (ven, 04 ago 2006) bago $</P>
78   * @version 2.2.0
79   */
80  public class LinearProcessor 
81      extends AbstractLogEnabled
82      implements Initializable, Disposable {
83  
84      private static final Random random = new Random();  // Used to generate new mail names
85  
86      /***
87       *  The name of the matcher used to terminate the matcher chain.  The
88       *  end of the matcher/mailet chain must be a matcher that matches
89       *  all mails and a mailet that sets every mail to GHOST status.
90       *  This is necessary to ensure that mails are removed from the spool
91       *  in an orderly fashion.
92       */
93      private static final String TERMINATING_MATCHER_NAME = "Terminating%Matcher%Name";
94  
95      /***
96       *  The name of the mailet used to terminate the mailet chain.  The
97       *  end of the matcher/mailet chain must be a matcher that matches
98       *  all mails and a mailet that sets every mail to GHOST status.
99       *  This is necessary to ensure that mails are removed from the spool
100      *  in an orderly fashion.
101      */
102     private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
103 
104     private List mailets;  // The list of mailets for this processor
105     private List matchers; // The list of matchers for this processor
106     private volatile boolean listsClosed;  // Whether the matcher/mailet lists have been closed.
107     private SpoolRepository spool;  // The spool on which this processor is acting
108 
109     /***
110      * Set the spool to be used by this LinearProcessor.
111      *
112      * @param spool the spool to be used by this processor
113      *
114      * @throws IllegalArgumentException when the spool passed in is null
115      */
116     public void setSpool(SpoolRepository spool) {
117         if (spool == null) {
118             throw new IllegalArgumentException("The spool cannot be null");
119         }
120         this.spool = spool;
121     }
122 
123     /***
124      * @see org.apache.avalon.framework.activity.Initializable#initialize()
125      */
126     public void initialize() {
127         matchers = new ArrayList();
128         mailets = new ArrayList();
129     }
130 
131     /***
132      * <p>The dispose operation is called at the end of a components lifecycle.
133      * Instances of this class use this method to release and destroy any
134      * resources that they own.</p>
135      *
136      * <p>This implementation disposes of all the mailet instances added to the
137      * processor</p>
138      *
139      * @throws Exception if an error is encountered during shutdown
140      */
141     public void dispose() {
142         Iterator it = mailets.iterator();
143         boolean debugEnabled = getLogger().isDebugEnabled();
144         while (it.hasNext()) {
145             Mailet mailet = (Mailet)it.next();
146             if (debugEnabled) {
147                 getLogger().debug("Shutdown mailet " + mailet.getMailetInfo());
148             }
149             mailet.destroy();
150         }
151     }
152 
153     /***
154      * <p>Adds a new <code>Matcher</code> / <code>Mailet</code> pair
155      * to the processor.  Checks to ensure that the matcher and
156      * mailet passed in are not null.  Synchronized to ensure that
157      * the matchers and mailets are kept in sync.</p>
158      *
159      * <p>It is an essential part of the contract of the LinearProcessor
160      * that a particular matcher/mailet combination be used to
161      * terminate the processor chain.  This is done by calling the  
162      * closeProcessorList method.</p>
163      *
164      * <p>Once the closeProcessorList has been called any subsequent
165      * call to the add method will result in an IllegalStateException.</p>
166      *
167      * <p>This method is synchronized to protect against corruption of
168      * matcher/mailets lists</p>
169      *
170      * @param matcher the new matcher being added
171      * @param mailet the new mailet being added
172      *
173      * @throws IllegalArgumentException when the matcher or mailet passed in is null
174      * @throws IllegalStateException when this method is called after the processor lists have been closed
175      */
176     public synchronized void add(Matcher matcher, Mailet mailet) {
177         if (matcher == null) {
178             throw new IllegalArgumentException("Null valued matcher passed to LinearProcessor.");
179         }
180         if (mailet == null) {
181             throw new IllegalArgumentException("Null valued mailet passed to LinearProcessor.");
182         }
183         if (listsClosed) {
184             throw new IllegalStateException("Attempt to add matcher/mailet after lists have been closed");
185         }
186         matchers.add(matcher);
187         mailets.add(mailet);
188     }
189 
190     /***
191      * <p>Closes the processor matcher/mailet list.</p>
192      *
193      * <p>This method is synchronized to protect against corruption of
194      * matcher/mailets lists</p>
195      *
196      * @throws IllegalStateException when this method is called after the processor lists have been closed
197      */
198     public synchronized void closeProcessorLists() {
199         if (listsClosed) {
200             throw new IllegalStateException("Processor's matcher/mailet lists have already been closed.");
201         }
202         Matcher terminatingMatcher =
203             new GenericMatcher() {
204                 public Collection match(Mail mail) {
205                     return mail.getRecipients();
206                 }
207             
208                 public String getMatcherInfo() {
209                     return TERMINATING_MATCHER_NAME;
210                 }
211             };
212         Mailet terminatingMailet = 
213             new GenericMailet() {
214                 public void service(Mail mail) {
215                     if (!(Mail.ERROR.equals(mail.getState()))) {
216                         // Don't complain if we fall off the end of the
217                         // error processor.  That is currently the
218                         // normal situation for James, and the message
219                         // will show up in the error store.
220                         StringBuffer warnBuffer = new StringBuffer(256)
221                                               .append("Message ")
222                                               .append(mail.getName())
223                                               .append(" reached the end of this processor, and is automatically deleted.  This may indicate a configuration error.");
224                         LinearProcessor.this.getLogger().warn(warnBuffer.toString());
225                     }
226                     mail.setState(Mail.GHOST);
227                 }
228             
229                 public String getMailetInfo() {
230                     return getMailetName();
231                 }
232             
233                 public String getMailetName() {
234                     return TERMINATING_MAILET_NAME;
235                 }
236             };
237         add(terminatingMatcher, terminatingMailet);
238         listsClosed = true;
239     }
240 
241     /***
242      * <p>Processes a single mail message through the chain of matchers and mailets.</p>
243      *
244      * <p>Calls to this method before setSpool has been called with a non-null argument
245      * will result in an <code>IllegalStateException</code>.</p>
246      *
247      * <p>If the matcher/mailet lists have not been closed by a call to the closeProcessorLists
248      * method then a call to this method will result in an <code>IllegalStateException</code>.
249      * The end of the matcher/mailet chain must be a matcher that matches all mails and 
250      * a mailet that sets every mail to GHOST status.  This is necessary to ensure that 
251      * mails are removed from the spool in an orderly fashion.  The closeProcessorLists method
252      * ensures this.</p>
253      * 
254      * @param mail the new mail to be processed
255      *
256      * @throws IllegalStateException when this method is called before the processor lists have been closed
257      *                                  or the spool has been initialized
258      */
259     public void service(Mail mail) throws MessagingException {
260         if (spool == null) {
261             throw new IllegalStateException("Attempt to service mail before the spool has been set to a non-null value");
262         }
263 
264         if (!listsClosed) {
265             throw new IllegalStateException("Attempt to service mail before matcher/mailet lists have been closed");
266         }
267 
268         if (getLogger().isDebugEnabled()) {
269             getLogger().debug("Servicing mail: " + mail.getName());
270         }
271         //  unprocessed is an array of Lists of Mail objects
272         //  the array indicates which matcher/mailet (stage in the linear
273         //  processor) that this Mail needs to be processed.
274         //  e.g., a Mail in unprocessed[0] needs to be
275         //  processed by the first matcher/mailet.
276         //
277         //  It is a List of Mail objects at each array spot as multiple Mail
278         //  objects could be at the same stage.
279         //
280         //  Note that every Mail object in this array will either be the 
281         //  original Mail object passed in, or a result of this method's
282         //  (and hence this thread's) processing.
283 
284         List[] unprocessed = new List[matchers.size() + 1];
285 
286         for (int i = 0; i < unprocessed.length; i++) {
287             // No need to use synchronization, as this is totally
288             // local to the method
289             unprocessed[i] = new LinkedList();
290         }
291 
292         //Add the object to the bottom of the list
293         unprocessed[0].add(mail);
294 
295         //This is the original state of the message
296         String originalState = mail.getState();
297         
298         
299         // The original mail: we should not care to save this mail.
300         // This should be saved in the spoolmanager.
301         Mail originalMail = mail;
302 
303         //We'll use these as temporary variables in the loop
304         mail = null;  // the message we're currently processing
305         int i = 0;    // where in the stage we're looking
306         while (true) {
307             //  The last element in the unprocessed array has mail messages
308             //  that have completed all stages.  We want them to just die,
309             //  so we clear that spot to allow garbage collection of the
310             //  objects.
311             //
312             //  Please note that the presence of the terminating mailet at the end
313             //  of the chain is critical to the proper operation
314             //  of the LinearProcessor code.  If this mailet is not placed
315             //  at the end of the chain with a terminating matcher, there is a 
316             //  potential for configuration or implementation errors to 
317             //  lead to mails trapped in the spool.  This matcher/mailet
318             //  combination is added when the closeProcessorList method
319             //  is called.
320             unprocessed[unprocessed.length - 1].clear();
321 
322             //initialize the mail reference we will be searching on
323             mail = null;
324 
325             //Scan through all stages, trying to find a message to process
326             for (i = 0; i < unprocessed.length; i++) {
327                 if (unprocessed[i].size() > 0) {
328                     //Get the first element from the queue, and remove it from there
329                     mail = (Mail)unprocessed[i].remove(0);
330                     break;
331                 }
332             }
333 
334             //Check it we found anything
335             if (mail == null) {
336                 //We found no messages to process... we're done servicing the mail object
337                 return;
338             }
339 
340 
341             //Call the matcher and find what recipients match
342             Collection recipients = null;
343             Matcher matcher = (Matcher) matchers.get(i);
344             StringBuffer logMessageBuffer = null;
345             if (getLogger().isDebugEnabled()) {
346                 logMessageBuffer =
347                     new StringBuffer(128)
348                             .append("Checking ")
349                             .append(mail.getName())
350                             .append(" with ")
351                             .append(matcher);
352                 getLogger().debug(logMessageBuffer.toString());
353             }
354             try {
355                 recipients = matcher.match(mail);
356                 if (recipients == null) {
357                     //In case the matcher returned null, create an empty Collection
358                     recipients = new ArrayList(0);
359                 } else if (recipients != mail.getRecipients()) {
360                     //Make sure all the objects are MailAddress objects
361                     verifyMailAddresses(recipients);
362                 }
363             } catch (MessagingException me) {
364                 // look in the matcher's mailet's init attributes
365                 MailetConfig mailetConfig = ((Mailet) mailets.get(i)).getMailetConfig();
366                 String onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
367                 if (onMatchException == null) {
368                     onMatchException = Mail.ERROR;
369                 } else {
370                     onMatchException = onMatchException.trim().toLowerCase(Locale.US);
371                 }
372                 if (onMatchException.compareTo("nomatch") == 0) {
373                     //In case the matcher returned null, create an empty Collection
374                     recipients = new ArrayList(0);
375                 } else if (onMatchException.compareTo("matchall") == 0) {
376                     recipients = mail.getRecipients();
377                     // no need to verify addresses
378                 } else {
379                     handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException);
380                 }
381             }
382 
383             // Split the recipients into two pools.  notRecipients will contain the
384             // recipients on the message that the matcher did not return.
385             Collection notRecipients;
386             if (recipients == mail.getRecipients() || recipients.size() == 0) {
387                 notRecipients = new ArrayList(0);
388             } else {
389                 notRecipients = new ArrayList(mail.getRecipients());
390                 notRecipients.removeAll(recipients);
391             }
392 
393             if (recipients.size() == 0) {
394                 //Everything was not a match... store it in the next spot in the array
395                 unprocessed[i + 1].add(mail);
396                 continue;
397             }
398             if (notRecipients.size() != 0) {
399                 // There are a mix of recipients and not recipients.
400                 // We need to clone this message, put the notRecipients on the clone
401                 // and store it in the next spot
402                 Mail notMail = new MailImpl(mail,newName(mail));
403                 notMail.setRecipients(notRecipients);
404                 // set the state to the current processor
405                 notMail.setState(originalState);
406                 unprocessed[i + 1].add(notMail);
407                 //We have to set the reduce possible recipients on the old message
408                 mail.setRecipients(recipients);
409             }
410             // We have messages that need to process... time to run the mailet.
411             Mailet mailet = (Mailet) mailets.get(i);
412             if (getLogger().isDebugEnabled()) {
413                 logMessageBuffer =
414                     new StringBuffer(128)
415                             .append("Servicing ")
416                             .append(mail.getName())
417                             .append(" by ")
418                             .append(mailet.getMailetInfo());
419                 getLogger().debug(logMessageBuffer.toString());
420             }
421             try {
422                 mailet.service(mail);
423                 // Make sure all the recipients are still MailAddress objects
424                 verifyMailAddresses(mail.getRecipients());
425             } catch (MessagingException me) {
426                 MailetConfig mailetConfig = mailet.getMailetConfig();
427                 String onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
428                 if (onMailetException == null) {
429                     onMailetException = Mail.ERROR;
430                 } else {
431                     onMailetException = onMailetException.trim().toLowerCase(Locale.US);
432                 }
433                 if (onMailetException.compareTo("ignore") == 0) {
434                     // ignore the exception and continue
435                     // this option should not be used if the mail object can be changed by the mailet
436                     verifyMailAddresses(mail.getRecipients());
437                 } else {
438                     handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException);
439                 }
440             }
441 
442             // See if the state was changed by the mailet
443             if (!mail.getState().equals(originalState)) {
444                 //If this message was ghosted, we just want to let it die
445                 if (mail.getState().equals(Mail.GHOST)) {
446                     // let this instance die...
447                     ContainerUtil.dispose(mail);
448                     mail = null;
449                     continue;
450                 }
451                 // This was just set to another state requiring further processing... 
452                 // Store this back in the spool and it will get picked up and 
453                 // run in that processor
454                 // We store only mails created by the matcher "splitting"
455                 // The original mail will be "stored" by the caller.
456                 if (originalMail != mail) {
457                     spool.store(mail);
458                     ContainerUtil.dispose(mail);
459                 }
460                 mail = null;
461                 continue;
462             } else {
463                 // Ok, we made it through with the same state... move it to the next
464                 //  spot in the array
465                 unprocessed[i + 1].add(mail);
466             }
467 
468         }
469     }
470 
471     /***
472      * Create a unique new primary key name.
473      *
474      * @param mail the mail to use as the basis for the new mail name
475      * 
476      * @return a new name
477      */
478     private String newName(Mail mail) {
479         StringBuffer nameBuffer =
480             new StringBuffer(64)
481                     .append(mail.getName())
482                     .append("-!")
483                     .append(random.nextInt(1048576));
484         return nameBuffer.toString();
485     }
486 
487 
488 
489     /***
490      * Checks that all objects in this class are of the form MailAddress.
491      *
492      * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
493      */
494     private void verifyMailAddresses(Collection col) throws MessagingException {
495         try {
496             MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
497 
498             // Why is this here?  According to the javadoc for
499             // java.util.Collection.toArray(Object[]), this should
500             // never happen.  The exception will be thrown.
501             if (addresses.length != col.size()) {
502                 throw new MailetException("The recipient list contains objects other than MailAddress objects");
503             }
504         } catch (ArrayStoreException ase) {
505             throw new MailetException("The recipient list contains objects other than MailAddress objects");
506         }
507     }
508 
509     /***
510      * This is a helper method that updates the state of the mail object to
511      * Mail.ERROR as well as recording the exception to the log
512      *
513      * @param me the exception to be handled
514      * @param mail the mail being processed when the exception was generated
515      * @param offendersName the matcher or mailet than generated the exception
516      * @param nextState the next state to set
517      *
518      * @throws MessagingException thrown always, rethrowing the passed in exception
519      */
520     private void handleException(MessagingException me, Mail mail, String offendersName, String nextState) throws MessagingException {
521         System.err.println("exception! " + me);
522         mail.setState(nextState);
523         StringWriter sout = new StringWriter();
524         PrintWriter out = new PrintWriter(sout, true);
525         StringBuffer exceptionBuffer =
526             new StringBuffer(128)
527                     .append("Exception calling ")
528                     .append(offendersName)
529                     .append(": ")
530                     .append(me.getMessage());
531         out.println(exceptionBuffer.toString());
532         Exception e = me;
533         while (e != null) {
534             e.printStackTrace(out);
535             if (e instanceof MessagingException) {
536                 e = ((MessagingException)e).getNextException();
537             } else {
538                 e = null;
539             }
540         }
541         String errorString = sout.toString();
542         mail.setErrorMessage(errorString);
543         getLogger().error(errorString);
544         throw me;
545     }
546 }