1 /************************************************************************
2 * Copyright (c) 1999-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.mailrepository;
19
20 import org.apache.avalon.cornerstone.services.store.ObjectRepository;
21 import org.apache.avalon.cornerstone.services.store.Store;
22 import org.apache.avalon.cornerstone.services.store.StreamRepository;
23 import org.apache.avalon.framework.activity.Initializable;
24 import org.apache.avalon.framework.configuration.Configurable;
25 import org.apache.avalon.framework.configuration.Configuration;
26 import org.apache.avalon.framework.configuration.ConfigurationException;
27 import org.apache.avalon.framework.configuration.DefaultConfiguration;
28 import org.apache.avalon.framework.logger.AbstractLogEnabled;
29 import org.apache.avalon.framework.service.ServiceException;
30 import org.apache.avalon.framework.service.ServiceManager;
31 import org.apache.avalon.framework.service.Serviceable;
32 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
33 import org.apache.james.core.MimeMessageWrapper;
34 import org.apache.james.services.MailRepository;
35 import org.apache.james.util.Lock;
36 import org.apache.mailet.Mail;
37
38 import javax.mail.MessagingException;
39 import javax.mail.internet.MimeMessage;
40
41 import java.io.OutputStream;
42 import java.util.ArrayList;
43 import java.util.Collection;
44 import java.util.Collections;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.Set;
48
49 /***
50 * Implementation of a MailRepository on a FileSystem.
51 *
52 * Requires a configuration element in the .conf.xml file of the form:
53 * <repository destinationURL="file://path-to-root-dir-for-repository"
54 * type="MAIL"
55 * model="SYNCHRONOUS"/>
56 * Requires a logger called MailRepository.
57 *
58 * @version 1.0.0, 24/04/1999
59 */
60 public class AvalonMailRepository
61 extends AbstractLogEnabled
62 implements MailRepository, Configurable, Serviceable, Initializable {
63
64 /***
65 * Whether 'deep debugging' is turned on.
66 */
67 protected final static boolean DEEP_DEBUG = false;
68
69 private Lock lock;
70 private Store store;
71 private StreamRepository sr;
72 private ObjectRepository or;
73 private String destination;
74 private Set keys;
75 private boolean fifo;
76 private boolean cacheKeys;
77
78 /***
79 * @see org.apache.avalon.framework.service.Serviceable#compose(ServiceManager )
80 */
81 public void service( final ServiceManager componentManager )
82 throws ServiceException {
83 store = (Store)componentManager.lookup( Store.ROLE );
84 }
85
86 /***
87 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
88 */
89 public void configure(Configuration conf) throws ConfigurationException {
90 destination = conf.getAttribute("destinationURL");
91 if (getLogger().isDebugEnabled()) {
92 getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
93 }
94 String checkType = conf.getAttribute("type");
95 if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
96 String exceptionString = "Attempt to configure AvalonMailRepository as " +
97 checkType;
98 if (getLogger().isWarnEnabled()) {
99 getLogger().warn(exceptionString);
100 }
101 throw new ConfigurationException(exceptionString);
102 }
103 fifo = conf.getAttributeAsBoolean("FIFO", false);
104 cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
105
106 }
107
108 /***
109 * @see org.apache.avalon.framework.activity.Initializable#initialize()
110 */
111 public void initialize()
112 throws Exception {
113 try {
114
115 DefaultConfiguration objectConfiguration
116 = new DefaultConfiguration( "repository",
117 "generated:AvalonFileRepository.compose()" );
118
119 objectConfiguration.setAttribute("destinationURL", destination);
120 objectConfiguration.setAttribute("type", "OBJECT");
121 objectConfiguration.setAttribute("model", "SYNCHRONOUS");
122
123 DefaultConfiguration streamConfiguration
124 = new DefaultConfiguration( "repository",
125 "generated:AvalonFileRepository.compose()" );
126
127 streamConfiguration.setAttribute( "destinationURL", destination );
128 streamConfiguration.setAttribute( "type", "STREAM" );
129 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
130
131 sr = (StreamRepository) store.select(streamConfiguration);
132 or = (ObjectRepository) store.select(objectConfiguration);
133 lock = new Lock();
134 if (cacheKeys) keys = Collections.synchronizedSet(new HashSet());
135
136
137
138 HashSet streamKeys = new HashSet();
139 for (Iterator i = sr.list(); i.hasNext(); ) {
140 streamKeys.add(i.next());
141 }
142 HashSet objectKeys = new HashSet();
143 for (Iterator i = or.list(); i.hasNext(); ) {
144 objectKeys.add(i.next());
145 }
146
147 Collection strandedStreams = (Collection)streamKeys.clone();
148 strandedStreams.removeAll(objectKeys);
149 for (Iterator i = strandedStreams.iterator(); i.hasNext(); ) {
150 String key = (String)i.next();
151 remove(key);
152 }
153
154 Collection strandedObjects = (Collection)objectKeys.clone();
155 strandedObjects.removeAll(streamKeys);
156 for (Iterator i = strandedObjects.iterator(); i.hasNext(); ) {
157 String key = (String)i.next();
158 remove(key);
159 }
160
161 if (keys != null) {
162
163
164 keys.clear();
165 for (Iterator i = or.list(); i.hasNext(); ) {
166 keys.add(i.next());
167 }
168 }
169 if (getLogger().isDebugEnabled()) {
170 StringBuffer logBuffer =
171 new StringBuffer(128)
172 .append(this.getClass().getName())
173 .append(" created in ")
174 .append(destination);
175 getLogger().debug(logBuffer.toString());
176 }
177 } catch (Exception e) {
178 final String message = "Failed to retrieve Store component:" + e.getMessage();
179 getLogger().error( message, e );
180 throw e;
181 }
182 }
183
184 /***
185 * Releases a lock on a message identified by a key
186 *
187 * @param key the key of the message to be unlocked
188 *
189 * @return true if successfully released the lock, false otherwise
190 */
191 public boolean unlock(String key) {
192 if (lock.unlock(key)) {
193 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
194 StringBuffer debugBuffer =
195 new StringBuffer(256)
196 .append("Unlocked ")
197 .append(key)
198 .append(" for ")
199 .append(Thread.currentThread().getName())
200 .append(" @ ")
201 .append(new java.util.Date(System.currentTimeMillis()));
202 getLogger().debug(debugBuffer.toString());
203 }
204 return true;
205 } else {
206 return false;
207 }
208 }
209
210 /***
211 * Obtains a lock on a message identified by a key
212 *
213 * @param key the key of the message to be locked
214 *
215 * @return true if successfully obtained the lock, false otherwise
216 */
217 public boolean lock(String key) {
218 if (lock.lock(key)) {
219 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
220 StringBuffer debugBuffer =
221 new StringBuffer(256)
222 .append("Locked ")
223 .append(key)
224 .append(" for ")
225 .append(Thread.currentThread().getName())
226 .append(" @ ")
227 .append(new java.util.Date(System.currentTimeMillis()));
228 getLogger().debug(debugBuffer.toString());
229 }
230
231
232
233 return true;
234 } else {
235 return false;
236 }
237 }
238
239 /***
240 * Stores a message in this repository. Shouldn't this return the key
241 * under which it is stored?
242 *
243 * @param mc the mail message to store
244 */
245 public void store(Mail mc) throws MessagingException {
246 try {
247 String key = mc.getName();
248
249 boolean wasLocked = true;
250 synchronized (this) {
251 wasLocked = lock.isLocked(key);
252
253 if (!wasLocked) {
254
255 lock(key);
256 }
257 }
258 try {
259 if (keys != null && !keys.contains(key)) {
260 keys.add(key);
261 }
262 boolean saveStream = true;
263
264 MimeMessage message = mc.getMessage();
265
266
267 if (message instanceof MimeMessageCopyOnWriteProxy) {
268 MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
269 message = messageCow.getWrappedMessage();
270 }
271 if (message instanceof MimeMessageWrapper) {
272 MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
273 if (DEEP_DEBUG) {
274 System.out.println("Retrieving from: " + wrapper.getSourceId());
275 StringBuffer debugBuffer =
276 new StringBuffer(64)
277 .append("Saving to: ")
278 .append(destination)
279 .append("/")
280 .append(mc.getName());
281 System.out.println(debugBuffer.toString());
282 System.out.println("Modified: " + wrapper.isModified());
283 }
284 StringBuffer destinationBuffer =
285 new StringBuffer(128)
286 .append(destination)
287 .append("/")
288 .append(mc.getName());
289 if (destinationBuffer.toString().equals(wrapper.getSourceId()) && !wrapper.isModified()) {
290
291
292
293 saveStream = false;
294 }
295 }
296 if (saveStream) {
297 OutputStream out = null;
298 try {
299 out = sr.put(key);
300 mc.getMessage().writeTo(out);
301 } finally {
302 if (out != null) out.close();
303 }
304 }
305
306 or.put(key, mc);
307 } finally {
308 if (!wasLocked) {
309
310 unlock(key);
311 synchronized (this) {
312 notify();
313 }
314 }
315 }
316
317 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
318 StringBuffer logBuffer =
319 new StringBuffer(64)
320 .append("Mail ")
321 .append(key)
322 .append(" stored.");
323 getLogger().debug(logBuffer.toString());
324 }
325
326 } catch (Exception e) {
327 getLogger().error("Exception storing mail: " + e, e);
328 throw new MessagingException("Exception caught while storing Message Container: " + e);
329 }
330 }
331
332 /***
333 * Retrieves a message given a key. At the moment, keys can be obtained
334 * from list() in superinterface Store.Repository
335 *
336 * @param key the key of the message to retrieve
337 * @return the mail corresponding to this key, null if none exists
338 */
339 public Mail retrieve(String key) throws MessagingException {
340 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
341 getLogger().debug("Retrieving mail: " + key);
342 }
343 try {
344 Mail mc = null;
345 try {
346 mc = (Mail) or.get(key);
347 }
348 catch (RuntimeException re){
349 StringBuffer exceptionBuffer = new StringBuffer(128);
350 if(re.getCause() instanceof Error){
351 exceptionBuffer.append("Error when retrieving mail, not deleting: ")
352 .append(re.toString());
353 }else{
354 exceptionBuffer.append("Exception retrieving mail: ")
355 .append(re.toString())
356 .append(", so we're deleting it.");
357 remove(key);
358 }
359 getLogger().warn(exceptionBuffer.toString());
360 return null;
361 }
362 MimeMessageAvalonSource source = new MimeMessageAvalonSource(sr, destination, key);
363 mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
364
365 return mc;
366 } catch (Exception me) {
367 getLogger().error("Exception retrieving mail: " + me);
368 throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
369 }
370 }
371
372 /***
373 * Removes a specified message
374 *
375 * @param mail the message to be removed from the repository
376 */
377 public void remove(Mail mail) throws MessagingException {
378 remove(mail.getName());
379 }
380
381
382 /***
383 * Removes a Collection of mails from the repository
384 * @param mails The Collection of <code>MailImpl</code>'s to delete
385 * @throws MessagingException
386 * @since 2.2.0
387 */
388 public void remove(Collection mails) throws MessagingException {
389 Iterator delList = mails.iterator();
390 while (delList.hasNext()) {
391 remove((Mail)delList.next());
392 }
393 }
394
395 /***
396 * Removes a message identified by key.
397 *
398 * @param key the key of the message to be removed from the repository
399 */
400 public void remove(String key) throws MessagingException {
401 if (lock(key)) {
402 try {
403 if (keys != null) keys.remove(key);
404 sr.remove(key);
405 or.remove(key);
406 } finally {
407 unlock(key);
408 }
409 } else {
410 StringBuffer exceptionBuffer =
411 new StringBuffer(64)
412 .append("Cannot lock ")
413 .append(key)
414 .append(" to remove it");
415 throw new MessagingException(exceptionBuffer.toString());
416 }
417 }
418
419 /***
420 * List string keys of messages in repository.
421 *
422 * @return an <code>Iterator</code> over the list of keys in the repository
423 *
424 */
425 public Iterator list() {
426
427
428 final ArrayList clone;
429 if (keys != null) synchronized(keys) {
430 clone = new ArrayList(keys);
431 } else {
432 clone = new ArrayList();
433 for (Iterator i = or.list(); i.hasNext(); ) {
434 clone.add(i.next());
435 }
436 }
437 if (fifo) Collections.sort(clone);
438 return clone.iterator();
439 }
440 }