View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.mailrepository;
21  
22  import org.apache.james.services.SpoolRepository;
23  import org.apache.mailet.Mail;
24  
25  import java.util.ConcurrentModificationException;
26  import java.util.Iterator;
27  
28  /***
29   * Implementation of a MailRepository on a FileSystem.
30   *
31   * Requires a configuration element in the .conf.xml file of the form:
32   *  <repository destinationURL="file://path-to-root-dir-for-repository"
33   *              type="MAIL"
34   *              model="SYNCHRONOUS"/>
35   * Requires a logger called MailRepository.
36   *
37   * @version 1.0.0, 24/04/1999
38   */
39  public class AvalonSpoolRepository
40      extends AvalonMailRepository
41      implements SpoolRepository {
42  
43      /***
44       * <p>Returns an arbitrarily selected mail deposited in this Repository.
45       * Usage: SpoolManager calls accept() to see if there are any unprocessed 
46       * mails in the spool repository.</p>
47       *
48       * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
49       *
50       * @return the mail
51       */
52      public synchronized Mail accept() throws InterruptedException {
53          if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
54              getLogger().debug("Method accept() called");
55          }
56          return accept(new SpoolRepository.AcceptFilter () {
57              public boolean accept (String _, String __, long ___, String ____) {
58                  return true;
59              }
60  
61              public long getWaitTime () {
62                  return 0;
63              }
64          });
65      }
66  
67      /***
68       * <p>Returns an arbitrarily selected mail deposited in this Repository that
69       * is either ready immediately for delivery, or is younger than it's last_updated plus
70       * the number of failed attempts times the delay time.
71       * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
72       * unprocessed mail is available.</p>
73       *
74       * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
75       *
76       * @return the mail
77       */
78      public synchronized Mail accept(final long delay) throws InterruptedException
79      {
80          if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
81              getLogger().debug("Method accept(delay) called");
82          }
83          return accept(new SpoolRepository.AcceptFilter () {
84              long youngest = 0;
85                  
86                  public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
87                      if (state.equals(Mail.ERROR)) {
88                          //Test the time...
89                          long timeToProcess = delay + lastUpdated;
90                  
91                          if (System.currentTimeMillis() > timeToProcess) {
92                              //We're ready to process this again
93                              return true;
94                          } else {
95                              //We're not ready to process this.
96                              if (youngest == 0 || youngest > timeToProcess) {
97                                  //Mark this as the next most likely possible mail to process
98                                  youngest = timeToProcess;
99                              }
100                             return false;
101                         }
102                     } else {
103                         //This mail is good to go... return the key
104                         return true;
105                     }
106                 }
107         
108                 public long getWaitTime () {
109                     if (youngest == 0) {
110                         return 0;
111                     } else {
112                         long duration = youngest - System.currentTimeMillis();
113                         youngest = 0; //get ready for next round
114                         return duration <= 0 ? 1 : duration;
115                     }
116                 }
117             });
118     }
119 
120 
121     /***
122      * Returns an arbitrarily select mail deposited in this Repository for
123      * which the supplied filter's accept method returns true.
124      * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
125      * based on number of retries if the mail is ready for processing.
126      * If no message is ready the method will block until one is, the amount of time to block is
127      * determined by calling the filters getWaitTime method.
128      *
129      * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
130      *
131      * @return  the mail
132      */
133     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
134         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
135             getLogger().debug("Method accept(Filter) called");
136         }
137         while (!Thread.currentThread().isInterrupted()) try {
138             for (Iterator it = list(); it.hasNext(); ) {
139                 String s = it.next().toString();
140                 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
141                     StringBuffer logBuffer =
142                         new StringBuffer(64)
143                                 .append("Found item ")
144                                 .append(s)
145                                 .append(" in spool.");
146                     getLogger().debug(logBuffer.toString());
147                 }
148                 if (lock(s)) {
149                     if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
150                         getLogger().debug("accept(Filter) has locked: " + s);
151                     }
152                     try {
153                         Mail mail = retrieve(s);
154                         // Retrieve can return null if the mail is no longer on the spool
155                         // (i.e. another thread has gotten to it first).
156                         // In this case we simply continue to the next key
157                         if (mail == null || !filter.accept (mail.getName(),
158                                                             mail.getState(),
159                                                             mail.getLastUpdated().getTime(),
160                                                             mail.getErrorMessage())) {
161                             unlock(s);
162                             continue;
163                         }
164                         return mail;
165                     } catch (javax.mail.MessagingException e) {
166                         unlock(s);
167                         getLogger().error("Exception during retrieve -- skipping item " + s, e);
168                     }
169                 }
170             }
171 
172             //We did not find any... let's wait for a certain amount of time
173             wait (filter.getWaitTime());
174         } catch (InterruptedException ex) {
175             throw ex;
176         } catch (ConcurrentModificationException cme) {
177             // Should never get here now that list methods clones keyset for iterator
178             getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
179         }
180         throw new InterruptedException();
181     }
182     
183 }