View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.mailrepository;
23  
24  import org.apache.james.services.SpoolRepository;
25  import org.apache.mailet.Mail;
26  
27  import java.util.ConcurrentModificationException;
28  import java.util.Iterator;
29  
30  /**
31   * Implementation of a MailRepository on a FileSystem.
32   *
33   * Requires a configuration element in the .conf.xml file of the form:
34   *  <repository destinationURL="file://path-to-root-dir-for-repository"
35   *              type="MAIL"
36   *              model="SYNCHRONOUS"/>
37   * Requires a logger called MailRepository.
38   *
39   * @version 1.0.0, 24/04/1999
40   */
41  public class AvalonSpoolRepository
42      extends AvalonMailRepository
43      implements SpoolRepository {
44  
45      /**
46       * <p>Returns an arbitrarily selected mail deposited in this Repository.
47       * Usage: SpoolManager calls accept() to see if there are any unprocessed 
48       * mails in the spool repository.</p>
49       *
50       * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
51       *
52       * @return the mail
53       */
54      public synchronized Mail accept() throws InterruptedException {
55          if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
56              getLogger().debug("Method accept() called");
57          }
58          return accept(new SpoolRepository.AcceptFilter () {
59              public boolean accept (String _, String __, long ___, String ____) {
60                  return true;
61              }
62  
63              public long getWaitTime () {
64                  return 0;
65              }
66          });
67      }
68  
69      /**
70       * <p>Returns an arbitrarily selected mail deposited in this Repository that
71       * is either ready immediately for delivery, or is younger than it's last_updated plus
72       * the number of failed attempts times the delay time.
73       * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
74       * unprocessed mail is available.</p>
75       *
76       * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
77       *
78       * @return the mail
79       */
80      public synchronized Mail accept(final long delay) throws InterruptedException
81      {
82          if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
83              getLogger().debug("Method accept(delay) called");
84          }
85          return accept(new SpoolRepository.AcceptFilter () {
86              long youngest = 0;
87                  
88                  public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
89                      if (state.equals(Mail.ERROR)) {
90                          //Test the time...
91                          long timeToProcess = delay + lastUpdated;
92                  
93                          if (System.currentTimeMillis() > timeToProcess) {
94                              //We're ready to process this again
95                              return true;
96                          } else {
97                              //We're not ready to process this.
98                              if (youngest == 0 || youngest > timeToProcess) {
99                                  //Mark this as the next most likely possible mail to process
100                                 youngest = timeToProcess;
101                             }
102                             return false;
103                         }
104                     } else {
105                         //This mail is good to go... return the key
106                         return true;
107                     }
108                 }
109         
110                 public long getWaitTime () {
111                     if (youngest == 0) {
112                         return 0;
113                     } else {
114                         long duration = youngest - System.currentTimeMillis();
115                         youngest = 0; //get ready for next round
116                         return duration <= 0 ? 1 : duration;
117                     }
118                 }
119             });
120     }
121 
122 
123     /**
124      * Returns an arbitrarily select mail deposited in this Repository for
125      * which the supplied filter's accept method returns true.
126      * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
127      * based on number of retries if the mail is ready for processing.
128      * If no message is ready the method will block until one is, the amount of time to block is
129      * determined by calling the filters getWaitTime method.
130      *
131      * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
132      *
133      * @return  the mail
134      */
135     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
136         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
137             getLogger().debug("Method accept(Filter) called");
138         }
139         while (!Thread.currentThread().isInterrupted()) try {
140             for (Iterator it = list(); it.hasNext(); ) {
141                 String s = it.next().toString();
142                 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
143                     StringBuffer logBuffer =
144                         new StringBuffer(64)
145                                 .append("Found item ")
146                                 .append(s)
147                                 .append(" in spool.");
148                     getLogger().debug(logBuffer.toString());
149                 }
150                 if (lock(s)) {
151                     if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
152                         getLogger().debug("accept(Filter) has locked: " + s);
153                     }
154                     try {
155                         Mail mail = retrieve(s);
156                         // Retrieve can return null if the mail is no longer on the spool
157                         // (i.e. another thread has gotten to it first).
158                         // In this case we simply continue to the next key
159                         if (mail == null || !filter.accept (mail.getName(),
160                                                             mail.getState(),
161                                                             mail.getLastUpdated().getTime(),
162                                                             mail.getErrorMessage())) {
163                             unlock(s);
164                             continue;
165                         }
166                         return mail;
167                     } catch (javax.mail.MessagingException e) {
168                         unlock(s);
169                         getLogger().error("Exception during retrieve -- skipping item " + s, e);
170                     }
171                 }
172             }
173 
174             //We did not find any... let's wait for a certain amount of time
175             wait (filter.getWaitTime());
176         } catch (InterruptedException ex) {
177             throw ex;
178         } catch (ConcurrentModificationException cme) {
179             // Should never get here now that list methods clones keyset for iterator
180             getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
181         }
182         throw new InterruptedException();
183     }
184     
185 }