View Javadoc

1   /************************************************************************
2    * Copyright (c) 1999-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.mailrepository;
19  
20  import org.apache.avalon.cornerstone.services.store.ObjectRepository;
21  import org.apache.avalon.cornerstone.services.store.Store;
22  import org.apache.avalon.cornerstone.services.store.StreamRepository;
23  import org.apache.avalon.framework.activity.Initializable;
24  import org.apache.avalon.framework.configuration.Configurable;
25  import org.apache.avalon.framework.configuration.Configuration;
26  import org.apache.avalon.framework.configuration.ConfigurationException;
27  import org.apache.avalon.framework.configuration.DefaultConfiguration;
28  import org.apache.avalon.framework.logger.AbstractLogEnabled;
29  import org.apache.avalon.framework.service.ServiceException;
30  import org.apache.avalon.framework.service.ServiceManager;
31  import org.apache.avalon.framework.service.Serviceable;
32  import org.apache.james.core.MimeMessageCopyOnWriteProxy;
33  import org.apache.james.core.MimeMessageWrapper;
34  import org.apache.james.services.MailRepository;
35  import org.apache.james.util.Lock;
36  import org.apache.mailet.Mail;
37  
38  import javax.mail.MessagingException;
39  import javax.mail.internet.MimeMessage;
40  
41  import java.io.OutputStream;
42  import java.util.ArrayList;
43  import java.util.Collection;
44  import java.util.Collections;
45  import java.util.HashSet;
46  import java.util.Iterator;
47  import java.util.Set;
48  
49  /***
50   * Implementation of a MailRepository on a FileSystem.
51   *
52   * Requires a configuration element in the .conf.xml file of the form:
53   *  <repository destinationURL="file://path-to-root-dir-for-repository"
54   *              type="MAIL"
55   *              model="SYNCHRONOUS"/>
56   * Requires a logger called MailRepository.
57   *
58   * @version 1.0.0, 24/04/1999
59   */
60  public class AvalonMailRepository
61      extends AbstractLogEnabled
62      implements MailRepository, Configurable, Serviceable, Initializable {
63  
64      /***
65       * Whether 'deep debugging' is turned on.
66       */
67      protected final static boolean DEEP_DEBUG = false;
68  
69      private Lock lock;
70      private Store store;
71      private StreamRepository sr;
72      private ObjectRepository or;
73      private String destination;
74      private Set keys;
75      private boolean fifo;
76      private boolean cacheKeys; // experimental: for use with write mostly repositories such as spam and error
77  
78      /***
79       * @see org.apache.avalon.framework.service.Serviceable#compose(ServiceManager )
80       */
81      public void service( final ServiceManager componentManager )
82              throws ServiceException {
83          store = (Store)componentManager.lookup( Store.ROLE );
84      }
85  
86      /***
87       * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
88       */
89      public void configure(Configuration conf) throws ConfigurationException {
90          destination = conf.getAttribute("destinationURL");
91          if (getLogger().isDebugEnabled()) {
92              getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
93          }
94          String checkType = conf.getAttribute("type");
95          if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
96              String exceptionString = "Attempt to configure AvalonMailRepository as " +
97                                       checkType;
98              if (getLogger().isWarnEnabled()) {
99                  getLogger().warn(exceptionString);
100             }
101             throw new ConfigurationException(exceptionString);
102         }
103         fifo = conf.getAttributeAsBoolean("FIFO", false);
104         cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
105         // ignore model
106     }
107 
108     /***
109      * @see org.apache.avalon.framework.activity.Initializable#initialize()
110      */
111     public void initialize()
112             throws Exception {
113         try {
114             //prepare Configurations for object and stream repositories
115             DefaultConfiguration objectConfiguration
116                 = new DefaultConfiguration( "repository",
117                                             "generated:AvalonFileRepository.compose()" );
118 
119             objectConfiguration.setAttribute("destinationURL", destination);
120             objectConfiguration.setAttribute("type", "OBJECT");
121             objectConfiguration.setAttribute("model", "SYNCHRONOUS");
122 
123             DefaultConfiguration streamConfiguration
124                 = new DefaultConfiguration( "repository",
125                                             "generated:AvalonFileRepository.compose()" );
126 
127             streamConfiguration.setAttribute( "destinationURL", destination );
128             streamConfiguration.setAttribute( "type", "STREAM" );
129             streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
130 
131             sr = (StreamRepository) store.select(streamConfiguration);
132             or = (ObjectRepository) store.select(objectConfiguration);
133             lock = new Lock();
134             if (cacheKeys) keys = Collections.synchronizedSet(new HashSet());
135 
136 
137             //Finds non-matching pairs and deletes the extra files
138             HashSet streamKeys = new HashSet();
139             for (Iterator i = sr.list(); i.hasNext(); ) {
140                 streamKeys.add(i.next());
141             }
142             HashSet objectKeys = new HashSet();
143             for (Iterator i = or.list(); i.hasNext(); ) {
144                 objectKeys.add(i.next());
145             }
146 
147             Collection strandedStreams = (Collection)streamKeys.clone();
148             strandedStreams.removeAll(objectKeys);
149             for (Iterator i = strandedStreams.iterator(); i.hasNext(); ) {
150                 String key = (String)i.next();
151                 remove(key);
152             }
153 
154             Collection strandedObjects = (Collection)objectKeys.clone();
155             strandedObjects.removeAll(streamKeys);
156             for (Iterator i = strandedObjects.iterator(); i.hasNext(); ) {
157                 String key = (String)i.next();
158                 remove(key);
159             }
160 
161             if (keys != null) {
162                 // Next get a list from the object repository
163                 // and use that for the list of keys
164                 keys.clear();
165                 for (Iterator i = or.list(); i.hasNext(); ) {
166                     keys.add(i.next());
167                 }
168             }
169             if (getLogger().isDebugEnabled()) {
170                 StringBuffer logBuffer =
171                     new StringBuffer(128)
172                             .append(this.getClass().getName())
173                             .append(" created in ")
174                             .append(destination);
175                 getLogger().debug(logBuffer.toString());
176             }
177         } catch (Exception e) {
178             final String message = "Failed to retrieve Store component:" + e.getMessage();
179             getLogger().error( message, e );
180             throw e;
181         }
182     }
183 
184     /***
185      * Releases a lock on a message identified by a key
186      *
187      * @param key the key of the message to be unlocked
188      *
189      * @return true if successfully released the lock, false otherwise
190      */
191     public boolean unlock(String key) {
192         if (lock.unlock(key)) {
193             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
194                 StringBuffer debugBuffer =
195                     new StringBuffer(256)
196                             .append("Unlocked ")
197                             .append(key)
198                             .append(" for ")
199                             .append(Thread.currentThread().getName())
200                             .append(" @ ")
201                             .append(new java.util.Date(System.currentTimeMillis()));
202                 getLogger().debug(debugBuffer.toString());
203             }
204             return true;
205         } else {
206             return false;
207         }
208     }
209 
210     /***
211      * Obtains a lock on a message identified by a key
212      *
213      * @param key the key of the message to be locked
214      *
215      * @return true if successfully obtained the lock, false otherwise
216      */
217     public boolean lock(String key) {
218         if (lock.lock(key)) {
219             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
220                 StringBuffer debugBuffer =
221                     new StringBuffer(256)
222                             .append("Locked ")
223                             .append(key)
224                             .append(" for ")
225                             .append(Thread.currentThread().getName())
226                             .append(" @ ")
227                             .append(new java.util.Date(System.currentTimeMillis()));
228                 getLogger().debug(debugBuffer.toString());
229             }
230 //            synchronized (this) {
231 //                notifyAll();
232 //            }
233             return true;
234         } else {
235             return false;
236         }
237     }
238 
239     /***
240      * Stores a message in this repository. Shouldn't this return the key
241      * under which it is stored?
242      *
243      * @param mc the mail message to store
244      */
245     public void store(Mail mc) throws MessagingException {
246         try {
247             String key = mc.getName();
248             //Remember whether this key was locked
249             boolean wasLocked = true;
250             synchronized (this) {
251                 wasLocked = lock.isLocked(key);
252     
253                 if (!wasLocked) {
254                     //If it wasn't locked, we want a lock during the store
255                     lock(key);
256                 }
257             }
258             try {
259                 if (keys != null && !keys.contains(key)) {
260                     keys.add(key);
261                 }
262                 boolean saveStream = true;
263 
264                 MimeMessage message = mc.getMessage();
265                 // if the message is a Copy on Write proxy we check the wrapped message
266                 // to optimize the behaviour in case of MimeMessageWrapper
267                 if (message instanceof MimeMessageCopyOnWriteProxy) {
268                     MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
269                     message = messageCow.getWrappedMessage();
270                 }
271                 if (message instanceof MimeMessageWrapper) {
272                     MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
273                     if (DEEP_DEBUG) {
274                         System.out.println("Retrieving from: " + wrapper.getSourceId());
275                         StringBuffer debugBuffer =
276                             new StringBuffer(64)
277                                     .append("Saving to:       ")
278                                     .append(destination)
279                                     .append("/")
280                                     .append(mc.getName());
281                         System.out.println(debugBuffer.toString());
282                         System.out.println("Modified: " + wrapper.isModified());
283                     }
284                     StringBuffer destinationBuffer =
285                         new StringBuffer(128)
286                             .append(destination)
287                             .append("/")
288                             .append(mc.getName());
289                     if (destinationBuffer.toString().equals(wrapper.getSourceId()) && !wrapper.isModified()) {
290                         //We're trying to save to the same place, and it's not modified... we shouldn't save.
291                         //More importantly, if we try to save, we will create a 0-byte file since we're
292                         //retrying to retrieve from a file we'll be overwriting.
293                         saveStream = false;
294                     }
295                 }
296                 if (saveStream) {
297                     OutputStream out = null;
298                     try {
299                         out = sr.put(key);
300                         mc.getMessage().writeTo(out);
301                     } finally {
302                         if (out != null) out.close();
303                     }
304                 }
305                 //Always save the header information
306                 or.put(key, mc);
307             } finally {
308                 if (!wasLocked) {
309                     // If it wasn't locked, we need to unlock now
310                     unlock(key);
311                     synchronized (this) {
312                         notify();
313                     }
314                 }
315             }
316 
317             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
318                 StringBuffer logBuffer =
319                     new StringBuffer(64)
320                             .append("Mail ")
321                             .append(key)
322                             .append(" stored.");
323                 getLogger().debug(logBuffer.toString());
324             }
325 
326         } catch (Exception e) {
327             getLogger().error("Exception storing mail: " + e, e);
328             throw new MessagingException("Exception caught while storing Message Container: " + e);
329         }
330     }
331 
332     /***
333      * Retrieves a message given a key. At the moment, keys can be obtained
334      * from list() in superinterface Store.Repository
335      *
336      * @param key the key of the message to retrieve
337      * @return the mail corresponding to this key, null if none exists
338      */
339     public Mail retrieve(String key) throws MessagingException {
340         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
341             getLogger().debug("Retrieving mail: " + key);
342         }
343         try {
344             Mail mc = null;
345             try {
346                 mc = (Mail) or.get(key);
347             } 
348             catch (RuntimeException re){
349                 StringBuffer exceptionBuffer = new StringBuffer(128);
350                 if(re.getCause() instanceof Error){
351                     exceptionBuffer.append("Error when retrieving mail, not deleting: ")
352                             .append(re.toString());
353                 }else{
354                     exceptionBuffer.append("Exception retrieving mail: ")
355                             .append(re.toString())
356                             .append(", so we're deleting it.");
357                     remove(key);
358                 }
359                 getLogger().warn(exceptionBuffer.toString());
360                 return null;
361             }
362             MimeMessageAvalonSource source = new MimeMessageAvalonSource(sr, destination, key);
363             mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
364 
365             return mc;
366         } catch (Exception me) {
367             getLogger().error("Exception retrieving mail: " + me);
368             throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
369         }
370     }
371 
372     /***
373      * Removes a specified message
374      *
375      * @param mail the message to be removed from the repository
376      */
377     public void remove(Mail mail) throws MessagingException {
378         remove(mail.getName());
379     }
380 
381 
382     /***
383      * Removes a Collection of mails from the repository
384      * @param mails The Collection of <code>MailImpl</code>'s to delete
385      * @throws MessagingException
386      * @since 2.2.0
387      */
388     public void remove(Collection mails) throws MessagingException {
389         Iterator delList = mails.iterator();
390         while (delList.hasNext()) {
391             remove((Mail)delList.next());
392         }
393     }
394 
395     /***
396      * Removes a message identified by key.
397      *
398      * @param key the key of the message to be removed from the repository
399      */
400     public void remove(String key) throws MessagingException {
401         if (lock(key)) {
402             try {
403                 if (keys != null) keys.remove(key);
404                 sr.remove(key);
405                 or.remove(key);
406             } finally {
407                 unlock(key);
408             }
409         } else {
410             StringBuffer exceptionBuffer =
411                 new StringBuffer(64)
412                         .append("Cannot lock ")
413                         .append(key)
414                         .append(" to remove it");
415             throw new MessagingException(exceptionBuffer.toString());
416         }
417     }
418 
419     /***
420      * List string keys of messages in repository.
421      *
422      * @return an <code>Iterator</code> over the list of keys in the repository
423      *
424      */
425     public Iterator list() {
426         // Fix ConcurrentModificationException by cloning 
427         // the keyset before getting an iterator
428         final ArrayList clone;
429         if (keys != null) synchronized(keys) {
430             clone = new ArrayList(keys);
431         } else {
432             clone = new ArrayList();
433             for (Iterator i = or.list(); i.hasNext(); ) {
434                 clone.add(i.next());
435             }
436         }
437         if (fifo) Collections.sort(clone); // Keys is a HashSet; impose FIFO for apps that need it
438         return clone.iterator();
439     }
440 }