View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.test.mock.james;
21  
22  import org.apache.avalon.framework.activity.Disposable;
23  import org.apache.avalon.framework.container.ContainerUtil;
24  import org.apache.james.core.MailImpl;
25  import org.apache.james.services.SpoolRepository;
26  import org.apache.james.test.mock.avalon.MockLogger;
27  import org.apache.james.util.Lock;
28  import org.apache.mailet.Mail;
29  
30  import javax.mail.MessagingException;
31  
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.ConcurrentModificationException;
35  import java.util.Hashtable;
36  import java.util.Iterator;
37  
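/**
 * In-memory implementation of a SpoolRepository: mails are held in a
 * Hashtable keyed by mail name, and access to individual keys is
 * coordinated through a Lock.
 *
 * Note: a file-system backed repository requires a configuration element
 * in the .conf.xml file of the form:
 *  <repository destinationURL="file://path-to-root-dir-for-repository"
 *              type="MAIL"
 *              model="SYNCHRONOUS"/>
 * and a logger called MailRepository. This class takes no such
 * configuration: its no-argument constructor creates the Hashtable and the
 * Lock, and a MockLogger is created on demand.
 *
 * @version 1.0.0, 24/04/1999
 */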
49  public class InMemorySpoolRepository
50      implements SpoolRepository, Disposable {
51  
52      /***
53       * Whether 'deep debugging' is turned on.
54       */
55      protected final static boolean DEEP_DEBUG = true;
56      private Lock lock;
57      private MockLogger logger;
58      private Hashtable spool;
59  
60      private MockLogger getLogger() {
61          if (logger == null) {
62              logger = new MockLogger();
63          }
64          return logger;
65      }
66  
67      /***
68       * Releases a lock on a message identified by a key
69       *
70       * @param key the key of the message to be unlocked
71       *
72       * @return true if successfully released the lock, false otherwise
73       */
74      public boolean unlock(String key) {
75          if (lock.unlock(key)) {
76              if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
77                  StringBuffer debugBuffer =
78                      new StringBuffer(256)
79                              .append("Unlocked ")
80                              .append(key)
81                              .append(" for ")
82                              .append(Thread.currentThread().getName())
83                              .append(" @ ")
84                              .append(new java.util.Date(System.currentTimeMillis()));
85                  getLogger().debug(debugBuffer.toString());
86              }
87              return true;
88          } else {
89              return false;
90          }
91      }
92  
93      /***
94       * Obtains a lock on a message identified by a key
95       *
96       * @param key the key of the message to be locked
97       *
98       * @return true if successfully obtained the lock, false otherwise
99       */
100     public boolean lock(String key) {
101         if (lock.lock(key)) {
102             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
103                 StringBuffer debugBuffer =
104                     new StringBuffer(256)
105                             .append("Locked ")
106                             .append(key)
107                             .append(" for ")
108                             .append(Thread.currentThread().getName())
109                             .append(" @ ")
110                             .append(new java.util.Date(System.currentTimeMillis()));
111                 getLogger().debug(debugBuffer.toString());
112             }
113 //            synchronized (this) {
114 //                notifyAll();
115 //            }
116             return true;
117         } else {
118             return false;
119         }
120     }
121 
122     /***
123      * Stores a message in this repository. Shouldn't this return the key
124      * under which it is stored?
125      *
126      * @param mc the mail message to store
127      */
128     public void store(Mail mc) throws MessagingException {
129         try {
130             String key = mc.getName();
131             //Remember whether this key was locked
132             boolean wasLocked = true;
133             synchronized (this) {
134                 wasLocked = lock.isLocked(key);
135     
136                 if (!wasLocked) {
137                     //If it wasn't locked, we want a lock during the store
138                     lock(key);
139                 }
140             }
141             try {
142                 // Remove any previous copy of this mail
143                 if (spool.containsKey(key)) {
144                     // do not use this.remove because this would
145                     // also remove a current lock.
146                     Object o = spool.remove(key);
147                     ContainerUtil.dispose(o);
148                 }
149                 // Clone the mail (so the caller could modify it).
150                 MailImpl m = new MailImpl(mc,mc.getName());
151                 m.setState(mc.getState());
152                 m.setLastUpdated(mc.getLastUpdated());
153                 m.setErrorMessage(mc.getErrorMessage());
154                 spool.put(mc.getName(),m);
155             } finally {
156                 if (!wasLocked) {
157                     // If it wasn't locked, we need to unlock now
158                     unlock(key);
159                     synchronized (this) {
160                         notify();
161                     }
162                 }
163             }
164 
165             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
166                 StringBuffer logBuffer =
167                     new StringBuffer(64)
168                             .append("Mail ")
169                             .append(key)
170                             .append(" stored.");
171                 getLogger().debug(logBuffer.toString());
172             }
173 
174         } catch (Exception e) {
175             getLogger().error("Exception storing mail: " + e,e);
176             throw new MessagingException("Exception caught while storing Message Container: ",e);
177         }
178     }
179 
180     /***
181      * Retrieves a message given a key. At the moment, keys can be obtained
182      * from list() in superinterface Store.Repository
183      *
184      * @param key the key of the message to retrieve
185      * @return the mail corresponding to this key, null if none exists
186      */
187     public Mail retrieve(String key) throws MessagingException {
188         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
189             getLogger().debug("Retrieving mail: " + key);
190         }
191         try {
192             Mail mc = null;
193             try {
194                 mc = new MailImpl((Mail) spool.get(key),key);
195                 mc.setState(((Mail) spool.get(key)).getState());
196                 mc.setErrorMessage(((Mail) spool.get(key)).getErrorMessage());
197                 mc.setLastUpdated(((Mail) spool.get(key)).getLastUpdated());
198             } 
199             catch (RuntimeException re){
200                 StringBuffer exceptionBuffer = new StringBuffer(128);
201                 if(re.getCause() instanceof Error){
202                     exceptionBuffer.append("Error when retrieving mail, not deleting: ")
203                             .append(re.toString());
204                 }else{
205                     exceptionBuffer.append("Exception retrieving mail: ")
206                             .append(re.toString())
207                             .append(", so we're deleting it.");
208                     remove(key);
209                 }
210                 getLogger().warn(exceptionBuffer.toString());
211                 return null;
212             }
213             return mc;
214         } catch (Exception me) {
215             getLogger().error("Exception retrieving mail: " + me);
216             throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
217         }
218     }
219 
220     /***
221      * Removes a specified message
222      *
223      * @param mail the message to be removed from the repository
224      */
225     public void remove(Mail mail) throws MessagingException {
226         remove(mail.getName());
227     }
228 
229 
230     /***
231      * Removes a Collection of mails from the repository
232      * @param mails The Collection of <code>MailImpl</code>'s to delete
233      * @throws MessagingException
234      * @since 2.2.0
235      */
236     public void remove(Collection mails) throws MessagingException {
237         Iterator delList = mails.iterator();
238         while (delList.hasNext()) {
239             remove((Mail)delList.next());
240         }
241     }
242 
243     /***
244      * Removes a message identified by key.
245      *
246      * @param key the key of the message to be removed from the repository
247      */
248     public void remove(String key) throws MessagingException {
249         if (lock(key)) {
250             try {
251                 if (spool != null) {
252                     Object o = spool.remove(key);
253                     ContainerUtil.dispose(o);
254                 }
255             } finally {
256                 unlock(key);
257             }
258         } else {
259             StringBuffer exceptionBuffer =
260                 new StringBuffer(64)
261                         .append("Cannot lock ")
262                         .append(key)
263                         .append(" to remove it");
264             throw new MessagingException(exceptionBuffer.toString());
265         }
266     }
267 
268     /***
269      * List string keys of messages in repository.
270      *
271      * @return an <code>Iterator</code> over the list of keys in the repository
272      *
273      */
274     public Iterator list() {
275         // Fix ConcurrentModificationException by cloning 
276         // the keyset before getting an iterator
277         final ArrayList clone;
278         synchronized(spool) {
279             clone = new ArrayList(spool.keySet());
280         }
281         return clone.iterator();
282     }
283 
284     
285     /***
286      * <p>Returns an arbitrarily selected mail deposited in this Repository.
287      * Usage: SpoolManager calls accept() to see if there are any unprocessed 
288      * mails in the spool repository.</p>
289      *
290      * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
291      *
292      * @return the mail
293      */
294     public synchronized Mail accept() throws InterruptedException {
295         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
296             getLogger().debug("Method accept() called");
297         }
298         return accept(new SpoolRepository.AcceptFilter () {
299             public boolean accept (String _, String __, long ___, String ____) {
300                 return true;
301             }
302 
303             public long getWaitTime () {
304                 return 0;
305             }
306         });
307     }
308 
309     /***
310      * <p>Returns an arbitrarily selected mail deposited in this Repository that
311      * is either ready immediately for delivery, or is younger than it's last_updated plus
312      * the number of failed attempts times the delay time.
313      * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
314      * unprocessed mail is available.</p>
315      *
316      * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
317      *
318      * @return the mail
319      */
320     public synchronized Mail accept(final long delay) throws InterruptedException
321     {
322         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
323             getLogger().debug("Method accept(delay) called");
324         }
325         return accept(new SpoolRepository.AcceptFilter () {
326             long youngest = 0;
327                 
328                 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
329                     if (state.equals(Mail.ERROR)) {
330                         //Test the time...
331                         long timeToProcess = delay + lastUpdated;
332                 
333                         if (System.currentTimeMillis() > timeToProcess) {
334                             //We're ready to process this again
335                             return true;
336                         } else {
337                             //We're not ready to process this.
338                             if (youngest == 0 || youngest > timeToProcess) {
339                                 //Mark this as the next most likely possible mail to process
340                                 youngest = timeToProcess;
341                             }
342                             return false;
343                         }
344                     } else {
345                         //This mail is good to go... return the key
346                         return true;
347                     }
348                 }
349         
350                 public long getWaitTime () {
351                     if (youngest == 0) {
352                         return 0;
353                     } else {
354                         long duration = youngest - System.currentTimeMillis();
355                         youngest = 0; //get ready for next round
356                         return duration <= 0 ? 1 : duration;
357                     }
358                 }
359             });
360     }
361 
362 
363     /***
364      * Returns an arbitrarily select mail deposited in this Repository for
365      * which the supplied filter's accept method returns true.
366      * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
367      * based on number of retries if the mail is ready for processing.
368      * If no message is ready the method will block until one is, the amount of time to block is
369      * determined by calling the filters getWaitTime method.
370      *
371      * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
372      *
373      * @return  the mail
374      */
375     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
376         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
377             getLogger().debug("Method accept(Filter) called");
378         }
379         while (!Thread.currentThread().isInterrupted()) try {
380             for (Iterator it = list(); it.hasNext(); ) {
381                 String s = it.next().toString();
382                 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
383                     StringBuffer logBuffer =
384                         new StringBuffer(64)
385                                 .append("Found item ")
386                                 .append(s)
387                                 .append(" in spool.");
388                     getLogger().debug(logBuffer.toString());
389                 }
390                 if (lock(s)) {
391                     if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
392                         getLogger().debug("accept(Filter) has locked: " + s);
393                     }
394                     try {
395                         Mail mail = retrieve(s);
396                         // Retrieve can return null if the mail is no longer on the spool
397                         // (i.e. another thread has gotten to it first).
398                         // In this case we simply continue to the next key
399                         if (mail == null || !filter.accept (mail.getName(),
400                                                             mail.getState(),
401                                                             mail.getLastUpdated().getTime(),
402                                                             mail.getErrorMessage())) {
403                             unlock(s);
404                             continue;
405                         }
406                         return mail;
407                     } catch (javax.mail.MessagingException e) {
408                         unlock(s);
409                         getLogger().error("Exception during retrieve -- skipping item " + s, e);
410                     }
411                 }
412             }
413 
414             //We did not find any... let's wait for a certain amount of time
415             wait (filter.getWaitTime());
416         } catch (InterruptedException ex) {
417             throw ex;
418         } catch (ConcurrentModificationException cme) {
419             // Should never get here now that list methods clones keyset for iterator
420             getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
421         }
422         throw new InterruptedException();
423     }
424 
425     /***
426      * 
427      */
428     public InMemorySpoolRepository() {
429         spool = new Hashtable();
430         lock = new Lock();
431     }
432 
433     public int size() {
434         return spool.size();
435     }
436 
437     public void clear() {
438         if (spool != null) {
439             Iterator i = list();
440             while (i.hasNext()) {
441                 String key = (String) i.next();
442                 try {
443                     remove(key);
444                 } catch (MessagingException e) {
445                 }
446             }
447         }
448     }
449 
450     public void dispose() {
451         clear();
452     }
453 
454     public String toString() {
455         StringBuffer result = new StringBuffer();
456         result.append(super.toString());
457         Iterator i = list();
458         while (i.hasNext()) {
459             result.append("\n\t"+i.next());
460         }
461         return result.toString();
462     }
463     
464 }