View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.mailrepository;
21  
22  import org.apache.avalon.cornerstone.services.store.ObjectRepository;
23  import org.apache.avalon.cornerstone.services.store.Store;
24  import org.apache.avalon.cornerstone.services.store.StreamRepository;
25  import org.apache.avalon.framework.activity.Initializable;
26  import org.apache.avalon.framework.configuration.Configurable;
27  import org.apache.avalon.framework.configuration.Configuration;
28  import org.apache.avalon.framework.configuration.ConfigurationException;
29  import org.apache.avalon.framework.configuration.DefaultConfiguration;
30  import org.apache.avalon.framework.logger.AbstractLogEnabled;
31  import org.apache.avalon.framework.service.ServiceException;
32  import org.apache.avalon.framework.service.ServiceManager;
33  import org.apache.avalon.framework.service.Serviceable;
34  import org.apache.james.core.MimeMessageCopyOnWriteProxy;
35  import org.apache.james.core.MimeMessageWrapper;
36  import org.apache.james.services.MailRepository;
37  import org.apache.james.util.Lock;
38  import org.apache.mailet.Mail;
39  
40  import javax.mail.MessagingException;
41  import javax.mail.internet.MimeMessage;
42  
43  import java.io.OutputStream;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Collections;
47  import java.util.HashSet;
48  import java.util.Iterator;
49  import java.util.Set;
50  
51  /***
52   * Implementation of a MailRepository on a FileSystem.
53   *
54   * Requires a configuration element in the .conf.xml file of the form:
55   *  <repository destinationURL="file://path-to-root-dir-for-repository"
56   *              type="MAIL"
57   *              model="SYNCHRONOUS"/>
58   * Requires a logger called MailRepository.
59   *
60   * @version 1.0.0, 24/04/1999
61   */
62  public class AvalonMailRepository
63      extends AbstractLogEnabled
64      implements MailRepository, Configurable, Serviceable, Initializable {
65  
66      /***
67       * Whether 'deep debugging' is turned on.
68       */
69      protected final static boolean DEEP_DEBUG = false;
70  
71      private Lock lock;
72      private Store store;
73      private StreamRepository sr;
74      private ObjectRepository or;
75      private String destination;
76      private Set keys;
77      private boolean fifo;
78      private boolean cacheKeys; // experimental: for use with write mostly repositories such as spam and error
79  
80      /***
81       * @see org.apache.avalon.framework.service.Serviceable#compose(ServiceManager )
82       */
83      public void service( final ServiceManager componentManager )
84              throws ServiceException {
85          store = (Store)componentManager.lookup( Store.ROLE );
86      }
87  
88      /***
89       * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
90       */
91      public void configure(Configuration conf) throws ConfigurationException {
92          destination = conf.getAttribute("destinationURL");
93          if (getLogger().isDebugEnabled()) {
94              getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
95          }
96          String checkType = conf.getAttribute("type");
97          if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
98              String exceptionString = "Attempt to configure AvalonMailRepository as " +
99                                       checkType;
100             if (getLogger().isWarnEnabled()) {
101                 getLogger().warn(exceptionString);
102             }
103             throw new ConfigurationException(exceptionString);
104         }
105         fifo = conf.getAttributeAsBoolean("FIFO", false);
106         cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
107         // ignore model
108     }
109 
110     /***
111      * @see org.apache.avalon.framework.activity.Initializable#initialize()
112      */
113     public void initialize()
114             throws Exception {
115         try {
116             //prepare Configurations for object and stream repositories
117             DefaultConfiguration objectConfiguration
118                 = new DefaultConfiguration( "repository",
119                                             "generated:AvalonFileRepository.compose()" );
120 
121             objectConfiguration.setAttribute("destinationURL", destination);
122             objectConfiguration.setAttribute("type", "OBJECT");
123             objectConfiguration.setAttribute("model", "SYNCHRONOUS");
124 
125             DefaultConfiguration streamConfiguration
126                 = new DefaultConfiguration( "repository",
127                                             "generated:AvalonFileRepository.compose()" );
128 
129             streamConfiguration.setAttribute( "destinationURL", destination );
130             streamConfiguration.setAttribute( "type", "STREAM" );
131             streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
132 
133             sr = (StreamRepository) store.select(streamConfiguration);
134             or = (ObjectRepository) store.select(objectConfiguration);
135             lock = new Lock();
136             if (cacheKeys) keys = Collections.synchronizedSet(new HashSet());
137 
138 
139             //Finds non-matching pairs and deletes the extra files
140             HashSet streamKeys = new HashSet();
141             for (Iterator i = sr.list(); i.hasNext(); ) {
142                 streamKeys.add(i.next());
143             }
144             HashSet objectKeys = new HashSet();
145             for (Iterator i = or.list(); i.hasNext(); ) {
146                 objectKeys.add(i.next());
147             }
148 
149             Collection strandedStreams = (Collection)streamKeys.clone();
150             strandedStreams.removeAll(objectKeys);
151             for (Iterator i = strandedStreams.iterator(); i.hasNext(); ) {
152                 String key = (String)i.next();
153                 remove(key);
154             }
155 
156             Collection strandedObjects = (Collection)objectKeys.clone();
157             strandedObjects.removeAll(streamKeys);
158             for (Iterator i = strandedObjects.iterator(); i.hasNext(); ) {
159                 String key = (String)i.next();
160                 remove(key);
161             }
162 
163             if (keys != null) {
164                 // Next get a list from the object repository
165                 // and use that for the list of keys
166                 keys.clear();
167                 for (Iterator i = or.list(); i.hasNext(); ) {
168                     keys.add(i.next());
169                 }
170             }
171             if (getLogger().isDebugEnabled()) {
172                 StringBuffer logBuffer =
173                     new StringBuffer(128)
174                             .append(this.getClass().getName())
175                             .append(" created in ")
176                             .append(destination);
177                 getLogger().debug(logBuffer.toString());
178             }
179         } catch (Exception e) {
180             final String message = "Failed to retrieve Store component:" + e.getMessage();
181             getLogger().error( message, e );
182             throw e;
183         }
184     }
185 
186     /***
187      * Releases a lock on a message identified by a key
188      *
189      * @param key the key of the message to be unlocked
190      *
191      * @return true if successfully released the lock, false otherwise
192      */
193     public boolean unlock(String key) {
194         if (lock.unlock(key)) {
195             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
196                 StringBuffer debugBuffer =
197                     new StringBuffer(256)
198                             .append("Unlocked ")
199                             .append(key)
200                             .append(" for ")
201                             .append(Thread.currentThread().getName())
202                             .append(" @ ")
203                             .append(new java.util.Date(System.currentTimeMillis()));
204                 getLogger().debug(debugBuffer.toString());
205             }
206             return true;
207         } else {
208             return false;
209         }
210     }
211 
212     /***
213      * Obtains a lock on a message identified by a key
214      *
215      * @param key the key of the message to be locked
216      *
217      * @return true if successfully obtained the lock, false otherwise
218      */
219     public boolean lock(String key) {
220         if (lock.lock(key)) {
221             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
222                 StringBuffer debugBuffer =
223                     new StringBuffer(256)
224                             .append("Locked ")
225                             .append(key)
226                             .append(" for ")
227                             .append(Thread.currentThread().getName())
228                             .append(" @ ")
229                             .append(new java.util.Date(System.currentTimeMillis()));
230                 getLogger().debug(debugBuffer.toString());
231             }
232 //            synchronized (this) {
233 //                notifyAll();
234 //            }
235             return true;
236         } else {
237             return false;
238         }
239     }
240 
241     /***
242      * Stores a message in this repository. Shouldn't this return the key
243      * under which it is stored?
244      *
245      * @param mc the mail message to store
246      */
247     public void store(Mail mc) throws MessagingException {
248         try {
249             String key = mc.getName();
250             //Remember whether this key was locked
251             boolean wasLocked = true;
252             synchronized (this) {
253                 wasLocked = lock.isLocked(key);
254     
255                 if (!wasLocked) {
256                     //If it wasn't locked, we want a lock during the store
257                     lock(key);
258                 }
259             }
260             try {
261                 if (keys != null && !keys.contains(key)) {
262                     keys.add(key);
263                 }
264                 boolean saveStream = true;
265 
266                 MimeMessage message = mc.getMessage();
267                 // if the message is a Copy on Write proxy we check the wrapped message
268                 // to optimize the behaviour in case of MimeMessageWrapper
269                 if (message instanceof MimeMessageCopyOnWriteProxy) {
270                     MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
271                     message = messageCow.getWrappedMessage();
272                 }
273                 if (message instanceof MimeMessageWrapper) {
274                     MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
275                     if (DEEP_DEBUG) {
276                         System.out.println("Retrieving from: " + wrapper.getSourceId());
277                         StringBuffer debugBuffer =
278                             new StringBuffer(64)
279                                     .append("Saving to:       ")
280                                     .append(destination)
281                                     .append("/")
282                                     .append(mc.getName());
283                         System.out.println(debugBuffer.toString());
284                         System.out.println("Modified: " + wrapper.isModified());
285                     }
286                     StringBuffer destinationBuffer =
287                         new StringBuffer(128)
288                             .append(destination)
289                             .append("/")
290                             .append(mc.getName());
291                     if (destinationBuffer.toString().equals(wrapper.getSourceId()) && !wrapper.isModified()) {
292                         //We're trying to save to the same place, and it's not modified... we shouldn't save.
293                         //More importantly, if we try to save, we will create a 0-byte file since we're
294                         //retrying to retrieve from a file we'll be overwriting.
295                         saveStream = false;
296                     }
297                 }
298                 if (saveStream) {
299                     OutputStream out = null;
300                     try {
301                         out = sr.put(key);
302                         mc.getMessage().writeTo(out);
303                     } finally {
304                         if (out != null) out.close();
305                     }
306                 }
307                 //Always save the header information
308                 or.put(key, mc);
309             } finally {
310                 if (!wasLocked) {
311                     // If it wasn't locked, we need to unlock now
312                     unlock(key);
313                     synchronized (this) {
314                         notify();
315                     }
316                 }
317             }
318 
319             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
320                 StringBuffer logBuffer =
321                     new StringBuffer(64)
322                             .append("Mail ")
323                             .append(key)
324                             .append(" stored.");
325                 getLogger().debug(logBuffer.toString());
326             }
327 
328         } catch (Exception e) {
329             getLogger().error("Exception storing mail: " + e, e);
330             throw new MessagingException("Exception caught while storing Message Container: " + e);
331         }
332     }
333 
334     /***
335      * Retrieves a message given a key. At the moment, keys can be obtained
336      * from list() in superinterface Store.Repository
337      *
338      * @param key the key of the message to retrieve
339      * @return the mail corresponding to this key, null if none exists
340      */
341     public Mail retrieve(String key) throws MessagingException {
342         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
343             getLogger().debug("Retrieving mail: " + key);
344         }
345         try {
346             Mail mc = null;
347             try {
348                 mc = (Mail) or.get(key);
349             } 
350             catch (RuntimeException re){
351                 StringBuffer exceptionBuffer = new StringBuffer(128);
352                 if(re.getCause() instanceof Error){
353                     exceptionBuffer.append("Error when retrieving mail, not deleting: ")
354                             .append(re.toString());
355                 }else{
356                     exceptionBuffer.append("Exception retrieving mail: ")
357                             .append(re.toString())
358                             .append(", so we're deleting it.");
359                     remove(key);
360                 }
361                 getLogger().warn(exceptionBuffer.toString());
362                 return null;
363             }
364             MimeMessageAvalonSource source = new MimeMessageAvalonSource(sr, destination, key);
365             mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
366 
367             return mc;
368         } catch (Exception me) {
369             getLogger().error("Exception retrieving mail: " + me);
370             throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
371         }
372     }
373 
374     /***
375      * Removes a specified message
376      *
377      * @param mail the message to be removed from the repository
378      */
379     public void remove(Mail mail) throws MessagingException {
380         remove(mail.getName());
381     }
382 
383 
384     /***
385      * Removes a Collection of mails from the repository
386      * @param mails The Collection of <code>MailImpl</code>'s to delete
387      * @throws MessagingException
388      * @since 2.2.0
389      */
390     public void remove(Collection mails) throws MessagingException {
391         Iterator delList = mails.iterator();
392         while (delList.hasNext()) {
393             remove((Mail)delList.next());
394         }
395     }
396 
397     /***
398      * Removes a message identified by key.
399      *
400      * @param key the key of the message to be removed from the repository
401      */
402     public void remove(String key) throws MessagingException {
403         if (lock(key)) {
404             try {
405                 if (keys != null) keys.remove(key);
406                 sr.remove(key);
407                 or.remove(key);
408             } finally {
409                 unlock(key);
410             }
411         } else {
412             StringBuffer exceptionBuffer =
413                 new StringBuffer(64)
414                         .append("Cannot lock ")
415                         .append(key)
416                         .append(" to remove it");
417             throw new MessagingException(exceptionBuffer.toString());
418         }
419     }
420 
421     /***
422      * List string keys of messages in repository.
423      *
424      * @return an <code>Iterator</code> over the list of keys in the repository
425      *
426      */
427     public Iterator list() {
428         // Fix ConcurrentModificationException by cloning 
429         // the keyset before getting an iterator
430         final ArrayList clone;
431         if (keys != null) synchronized(keys) {
432             clone = new ArrayList(keys);
433         } else {
434             clone = new ArrayList();
435             for (Iterator i = or.list(); i.hasNext(); ) {
436                 clone.add(i.next());
437             }
438         }
439         if (fifo) Collections.sort(clone); // Keys is a HashSet; impose FIFO for apps that need it
440         return clone.iterator();
441     }
442 }