1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.test.mock.james;
21
22 import org.apache.avalon.framework.activity.Disposable;
23 import org.apache.avalon.framework.container.ContainerUtil;
24 import org.apache.james.core.MailImpl;
25 import org.apache.james.services.SpoolRepository;
26 import org.apache.james.test.mock.avalon.MockLogger;
27 import org.apache.james.util.Lock;
28 import org.apache.mailet.Mail;
29
30 import javax.mail.MessagingException;
31
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.ConcurrentModificationException;
35 import java.util.Hashtable;
36 import java.util.Iterator;
37
38 /***
39 * Implementation of a MailRepository on a FileSystem.
40 *
41 * Requires a configuration element in the .conf.xml file of the form:
42 * <repository destinationURL="file://path-to-root-dir-for-repository"
43 * type="MAIL"
44 * model="SYNCHRONOUS"/>
45 * Requires a logger called MailRepository.
46 *
47 * @version 1.0.0, 24/04/1999
48 */
49 public class InMemorySpoolRepository
50 implements SpoolRepository, Disposable {
51
52 /***
53 * Whether 'deep debugging' is turned on.
54 */
55 protected final static boolean DEEP_DEBUG = true;
56 private Lock lock;
57 private MockLogger logger;
58 private Hashtable spool;
59
60 private MockLogger getLogger() {
61 if (logger == null) {
62 logger = new MockLogger();
63 }
64 return logger;
65 }
66
67 /***
68 * Releases a lock on a message identified by a key
69 *
70 * @param key the key of the message to be unlocked
71 *
72 * @return true if successfully released the lock, false otherwise
73 */
74 public boolean unlock(String key) {
75 if (lock.unlock(key)) {
76 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
77 StringBuffer debugBuffer =
78 new StringBuffer(256)
79 .append("Unlocked ")
80 .append(key)
81 .append(" for ")
82 .append(Thread.currentThread().getName())
83 .append(" @ ")
84 .append(new java.util.Date(System.currentTimeMillis()));
85 getLogger().debug(debugBuffer.toString());
86 }
87 return true;
88 } else {
89 return false;
90 }
91 }
92
93 /***
94 * Obtains a lock on a message identified by a key
95 *
96 * @param key the key of the message to be locked
97 *
98 * @return true if successfully obtained the lock, false otherwise
99 */
100 public boolean lock(String key) {
101 if (lock.lock(key)) {
102 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
103 StringBuffer debugBuffer =
104 new StringBuffer(256)
105 .append("Locked ")
106 .append(key)
107 .append(" for ")
108 .append(Thread.currentThread().getName())
109 .append(" @ ")
110 .append(new java.util.Date(System.currentTimeMillis()));
111 getLogger().debug(debugBuffer.toString());
112 }
113
114
115
116 return true;
117 } else {
118 return false;
119 }
120 }
121
122 /***
123 * Stores a message in this repository. Shouldn't this return the key
124 * under which it is stored?
125 *
126 * @param mc the mail message to store
127 */
128 public void store(Mail mc) throws MessagingException {
129 try {
130 String key = mc.getName();
131
132 boolean wasLocked = true;
133 synchronized (this) {
134 wasLocked = lock.isLocked(key);
135
136 if (!wasLocked) {
137
138 lock(key);
139 }
140 }
141 try {
142
143 if (spool.containsKey(key)) {
144
145
146 Object o = spool.remove(key);
147 ContainerUtil.dispose(o);
148 }
149
150 MailImpl m = new MailImpl(mc,mc.getName());
151 m.setState(mc.getState());
152 m.setLastUpdated(mc.getLastUpdated());
153 m.setErrorMessage(mc.getErrorMessage());
154 spool.put(mc.getName(),m);
155 } finally {
156 if (!wasLocked) {
157
158 unlock(key);
159 synchronized (this) {
160 notify();
161 }
162 }
163 }
164
165 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
166 StringBuffer logBuffer =
167 new StringBuffer(64)
168 .append("Mail ")
169 .append(key)
170 .append(" stored.");
171 getLogger().debug(logBuffer.toString());
172 }
173
174 } catch (Exception e) {
175 getLogger().error("Exception storing mail: " + e,e);
176 throw new MessagingException("Exception caught while storing Message Container: ",e);
177 }
178 }
179
180 /***
181 * Retrieves a message given a key. At the moment, keys can be obtained
182 * from list() in superinterface Store.Repository
183 *
184 * @param key the key of the message to retrieve
185 * @return the mail corresponding to this key, null if none exists
186 */
187 public Mail retrieve(String key) throws MessagingException {
188 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
189 getLogger().debug("Retrieving mail: " + key);
190 }
191 try {
192 Mail mc = null;
193 try {
194 mc = new MailImpl((Mail) spool.get(key),key);
195 mc.setState(((Mail) spool.get(key)).getState());
196 mc.setErrorMessage(((Mail) spool.get(key)).getErrorMessage());
197 mc.setLastUpdated(((Mail) spool.get(key)).getLastUpdated());
198 }
199 catch (RuntimeException re){
200 StringBuffer exceptionBuffer = new StringBuffer(128);
201 if(re.getCause() instanceof Error){
202 exceptionBuffer.append("Error when retrieving mail, not deleting: ")
203 .append(re.toString());
204 }else{
205 exceptionBuffer.append("Exception retrieving mail: ")
206 .append(re.toString())
207 .append(", so we're deleting it.");
208 remove(key);
209 }
210 getLogger().warn(exceptionBuffer.toString());
211 return null;
212 }
213 return mc;
214 } catch (Exception me) {
215 getLogger().error("Exception retrieving mail: " + me);
216 throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
217 }
218 }
219
220 /***
221 * Removes a specified message
222 *
223 * @param mail the message to be removed from the repository
224 */
225 public void remove(Mail mail) throws MessagingException {
226 remove(mail.getName());
227 }
228
229
230 /***
231 * Removes a Collection of mails from the repository
232 * @param mails The Collection of <code>MailImpl</code>'s to delete
233 * @throws MessagingException
234 * @since 2.2.0
235 */
236 public void remove(Collection mails) throws MessagingException {
237 Iterator delList = mails.iterator();
238 while (delList.hasNext()) {
239 remove((Mail)delList.next());
240 }
241 }
242
243 /***
244 * Removes a message identified by key.
245 *
246 * @param key the key of the message to be removed from the repository
247 */
248 public void remove(String key) throws MessagingException {
249 if (lock(key)) {
250 try {
251 if (spool != null) {
252 Object o = spool.remove(key);
253 ContainerUtil.dispose(o);
254 }
255 } finally {
256 unlock(key);
257 }
258 } else {
259 StringBuffer exceptionBuffer =
260 new StringBuffer(64)
261 .append("Cannot lock ")
262 .append(key)
263 .append(" to remove it");
264 throw new MessagingException(exceptionBuffer.toString());
265 }
266 }
267
268 /***
269 * List string keys of messages in repository.
270 *
271 * @return an <code>Iterator</code> over the list of keys in the repository
272 *
273 */
274 public Iterator list() {
275
276
277 final ArrayList clone;
278 synchronized(spool) {
279 clone = new ArrayList(spool.keySet());
280 }
281 return clone.iterator();
282 }
283
284
285 /***
286 * <p>Returns an arbitrarily selected mail deposited in this Repository.
287 * Usage: SpoolManager calls accept() to see if there are any unprocessed
288 * mails in the spool repository.</p>
289 *
290 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
291 *
292 * @return the mail
293 */
294 public synchronized Mail accept() throws InterruptedException {
295 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
296 getLogger().debug("Method accept() called");
297 }
298 return accept(new SpoolRepository.AcceptFilter () {
299 public boolean accept (String _, String __, long ___, String ____) {
300 return true;
301 }
302
303 public long getWaitTime () {
304 return 0;
305 }
306 });
307 }
308
309 /***
310 * <p>Returns an arbitrarily selected mail deposited in this Repository that
311 * is either ready immediately for delivery, or is younger than it's last_updated plus
312 * the number of failed attempts times the delay time.
313 * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
314 * unprocessed mail is available.</p>
315 *
316 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
317 *
318 * @return the mail
319 */
320 public synchronized Mail accept(final long delay) throws InterruptedException
321 {
322 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
323 getLogger().debug("Method accept(delay) called");
324 }
325 return accept(new SpoolRepository.AcceptFilter () {
326 long youngest = 0;
327
328 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
329 if (state.equals(Mail.ERROR)) {
330
331 long timeToProcess = delay + lastUpdated;
332
333 if (System.currentTimeMillis() > timeToProcess) {
334
335 return true;
336 } else {
337
338 if (youngest == 0 || youngest > timeToProcess) {
339
340 youngest = timeToProcess;
341 }
342 return false;
343 }
344 } else {
345
346 return true;
347 }
348 }
349
350 public long getWaitTime () {
351 if (youngest == 0) {
352 return 0;
353 } else {
354 long duration = youngest - System.currentTimeMillis();
355 youngest = 0;
356 return duration <= 0 ? 1 : duration;
357 }
358 }
359 });
360 }
361
362
363 /***
364 * Returns an arbitrarily select mail deposited in this Repository for
365 * which the supplied filter's accept method returns true.
366 * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
367 * based on number of retries if the mail is ready for processing.
368 * If no message is ready the method will block until one is, the amount of time to block is
369 * determined by calling the filters getWaitTime method.
370 *
371 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
372 *
373 * @return the mail
374 */
375 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
376 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
377 getLogger().debug("Method accept(Filter) called");
378 }
379 while (!Thread.currentThread().isInterrupted()) try {
380 for (Iterator it = list(); it.hasNext(); ) {
381 String s = it.next().toString();
382 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
383 StringBuffer logBuffer =
384 new StringBuffer(64)
385 .append("Found item ")
386 .append(s)
387 .append(" in spool.");
388 getLogger().debug(logBuffer.toString());
389 }
390 if (lock(s)) {
391 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
392 getLogger().debug("accept(Filter) has locked: " + s);
393 }
394 try {
395 Mail mail = retrieve(s);
396
397
398
399 if (mail == null || !filter.accept (mail.getName(),
400 mail.getState(),
401 mail.getLastUpdated().getTime(),
402 mail.getErrorMessage())) {
403 unlock(s);
404 continue;
405 }
406 return mail;
407 } catch (javax.mail.MessagingException e) {
408 unlock(s);
409 getLogger().error("Exception during retrieve -- skipping item " + s, e);
410 }
411 }
412 }
413
414
415 wait (filter.getWaitTime());
416 } catch (InterruptedException ex) {
417 throw ex;
418 } catch (ConcurrentModificationException cme) {
419
420 getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
421 }
422 throw new InterruptedException();
423 }
424
425 /***
426 *
427 */
428 public InMemorySpoolRepository() {
429 spool = new Hashtable();
430 lock = new Lock();
431 }
432
433 public int size() {
434 return spool.size();
435 }
436
437 public void clear() {
438 if (spool != null) {
439 Iterator i = list();
440 while (i.hasNext()) {
441 String key = (String) i.next();
442 try {
443 remove(key);
444 } catch (MessagingException e) {
445 }
446 }
447 }
448 }
449
450 public void dispose() {
451 clear();
452 }
453
454 public String toString() {
455 StringBuffer result = new StringBuffer();
456 result.append(super.toString());
457 Iterator i = list();
458 while (i.hasNext()) {
459 result.append("\n\t"+i.next());
460 }
461 return result.toString();
462 }
463
464 }