View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.mailrepository;
23  
24  import org.apache.avalon.cornerstone.services.store.ObjectRepository;
25  import org.apache.avalon.cornerstone.services.store.Store;
26  import org.apache.avalon.cornerstone.services.store.StreamRepository;
27  import org.apache.avalon.framework.configuration.Configuration;
28  import org.apache.avalon.framework.configuration.ConfigurationException;
29  import org.apache.avalon.framework.configuration.DefaultConfiguration;
30  import org.apache.avalon.framework.service.ServiceException;
31  import org.apache.james.core.MimeMessageCopyOnWriteProxy;
32  import org.apache.james.core.MimeMessageWrapper;
33  import org.apache.mailet.Mail;
34  
35  import javax.mail.MessagingException;
36  import javax.mail.internet.MimeMessage;
37  
38  import java.io.IOException;
39  import java.io.OutputStream;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.Collections;
43  import java.util.HashSet;
44  import java.util.Iterator;
45  import java.util.Set;
46  
47  /**
48   * Implementation of a MailRepository on a FileSystem.
49   *
50   * Requires a configuration element in the .conf.xml file of the form:
51   *  <repository destinationURL="file://path-to-root-dir-for-repository"
52   *              type="MAIL"
53   *              model="SYNCHRONOUS"/>
54   * Requires a logger called MailRepository.
55   *
56   * @version 1.0.0, 24/04/1999
57   */
58  public class AvalonMailRepository
59      extends AbstractMailRepository {
60  
61      private StreamRepository streamRepository;
62      private ObjectRepository objectRepository;
63      private String destination;
64      private Set keys;
65      private boolean fifo;
66      private boolean cacheKeys; // experimental: for use with write mostly repositories such as spam and error
67  
68      /**
69       * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
70       */
71      public void configure(Configuration conf) throws ConfigurationException {
72          destination = conf.getAttribute("destinationURL");
73          if (getLogger().isDebugEnabled()) {
74              getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
75          }
76          String checkType = conf.getAttribute("type");
77          if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
78              String exceptionString = "Attempt to configure AvalonMailRepository as " +
79                                       checkType;
80              if (getLogger().isWarnEnabled()) {
81                  getLogger().warn(exceptionString);
82              }
83              throw new ConfigurationException(exceptionString);
84          }
85          fifo = conf.getAttributeAsBoolean("FIFO", false);
86          cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
87          // ignore model
88      }
89  
90      /**
91       * @see org.apache.avalon.framework.activity.Initializable#initialize()
92       */
93      public void initialize()
94              throws Exception {
95          super.initialize();
96          try {
97              objectRepository = (ObjectRepository) selectRepository(store, "OBJECT");
98              streamRepository = (StreamRepository) selectRepository(store, "STREAM");
99  
100             if (cacheKeys) keys = Collections.synchronizedSet(new HashSet());
101 
102             //Finds non-matching pairs and deletes the extra files
103             HashSet streamKeys = new HashSet();
104             for (Iterator i = streamRepository.list(); i.hasNext(); ) {
105                 streamKeys.add(i.next());
106             }
107             HashSet objectKeys = new HashSet();
108             for (Iterator i = objectRepository.list(); i.hasNext(); ) {
109                 objectKeys.add(i.next());
110             }
111 
112             Collection strandedStreams = (Collection)streamKeys.clone();
113             strandedStreams.removeAll(objectKeys);
114             for (Iterator i = strandedStreams.iterator(); i.hasNext(); ) {
115                 String key = (String)i.next();
116                 remove(key);
117             }
118 
119             Collection strandedObjects = (Collection)objectKeys.clone();
120             strandedObjects.removeAll(streamKeys);
121             for (Iterator i = strandedObjects.iterator(); i.hasNext(); ) {
122                 String key = (String)i.next();
123                 remove(key);
124             }
125 
126             if (keys != null) {
127                 // Next get a list from the object repository
128                 // and use that for the list of keys
129                 keys.clear();
130                 for (Iterator i = objectRepository.list(); i.hasNext(); ) {
131                     keys.add(i.next());
132                 }
133             }
134             if (getLogger().isDebugEnabled()) {
135                 StringBuffer logBuffer =
136                     new StringBuffer(128)
137                             .append(getClass().getName())
138                             .append(" created in ")
139                             .append(destination);
140                 getLogger().debug(logBuffer.toString());
141             }
142         } catch (Exception e) {
143             final String message = "Failed to retrieve Store component:" + e.getMessage();
144             getLogger().error( message, e );
145             throw e;
146         }
147     }
148 
149     private Object selectRepository(Store store, String type) throws ServiceException {
150         DefaultConfiguration objectConfiguration
151             = new DefaultConfiguration( "repository",
152                                         "generated:AvalonFileRepository.compose()" );
153 
154         objectConfiguration.setAttribute("destinationURL", destination);
155         objectConfiguration.setAttribute("type", type);
156         objectConfiguration.setAttribute("model", "SYNCHRONOUS");
157         return store.select(objectConfiguration);
158     }
159 
160     /**
161      * @see org.apache.james.mailrepository.AbstractMailRepository#internalStore(Mail)
162      */
163     protected void internalStore(Mail mc) throws MessagingException, IOException {
164         String key = mc.getName();
165         if (keys != null && !keys.contains(key)) {
166             keys.add(key);
167         }
168         boolean saveStream = true;
169 
170         MimeMessage message = mc.getMessage();
171         // if the message is a Copy on Write proxy we check the wrapped message
172         // to optimize the behaviour in case of MimeMessageWrapper
173         if (message instanceof MimeMessageCopyOnWriteProxy) {
174             MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
175             message = messageCow.getWrappedMessage();
176         }
177         if (message instanceof MimeMessageWrapper) {
178             MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
179             if (DEEP_DEBUG) {
180                 System.out.println("Retrieving from: " + wrapper.getSourceId());
181                 StringBuffer debugBuffer =
182                     new StringBuffer(64)
183                             .append("Saving to:       ")
184                             .append(destination)
185                             .append("/")
186                             .append(mc.getName());
187                 System.out.println(debugBuffer.toString());
188                 System.out.println("Modified: " + wrapper.isModified());
189             }
190             StringBuffer destinationBuffer =
191                 new StringBuffer(128)
192                     .append(destination)
193                     .append("/")
194                     .append(mc.getName());
195             if (destinationBuffer.toString().equals(wrapper.getSourceId()) && !wrapper.isModified()) {
196                 //We're trying to save to the same place, and it's not modified... we shouldn't save.
197                 //More importantly, if we try to save, we will create a 0-byte file since we're
198                 //retrying to retrieve from a file we'll be overwriting.
199                 saveStream = false;
200             }
201         }
202         if (saveStream) {
203             OutputStream out = null;
204             try {
205                 out = streamRepository.put(key);
206                 mc.getMessage().writeTo(out);
207             } finally {
208                 if (out != null) out.close();
209             }
210         }
211         //Always save the header information
212         objectRepository.put(key, mc);
213     }
214 
215     /**
216      * @see org.apache.james.services.MailRepository#retrieve(String)
217      */
218     public Mail retrieve(String key) throws MessagingException {
219         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
220             getLogger().debug("Retrieving mail: " + key);
221         }
222         try {
223             Mail mc = null;
224             try {
225                 mc = (Mail) objectRepository.get(key);
226             } 
227             catch (RuntimeException re){
228                 StringBuffer exceptionBuffer = new StringBuffer(128);
229                 if(re.getCause() instanceof Error){
230                     exceptionBuffer.append("Error when retrieving mail, not deleting: ")
231                             .append(re.toString());
232                 }else{
233                     exceptionBuffer.append("Exception retrieving mail: ")
234                             .append(re.toString())
235                             .append(", so we're deleting it.");
236                     remove(key);
237                 }
238                 final String errorMessage = exceptionBuffer.toString();
239                 getLogger().warn(errorMessage);
240                 getLogger().debug(errorMessage, re);
241                 return null;
242             }
243             MimeMessageAvalonSource source = new MimeMessageAvalonSource(streamRepository, destination, key);
244             mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
245 
246             return mc;
247         } catch (Exception me) {
248             getLogger().error("Exception retrieving mail: " + me);
249             throw new MessagingException("Exception while retrieving mail: " + me.getMessage(), me);
250         }
251     }
252 
253 
254     /**
255      * @see org.apache.james.mailrepository.AbstractMailRepository#internalRemove(String)
256      */
257     protected void internalRemove(String key) throws MessagingException {
258         if (keys != null) keys.remove(key);
259         streamRepository.remove(key);
260         objectRepository.remove(key);
261     }
262 
263 
264     /**
265      * @see org.apache.james.services.MailRepository#list()
266      */
267     public Iterator list() {
268         // Fix ConcurrentModificationException by cloning 
269         // the keyset before getting an iterator
270         final ArrayList clone;
271         if (keys != null) synchronized(keys) {
272             clone = new ArrayList(keys);
273         } else {
274             clone = new ArrayList();
275             for (Iterator i = objectRepository.list(); i.hasNext(); ) {
276                 clone.add(i.next());
277             }
278         }
279         if (fifo) Collections.sort(clone); // Keys is a HashSet; impose FIFO for apps that need it
280         return clone.iterator();
281     }
282 }