View Javadoc

1   /************************************************************************
2    * Copyright (c) 1999-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.mailrepository;
19  
20  import org.apache.james.services.SpoolRepository;
21  import org.apache.mailet.Mail;
22  
23  import java.util.ConcurrentModificationException;
24  import java.util.Iterator;
25  
26  /***
27   * Implementation of a MailRepository on a FileSystem.
28   *
29   * Requires a configuration element in the .conf.xml file of the form:
30   *  <repository destinationURL="file://path-to-root-dir-for-repository"
31   *              type="MAIL"
32   *              model="SYNCHRONOUS"/>
33   * Requires a logger called MailRepository.
34   *
35   * @version 1.0.0, 24/04/1999
36   */
37  public class AvalonSpoolRepository
38      extends AvalonMailRepository
39      implements SpoolRepository {
40  
41      /***
42       * <p>Returns an arbitrarily selected mail deposited in this Repository.
43       * Usage: SpoolManager calls accept() to see if there are any unprocessed 
44       * mails in the spool repository.</p>
45       *
46       * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
47       *
48       * @return the mail
49       */
50      public synchronized Mail accept() throws InterruptedException {
51          if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
52              getLogger().debug("Method accept() called");
53          }
54          return accept(new SpoolRepository.AcceptFilter () {
55              public boolean accept (String _, String __, long ___, String ____) {
56                  return true;
57              }
58  
59              public long getWaitTime () {
60                  return 0;
61              }
62          });
63      }
64  
65      /***
66       * <p>Returns an arbitrarily selected mail deposited in this Repository that
67       * is either ready immediately for delivery, or is younger than it's last_updated plus
68       * the number of failed attempts times the delay time.
69       * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
70       * unprocessed mail is available.</p>
71       *
72       * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
73       *
74       * @return the mail
75       */
76      public synchronized Mail accept(final long delay) throws InterruptedException
77      {
78          if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
79              getLogger().debug("Method accept(delay) called");
80          }
81          return accept(new SpoolRepository.AcceptFilter () {
82              long youngest = 0;
83                  
84                  public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
85                      if (state.equals(Mail.ERROR)) {
86                          //Test the time...
87                          long timeToProcess = delay + lastUpdated;
88                  
89                          if (System.currentTimeMillis() > timeToProcess) {
90                              //We're ready to process this again
91                              return true;
92                          } else {
93                              //We're not ready to process this.
94                              if (youngest == 0 || youngest > timeToProcess) {
95                                  //Mark this as the next most likely possible mail to process
96                                  youngest = timeToProcess;
97                              }
98                              return false;
99                          }
100                     } else {
101                         //This mail is good to go... return the key
102                         return true;
103                     }
104                 }
105         
106                 public long getWaitTime () {
107                     if (youngest == 0) {
108                         return 0;
109                     } else {
110                         long duration = youngest - System.currentTimeMillis();
111                         youngest = 0; //get ready for next round
112                         return duration <= 0 ? 1 : duration;
113                     }
114                 }
115             });
116     }
117 
118 
119     /***
120      * Returns an arbitrarily select mail deposited in this Repository for
121      * which the supplied filter's accept method returns true.
122      * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
123      * based on number of retries if the mail is ready for processing.
124      * If no message is ready the method will block until one is, the amount of time to block is
125      * determined by calling the filters getWaitTime method.
126      *
127      * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
128      *
129      * @return  the mail
130      */
131     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
132         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
133             getLogger().debug("Method accept(Filter) called");
134         }
135         while (!Thread.currentThread().isInterrupted()) try {
136             for (Iterator it = list(); it.hasNext(); ) {
137                 String s = it.next().toString();
138                 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
139                     StringBuffer logBuffer =
140                         new StringBuffer(64)
141                                 .append("Found item ")
142                                 .append(s)
143                                 .append(" in spool.");
144                     getLogger().debug(logBuffer.toString());
145                 }
146                 if (lock(s)) {
147                     if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
148                         getLogger().debug("accept(Filter) has locked: " + s);
149                     }
150                     try {
151                         Mail mail = retrieve(s);
152                         // Retrieve can return null if the mail is no longer on the spool
153                         // (i.e. another thread has gotten to it first).
154                         // In this case we simply continue to the next key
155                         if (mail == null || !filter.accept (mail.getName(),
156                                                             mail.getState(),
157                                                             mail.getLastUpdated().getTime(),
158                                                             mail.getErrorMessage())) {
159                             unlock(s);
160                             continue;
161                         }
162                         return mail;
163                     } catch (javax.mail.MessagingException e) {
164                         unlock(s);
165                         getLogger().error("Exception during retrieve -- skipping item " + s, e);
166                     }
167                 }
168             }
169 
170             //We did not find any... let's wait for a certain amount of time
171             wait (filter.getWaitTime());
172         } catch (InterruptedException ex) {
173             throw ex;
174         } catch (ConcurrentModificationException cme) {
175             // Should never get here now that list methods clones keyset for iterator
176             getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
177         }
178         throw new InterruptedException();
179     }
180     
181 }