1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.mailrepository;
21
22 import org.apache.avalon.cornerstone.services.store.ObjectRepository;
23 import org.apache.avalon.cornerstone.services.store.Store;
24 import org.apache.avalon.cornerstone.services.store.StreamRepository;
25 import org.apache.avalon.framework.activity.Initializable;
26 import org.apache.avalon.framework.configuration.Configurable;
27 import org.apache.avalon.framework.configuration.Configuration;
28 import org.apache.avalon.framework.configuration.ConfigurationException;
29 import org.apache.avalon.framework.configuration.DefaultConfiguration;
30 import org.apache.avalon.framework.logger.AbstractLogEnabled;
31 import org.apache.avalon.framework.service.ServiceException;
32 import org.apache.avalon.framework.service.ServiceManager;
33 import org.apache.avalon.framework.service.Serviceable;
34 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
35 import org.apache.james.core.MimeMessageWrapper;
36 import org.apache.james.services.MailRepository;
37 import org.apache.james.util.Lock;
38 import org.apache.mailet.Mail;
39
40 import javax.mail.MessagingException;
41 import javax.mail.internet.MimeMessage;
42
43 import java.io.OutputStream;
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.Collections;
47 import java.util.HashSet;
48 import java.util.Iterator;
49 import java.util.Set;
50
51 /***
52 * Implementation of a MailRepository on a FileSystem.
53 *
54 * Requires a configuration element in the .conf.xml file of the form:
55 * <repository destinationURL="file://path-to-root-dir-for-repository"
56 * type="MAIL"
57 * model="SYNCHRONOUS"/>
58 * Requires a logger called MailRepository.
59 *
60 * @version 1.0.0, 24/04/1999
61 */
62 public class AvalonMailRepository
63 extends AbstractLogEnabled
64 implements MailRepository, Configurable, Serviceable, Initializable {
65
66 /***
67 * Whether 'deep debugging' is turned on.
68 */
69 protected final static boolean DEEP_DEBUG = false;
70
71 private Lock lock;
72 private Store store;
73 private StreamRepository sr;
74 private ObjectRepository or;
75 private String destination;
76 private Set keys;
77 private boolean fifo;
78 private boolean cacheKeys;
79
80 /***
81 * @see org.apache.avalon.framework.service.Serviceable#compose(ServiceManager )
82 */
83 public void service( final ServiceManager componentManager )
84 throws ServiceException {
85 store = (Store)componentManager.lookup( Store.ROLE );
86 }
87
88 /***
89 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
90 */
91 public void configure(Configuration conf) throws ConfigurationException {
92 destination = conf.getAttribute("destinationURL");
93 if (getLogger().isDebugEnabled()) {
94 getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
95 }
96 String checkType = conf.getAttribute("type");
97 if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
98 String exceptionString = "Attempt to configure AvalonMailRepository as " +
99 checkType;
100 if (getLogger().isWarnEnabled()) {
101 getLogger().warn(exceptionString);
102 }
103 throw new ConfigurationException(exceptionString);
104 }
105 fifo = conf.getAttributeAsBoolean("FIFO", false);
106 cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
107
108 }
109
110 /***
111 * @see org.apache.avalon.framework.activity.Initializable#initialize()
112 */
113 public void initialize()
114 throws Exception {
115 try {
116
117 DefaultConfiguration objectConfiguration
118 = new DefaultConfiguration( "repository",
119 "generated:AvalonFileRepository.compose()" );
120
121 objectConfiguration.setAttribute("destinationURL", destination);
122 objectConfiguration.setAttribute("type", "OBJECT");
123 objectConfiguration.setAttribute("model", "SYNCHRONOUS");
124
125 DefaultConfiguration streamConfiguration
126 = new DefaultConfiguration( "repository",
127 "generated:AvalonFileRepository.compose()" );
128
129 streamConfiguration.setAttribute( "destinationURL", destination );
130 streamConfiguration.setAttribute( "type", "STREAM" );
131 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
132
133 sr = (StreamRepository) store.select(streamConfiguration);
134 or = (ObjectRepository) store.select(objectConfiguration);
135 lock = new Lock();
136 if (cacheKeys) keys = Collections.synchronizedSet(new HashSet());
137
138
139
140 HashSet streamKeys = new HashSet();
141 for (Iterator i = sr.list(); i.hasNext(); ) {
142 streamKeys.add(i.next());
143 }
144 HashSet objectKeys = new HashSet();
145 for (Iterator i = or.list(); i.hasNext(); ) {
146 objectKeys.add(i.next());
147 }
148
149 Collection strandedStreams = (Collection)streamKeys.clone();
150 strandedStreams.removeAll(objectKeys);
151 for (Iterator i = strandedStreams.iterator(); i.hasNext(); ) {
152 String key = (String)i.next();
153 remove(key);
154 }
155
156 Collection strandedObjects = (Collection)objectKeys.clone();
157 strandedObjects.removeAll(streamKeys);
158 for (Iterator i = strandedObjects.iterator(); i.hasNext(); ) {
159 String key = (String)i.next();
160 remove(key);
161 }
162
163 if (keys != null) {
164
165
166 keys.clear();
167 for (Iterator i = or.list(); i.hasNext(); ) {
168 keys.add(i.next());
169 }
170 }
171 if (getLogger().isDebugEnabled()) {
172 StringBuffer logBuffer =
173 new StringBuffer(128)
174 .append(this.getClass().getName())
175 .append(" created in ")
176 .append(destination);
177 getLogger().debug(logBuffer.toString());
178 }
179 } catch (Exception e) {
180 final String message = "Failed to retrieve Store component:" + e.getMessage();
181 getLogger().error( message, e );
182 throw e;
183 }
184 }
185
186 /***
187 * Releases a lock on a message identified by a key
188 *
189 * @param key the key of the message to be unlocked
190 *
191 * @return true if successfully released the lock, false otherwise
192 */
193 public boolean unlock(String key) {
194 if (lock.unlock(key)) {
195 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
196 StringBuffer debugBuffer =
197 new StringBuffer(256)
198 .append("Unlocked ")
199 .append(key)
200 .append(" for ")
201 .append(Thread.currentThread().getName())
202 .append(" @ ")
203 .append(new java.util.Date(System.currentTimeMillis()));
204 getLogger().debug(debugBuffer.toString());
205 }
206 return true;
207 } else {
208 return false;
209 }
210 }
211
212 /***
213 * Obtains a lock on a message identified by a key
214 *
215 * @param key the key of the message to be locked
216 *
217 * @return true if successfully obtained the lock, false otherwise
218 */
219 public boolean lock(String key) {
220 if (lock.lock(key)) {
221 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
222 StringBuffer debugBuffer =
223 new StringBuffer(256)
224 .append("Locked ")
225 .append(key)
226 .append(" for ")
227 .append(Thread.currentThread().getName())
228 .append(" @ ")
229 .append(new java.util.Date(System.currentTimeMillis()));
230 getLogger().debug(debugBuffer.toString());
231 }
232
233
234
235 return true;
236 } else {
237 return false;
238 }
239 }
240
241 /***
242 * Stores a message in this repository. Shouldn't this return the key
243 * under which it is stored?
244 *
245 * @param mc the mail message to store
246 */
247 public void store(Mail mc) throws MessagingException {
248 try {
249 String key = mc.getName();
250
251 boolean wasLocked = true;
252 synchronized (this) {
253 wasLocked = lock.isLocked(key);
254
255 if (!wasLocked) {
256
257 lock(key);
258 }
259 }
260 try {
261 if (keys != null && !keys.contains(key)) {
262 keys.add(key);
263 }
264 boolean saveStream = true;
265
266 MimeMessage message = mc.getMessage();
267
268
269 if (message instanceof MimeMessageCopyOnWriteProxy) {
270 MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
271 message = messageCow.getWrappedMessage();
272 }
273 if (message instanceof MimeMessageWrapper) {
274 MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
275 if (DEEP_DEBUG) {
276 System.out.println("Retrieving from: " + wrapper.getSourceId());
277 StringBuffer debugBuffer =
278 new StringBuffer(64)
279 .append("Saving to: ")
280 .append(destination)
281 .append("/")
282 .append(mc.getName());
283 System.out.println(debugBuffer.toString());
284 System.out.println("Modified: " + wrapper.isModified());
285 }
286 StringBuffer destinationBuffer =
287 new StringBuffer(128)
288 .append(destination)
289 .append("/")
290 .append(mc.getName());
291 if (destinationBuffer.toString().equals(wrapper.getSourceId()) && !wrapper.isModified()) {
292
293
294
295 saveStream = false;
296 }
297 }
298 if (saveStream) {
299 OutputStream out = null;
300 try {
301 out = sr.put(key);
302 mc.getMessage().writeTo(out);
303 } finally {
304 if (out != null) out.close();
305 }
306 }
307
308 or.put(key, mc);
309 } finally {
310 if (!wasLocked) {
311
312 unlock(key);
313 synchronized (this) {
314 notify();
315 }
316 }
317 }
318
319 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
320 StringBuffer logBuffer =
321 new StringBuffer(64)
322 .append("Mail ")
323 .append(key)
324 .append(" stored.");
325 getLogger().debug(logBuffer.toString());
326 }
327
328 } catch (Exception e) {
329 getLogger().error("Exception storing mail: " + e, e);
330 throw new MessagingException("Exception caught while storing Message Container: " + e);
331 }
332 }
333
334 /***
335 * Retrieves a message given a key. At the moment, keys can be obtained
336 * from list() in superinterface Store.Repository
337 *
338 * @param key the key of the message to retrieve
339 * @return the mail corresponding to this key, null if none exists
340 */
341 public Mail retrieve(String key) throws MessagingException {
342 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
343 getLogger().debug("Retrieving mail: " + key);
344 }
345 try {
346 Mail mc = null;
347 try {
348 mc = (Mail) or.get(key);
349 }
350 catch (RuntimeException re){
351 StringBuffer exceptionBuffer = new StringBuffer(128);
352 if(re.getCause() instanceof Error){
353 exceptionBuffer.append("Error when retrieving mail, not deleting: ")
354 .append(re.toString());
355 }else{
356 exceptionBuffer.append("Exception retrieving mail: ")
357 .append(re.toString())
358 .append(", so we're deleting it.");
359 remove(key);
360 }
361 getLogger().warn(exceptionBuffer.toString());
362 return null;
363 }
364 MimeMessageAvalonSource source = new MimeMessageAvalonSource(sr, destination, key);
365 mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
366
367 return mc;
368 } catch (Exception me) {
369 getLogger().error("Exception retrieving mail: " + me);
370 throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
371 }
372 }
373
374 /***
375 * Removes a specified message
376 *
377 * @param mail the message to be removed from the repository
378 */
379 public void remove(Mail mail) throws MessagingException {
380 remove(mail.getName());
381 }
382
383
384 /***
385 * Removes a Collection of mails from the repository
386 * @param mails The Collection of <code>MailImpl</code>'s to delete
387 * @throws MessagingException
388 * @since 2.2.0
389 */
390 public void remove(Collection mails) throws MessagingException {
391 Iterator delList = mails.iterator();
392 while (delList.hasNext()) {
393 remove((Mail)delList.next());
394 }
395 }
396
397 /***
398 * Removes a message identified by key.
399 *
400 * @param key the key of the message to be removed from the repository
401 */
402 public void remove(String key) throws MessagingException {
403 if (lock(key)) {
404 try {
405 if (keys != null) keys.remove(key);
406 sr.remove(key);
407 or.remove(key);
408 } finally {
409 unlock(key);
410 }
411 } else {
412 StringBuffer exceptionBuffer =
413 new StringBuffer(64)
414 .append("Cannot lock ")
415 .append(key)
416 .append(" to remove it");
417 throw new MessagingException(exceptionBuffer.toString());
418 }
419 }
420
421 /***
422 * List string keys of messages in repository.
423 *
424 * @return an <code>Iterator</code> over the list of keys in the repository
425 *
426 */
427 public Iterator list() {
428
429
430 final ArrayList clone;
431 if (keys != null) synchronized(keys) {
432 clone = new ArrayList(keys);
433 } else {
434 clone = new ArrayList();
435 for (Iterator i = or.list(); i.hasNext(); ) {
436 clone.add(i.next());
437 }
438 }
439 if (fifo) Collections.sort(clone);
440 return clone.iterator();
441 }
442 }