View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.transport;
21  
22  import org.apache.avalon.framework.activity.Initializable;
23  import org.apache.avalon.framework.activity.Disposable;
24  import org.apache.avalon.framework.container.ContainerUtil;
25  import org.apache.avalon.framework.logger.AbstractLogEnabled;
26  import org.apache.james.core.MailImpl;
27  import org.apache.james.core.MailetConfigImpl;
28  import org.apache.james.services.SpoolRepository;
29  import org.apache.mailet.GenericMailet;
30  import org.apache.mailet.GenericMatcher;
31  import org.apache.mailet.Mail;
32  import org.apache.mailet.MailAddress;
33  import org.apache.mailet.Mailet;
34  import org.apache.mailet.MailetConfig;
35  import org.apache.mailet.MailetException;
36  import org.apache.mailet.Matcher;
37  
38  import javax.mail.MessagingException;
39  import java.io.PrintWriter;
40  import java.io.StringWriter;
41  import java.util.ArrayList;
42  import java.util.Collection;
43  import java.util.LinkedList;
44  import java.util.List;
45  import java.util.Random;
46  import java.util.Iterator;
47  import java.util.Locale;
48  
49  /***
50   * Implements a processor for mails, directing the mail down
51   * the chain of matchers/mailets.
52   *
53   *  SAMPLE CONFIGURATION
54   *  <processor name="try" onerror="return,log">
55   *      <mailet match="RecipientIsLocal" class="LocalDelivery">
56   *      </mailet>
57   *      <mailet match="All" class="RemoteDelivery">
58   *          <delayTime>21600000</delayTime>
59   *          <maxRetries>5</maxRetries>
60   *      </mailet>
61   *  </processor>
62   *
63   * Note that the 'onerror' attribute is not yet supported.
64   *
65   * As of James v2.2.0a5, 'onerror' functionality is implemented, but
66   * it is implemented on the <mailet> tag.  The specification is:
67   *
68   *   <mailet match="..." class="..."
69   *       [onMatchException="{noMatch|matchAll|error|<aProcessorName>}"] 
70   *       [onMailetException="{ignore|error|<aProcessorName>}"]>
71   *
72   * noMatch:   no addresses are considered to match
73   * matchAll:  all addresses are considered to match
74   * error:     as before, send the message to the ERROR processor
75   *
76   * Otherwise, a processor name can be specified, and the message will
77   * be sent there.
78   *
79   * <P>CVS $Id: LinearProcessor.java 494012 2007-01-08 10:23:58Z norman $</P>
80   * @version 2.2.0
81   */
82  public class LinearProcessor 
83      extends AbstractLogEnabled
84      implements Initializable, Disposable {
85  
86      private static final Random random = new Random();  // Used to generate new mail names
87  
88      /***
89       *  The name of the matcher used to terminate the matcher chain.  The
90       *  end of the matcher/mailet chain must be a matcher that matches
91       *  all mails and a mailet that sets every mail to GHOST status.
92       *  This is necessary to ensure that mails are removed from the spool
93       *  in an orderly fashion.
94       */
95      private static final String TERMINATING_MATCHER_NAME = "Terminating%Matcher%Name";
96  
97      /***
98       *  The name of the mailet used to terminate the mailet chain.  The
99       *  end of the matcher/mailet chain must be a matcher that matches
100      *  all mails and a mailet that sets every mail to GHOST status.
101      *  This is necessary to ensure that mails are removed from the spool
102      *  in an orderly fashion.
103      */
104     private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
105 
106     private List mailets;  // The list of mailets for this processor
107     private List matchers; // The list of matchers for this processor
108     private volatile boolean listsClosed;  // Whether the matcher/mailet lists have been closed.
109     private SpoolRepository spool;  // The spool on which this processor is acting
110 
111     /***
112      * Set the spool to be used by this LinearProcessor.
113      *
114      * @param spool the spool to be used by this processor
115      *
116      * @throws IllegalArgumentException when the spool passed in is null
117      */
118     public void setSpool(SpoolRepository spool) {
119         if (spool == null) {
120             throw new IllegalArgumentException("The spool cannot be null");
121         }
122         this.spool = spool;
123     }
124 
125     /***
126      * @see org.apache.avalon.framework.activity.Initializable#initialize()
127      */
128     public void initialize() {
129         matchers = new ArrayList();
130         mailets = new ArrayList();
131     }
132 
133     /***
134      * <p>The dispose operation is called at the end of a components lifecycle.
135      * Instances of this class use this method to release and destroy any
136      * resources that they own.</p>
137      *
138      * <p>This implementation disposes of all the mailet instances added to the
139      * processor</p>
140      *
141      * @throws Exception if an error is encountered during shutdown
142      */
143     public void dispose() {
144         Iterator it = mailets.iterator();
145         boolean debugEnabled = getLogger().isDebugEnabled();
146         while (it.hasNext()) {
147             Mailet mailet = (Mailet)it.next();
148             if (debugEnabled) {
149                 getLogger().debug("Shutdown mailet " + mailet.getMailetInfo());
150             }
151             mailet.destroy();
152         }
153     }
154 
155     /***
156      * <p>Adds a new <code>Matcher</code> / <code>Mailet</code> pair
157      * to the processor.  Checks to ensure that the matcher and
158      * mailet passed in are not null.  Synchronized to ensure that
159      * the matchers and mailets are kept in sync.</p>
160      *
161      * <p>It is an essential part of the contract of the LinearProcessor
162      * that a particular matcher/mailet combination be used to
163      * terminate the processor chain.  This is done by calling the  
164      * closeProcessorList method.</p>
165      *
166      * <p>Once the closeProcessorList has been called any subsequent
167      * call to the add method will result in an IllegalStateException.</p>
168      *
169      * <p>This method is synchronized to protect against corruption of
170      * matcher/mailets lists</p>
171      *
172      * @param matcher the new matcher being added
173      * @param mailet the new mailet being added
174      *
175      * @throws IllegalArgumentException when the matcher or mailet passed in is null
176      * @throws IllegalStateException when this method is called after the processor lists have been closed
177      */
178     public synchronized void add(Matcher matcher, Mailet mailet) {
179         if (matcher == null) {
180             throw new IllegalArgumentException("Null valued matcher passed to LinearProcessor.");
181         }
182         if (mailet == null) {
183             throw new IllegalArgumentException("Null valued mailet passed to LinearProcessor.");
184         }
185         if (listsClosed) {
186             throw new IllegalStateException("Attempt to add matcher/mailet after lists have been closed");
187         }
188         matchers.add(matcher);
189         mailets.add(mailet);
190     }
191 
192     /***
193      * <p>Closes the processor matcher/mailet list.</p>
194      *
195      * <p>This method is synchronized to protect against corruption of
196      * matcher/mailets lists</p>
197      *
198      * @throws IllegalStateException when this method is called after the processor lists have been closed
199      */
200     public synchronized void closeProcessorLists() {
201         if (listsClosed) {
202             throw new IllegalStateException("Processor's matcher/mailet lists have already been closed.");
203         }
204         Matcher terminatingMatcher =
205             new GenericMatcher() {
206                 public Collection match(Mail mail) {
207                     return mail.getRecipients();
208                 }
209             
210                 public String getMatcherInfo() {
211                     return TERMINATING_MATCHER_NAME;
212                 }
213             };
214         Mailet terminatingMailet = 
215             new GenericMailet() {
216                 public void service(Mail mail) {
217                     if (!(Mail.ERROR.equals(mail.getState()))) {
218                         // Don't complain if we fall off the end of the
219                         // error processor.  That is currently the
220                         // normal situation for James, and the message
221                         // will show up in the error store.
222                         StringBuffer warnBuffer = new StringBuffer(256)
223                                               .append("Message ")
224                                               .append(mail.getName())
225                                               .append(" reached the end of this processor, and is automatically deleted.  This may indicate a configuration error.");
226                         LinearProcessor.this.getLogger().warn(warnBuffer.toString());
227                     }
228                     mail.setState(Mail.GHOST);
229                 }
230             
231                 public String getMailetInfo() {
232                     return getMailetName();
233                 }
234             
235                 public String getMailetName() {
236                     return TERMINATING_MAILET_NAME;
237                 }
238             };
239         add(terminatingMatcher, terminatingMailet);
240         listsClosed = true;
241     }
242 
243     /***
244      * <p>Processes a single mail message through the chain of matchers and mailets.</p>
245      *
246      * <p>Calls to this method before setSpool has been called with a non-null argument
247      * will result in an <code>IllegalStateException</code>.</p>
248      *
249      * <p>If the matcher/mailet lists have not been closed by a call to the closeProcessorLists
250      * method then a call to this method will result in an <code>IllegalStateException</code>.
251      * The end of the matcher/mailet chain must be a matcher that matches all mails and 
252      * a mailet that sets every mail to GHOST status.  This is necessary to ensure that 
253      * mails are removed from the spool in an orderly fashion.  The closeProcessorLists method
254      * ensures this.</p>
255      * 
256      * @param mail the new mail to be processed
257      *
258      * @throws IllegalStateException when this method is called before the processor lists have been closed
259      *                                  or the spool has been initialized
260      */
261     public void service(Mail mail) throws MessagingException {
262         if (spool == null) {
263             throw new IllegalStateException("Attempt to service mail before the spool has been set to a non-null value");
264         }
265 
266         if (!listsClosed) {
267             throw new IllegalStateException("Attempt to service mail before matcher/mailet lists have been closed");
268         }
269 
270         if (getLogger().isDebugEnabled()) {
271             getLogger().debug("Servicing mail: " + mail.getName());
272         }
273         //  unprocessed is an array of Lists of Mail objects
274         //  the array indicates which matcher/mailet (stage in the linear
275         //  processor) that this Mail needs to be processed.
276         //  e.g., a Mail in unprocessed[0] needs to be
277         //  processed by the first matcher/mailet.
278         //
279         //  It is a List of Mail objects at each array spot as multiple Mail
280         //  objects could be at the same stage.
281         //
282         //  Note that every Mail object in this array will either be the 
283         //  original Mail object passed in, or a result of this method's
284         //  (and hence this thread's) processing.
285 
286         List[] unprocessed = new List[matchers.size() + 1];
287 
288         for (int i = 0; i < unprocessed.length; i++) {
289             // No need to use synchronization, as this is totally
290             // local to the method
291             unprocessed[i] = new LinkedList();
292         }
293 
294         //Add the object to the bottom of the list
295         unprocessed[0].add(mail);
296 
297         //This is the original state of the message
298         String originalState = mail.getState();
299         
300         
301         // The original mail: we should not care to save this mail.
302         // This should be saved in the spoolmanager.
303         Mail originalMail = mail;
304 
305         //We'll use these as temporary variables in the loop
306         mail = null;  // the message we're currently processing
307         int i = 0;    // where in the stage we're looking
308         while (true) {
309             //  The last element in the unprocessed array has mail messages
310             //  that have completed all stages.  We want them to just die,
311             //  so we clear that spot to allow garbage collection of the
312             //  objects.
313             //
314             //  Please note that the presence of the terminating mailet at the end
315             //  of the chain is critical to the proper operation
316             //  of the LinearProcessor code.  If this mailet is not placed
317             //  at the end of the chain with a terminating matcher, there is a 
318             //  potential for configuration or implementation errors to 
319             //  lead to mails trapped in the spool.  This matcher/mailet
320             //  combination is added when the closeProcessorList method
321             //  is called.
322             unprocessed[unprocessed.length - 1].clear();
323 
324             //initialize the mail reference we will be searching on
325             mail = null;
326 
327             //Scan through all stages, trying to find a message to process
328             for (i = 0; i < unprocessed.length; i++) {
329                 if (unprocessed[i].size() > 0) {
330                     //Get the first element from the queue, and remove it from there
331                     mail = (Mail)unprocessed[i].remove(0);
332                     break;
333                 }
334             }
335 
336             //Check it we found anything
337             if (mail == null) {
338                 //We found no messages to process... we're done servicing the mail object
339                 return;
340             }
341 
342 
343             //Call the matcher and find what recipients match
344             Collection recipients = null;
345             Matcher matcher = (Matcher) matchers.get(i);
346             StringBuffer logMessageBuffer = null;
347             if (getLogger().isDebugEnabled()) {
348                 logMessageBuffer =
349                     new StringBuffer(128)
350                             .append("Checking ")
351                             .append(mail.getName())
352                             .append(" with ")
353                             .append(matcher);
354                 getLogger().debug(logMessageBuffer.toString());
355             }
356             try {
357                 recipients = matcher.match(mail);
358                 if (recipients == null) {
359                     //In case the matcher returned null, create an empty Collection
360                     recipients = new ArrayList(0);
361                 } else if (recipients != mail.getRecipients()) {
362                     //Make sure all the objects are MailAddress objects
363                     verifyMailAddresses(recipients);
364                 }
365             } catch (MessagingException me) {
366                 // look in the matcher's mailet's init attributes
367                 MailetConfig mailetConfig = ((Mailet) mailets.get(i)).getMailetConfig();
368                 String onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
369                 if (onMatchException == null) {
370                     onMatchException = Mail.ERROR;
371                 } else {
372                     onMatchException = onMatchException.trim().toLowerCase(Locale.US);
373                 }
374                 if (onMatchException.compareTo("nomatch") == 0) {
375                     //In case the matcher returned null, create an empty Collection
376                     recipients = new ArrayList(0);
377                 } else if (onMatchException.compareTo("matchall") == 0) {
378                     recipients = mail.getRecipients();
379                     // no need to verify addresses
380                 } else {
381                     handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException);
382                 }
383             }
384 
385             // Split the recipients into two pools.  notRecipients will contain the
386             // recipients on the message that the matcher did not return.
387             Collection notRecipients;
388             if (recipients == mail.getRecipients() || recipients.size() == 0) {
389                 notRecipients = new ArrayList(0);
390             } else {
391                 notRecipients = new ArrayList(mail.getRecipients());
392                 notRecipients.removeAll(recipients);
393             }
394 
395             if (recipients.size() == 0) {
396                 //Everything was not a match... store it in the next spot in the array
397                 unprocessed[i + 1].add(mail);
398                 continue;
399             }
400             if (notRecipients.size() != 0) {
401                 // There are a mix of recipients and not recipients.
402                 // We need to clone this message, put the notRecipients on the clone
403                 // and store it in the next spot
404                 Mail notMail = new MailImpl(mail,newName(mail));
405                 notMail.setRecipients(notRecipients);
406                 // set the state to the current processor
407                 notMail.setState(originalState);
408                 unprocessed[i + 1].add(notMail);
409                 //We have to set the reduce possible recipients on the old message
410                 mail.setRecipients(recipients);
411             }
412             // We have messages that need to process... time to run the mailet.
413             Mailet mailet = (Mailet) mailets.get(i);
414             if (getLogger().isDebugEnabled()) {
415                 logMessageBuffer =
416                     new StringBuffer(128)
417                             .append("Servicing ")
418                             .append(mail.getName())
419                             .append(" by ")
420                             .append(mailet.getMailetInfo());
421                 getLogger().debug(logMessageBuffer.toString());
422             }
423             try {
424                 mailet.service(mail);
425                 // Make sure all the recipients are still MailAddress objects
426                 verifyMailAddresses(mail.getRecipients());
427             } catch (MessagingException me) {
428                 MailetConfig mailetConfig = mailet.getMailetConfig();
429                 String onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
430                 if (onMailetException == null) {
431                     onMailetException = Mail.ERROR;
432                 } else {
433                     onMailetException = onMailetException.trim().toLowerCase(Locale.US);
434                 }
435                 if (onMailetException.compareTo("ignore") == 0) {
436                     // ignore the exception and continue
437                     // this option should not be used if the mail object can be changed by the mailet
438                     verifyMailAddresses(mail.getRecipients());
439                 } else {
440                     handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException);
441                 }
442             }
443 
444             // See if the state was changed by the mailet
445             if (!mail.getState().equals(originalState)) {
446                 //If this message was ghosted, we just want to let it die
447                 if (mail.getState().equals(Mail.GHOST)) {
448                     // let this instance die...
449                     ContainerUtil.dispose(mail);
450                     mail = null;
451                     continue;
452                 }
453                 // This was just set to another state requiring further processing... 
454                 // Store this back in the spool and it will get picked up and 
455                 // run in that processor
456                 // We store only mails created by the matcher "splitting"
457                 // The original mail will be "stored" by the caller.
458                 if (originalMail != mail) {
459                     spool.store(mail);
460                     ContainerUtil.dispose(mail);
461                 }
462                 mail = null;
463                 continue;
464             } else {
465                 // Ok, we made it through with the same state... move it to the next
466                 //  spot in the array
467                 unprocessed[i + 1].add(mail);
468             }
469 
470         }
471     }
472 
473     /***
474      * Create a unique new primary key name.
475      *
476      * @param mail the mail to use as the basis for the new mail name
477      * 
478      * @return a new name
479      */
480     private String newName(Mail mail) {
481         StringBuffer nameBuffer =
482             new StringBuffer(64)
483                     .append(mail.getName())
484                     .append("-!")
485                     .append(random.nextInt(1048576));
486         return nameBuffer.toString();
487     }
488 
489 
490 
491     /***
492      * Checks that all objects in this class are of the form MailAddress.
493      *
494      * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
495      */
496     private void verifyMailAddresses(Collection col) throws MessagingException {
497         try {
498             MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
499 
500             // Why is this here?  According to the javadoc for
501             // java.util.Collection.toArray(Object[]), this should
502             // never happen.  The exception will be thrown.
503             if (addresses.length != col.size()) {
504                 throw new MailetException("The recipient list contains objects other than MailAddress objects");
505             }
506         } catch (ArrayStoreException ase) {
507             throw new MailetException("The recipient list contains objects other than MailAddress objects");
508         }
509     }
510 
511     /***
512      * This is a helper method that updates the state of the mail object to
513      * Mail.ERROR as well as recording the exception to the log
514      *
515      * @param me the exception to be handled
516      * @param mail the mail being processed when the exception was generated
517      * @param offendersName the matcher or mailet than generated the exception
518      * @param nextState the next state to set
519      *
520      * @throws MessagingException thrown always, rethrowing the passed in exception
521      */
522     private void handleException(MessagingException me, Mail mail, String offendersName, String nextState) throws MessagingException {
523         System.err.println("exception! " + me);
524         mail.setState(nextState);
525         StringWriter sout = new StringWriter();
526         PrintWriter out = new PrintWriter(sout, true);
527         StringBuffer exceptionBuffer =
528             new StringBuffer(128)
529                     .append("Exception calling ")
530                     .append(offendersName)
531                     .append(": ")
532                     .append(me.getMessage());
533         out.println(exceptionBuffer.toString());
534         Exception e = me;
535         while (e != null) {
536             e.printStackTrace(out);
537             if (e instanceof MessagingException) {
538                 e = ((MessagingException)e).getNextException();
539             } else {
540                 e = null;
541             }
542         }
543         String errorString = sout.toString();
544         mail.setErrorMessage(errorString);
545         getLogger().error(errorString);
546         throw me;
547     }
548 }