1 /************************************************************************
2 * Copyright (c) 1999-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.mailrepository;
19
20 import org.apache.james.services.SpoolRepository;
21 import org.apache.mailet.Mail;
22
23 import java.util.ConcurrentModificationException;
24 import java.util.Iterator;
25
26 /***
27 * Implementation of a MailRepository on a FileSystem.
28 *
29 * Requires a configuration element in the .conf.xml file of the form:
30 * <repository destinationURL="file://path-to-root-dir-for-repository"
31 * type="MAIL"
32 * model="SYNCHRONOUS"/>
33 * Requires a logger called MailRepository.
34 *
35 * @version 1.0.0, 24/04/1999
36 */
37 public class AvalonSpoolRepository
38 extends AvalonMailRepository
39 implements SpoolRepository {
40
41 /***
42 * <p>Returns an arbitrarily selected mail deposited in this Repository.
43 * Usage: SpoolManager calls accept() to see if there are any unprocessed
44 * mails in the spool repository.</p>
45 *
46 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
47 *
48 * @return the mail
49 */
50 public synchronized Mail accept() throws InterruptedException {
51 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
52 getLogger().debug("Method accept() called");
53 }
54 return accept(new SpoolRepository.AcceptFilter () {
55 public boolean accept (String _, String __, long ___, String ____) {
56 return true;
57 }
58
59 public long getWaitTime () {
60 return 0;
61 }
62 });
63 }
64
65 /***
66 * <p>Returns an arbitrarily selected mail deposited in this Repository that
67 * is either ready immediately for delivery, or is younger than it's last_updated plus
68 * the number of failed attempts times the delay time.
69 * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
70 * unprocessed mail is available.</p>
71 *
72 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
73 *
74 * @return the mail
75 */
76 public synchronized Mail accept(final long delay) throws InterruptedException
77 {
78 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
79 getLogger().debug("Method accept(delay) called");
80 }
81 return accept(new SpoolRepository.AcceptFilter () {
82 long youngest = 0;
83
84 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
85 if (state.equals(Mail.ERROR)) {
86
87 long timeToProcess = delay + lastUpdated;
88
89 if (System.currentTimeMillis() > timeToProcess) {
90
91 return true;
92 } else {
93
94 if (youngest == 0 || youngest > timeToProcess) {
95
96 youngest = timeToProcess;
97 }
98 return false;
99 }
100 } else {
101
102 return true;
103 }
104 }
105
106 public long getWaitTime () {
107 if (youngest == 0) {
108 return 0;
109 } else {
110 long duration = youngest - System.currentTimeMillis();
111 youngest = 0;
112 return duration <= 0 ? 1 : duration;
113 }
114 }
115 });
116 }
117
118
119 /***
120 * Returns an arbitrarily select mail deposited in this Repository for
121 * which the supplied filter's accept method returns true.
122 * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
123 * based on number of retries if the mail is ready for processing.
124 * If no message is ready the method will block until one is, the amount of time to block is
125 * determined by calling the filters getWaitTime method.
126 *
127 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
128 *
129 * @return the mail
130 */
131 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
132 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
133 getLogger().debug("Method accept(Filter) called");
134 }
135 while (!Thread.currentThread().isInterrupted()) try {
136 for (Iterator it = list(); it.hasNext(); ) {
137 String s = it.next().toString();
138 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
139 StringBuffer logBuffer =
140 new StringBuffer(64)
141 .append("Found item ")
142 .append(s)
143 .append(" in spool.");
144 getLogger().debug(logBuffer.toString());
145 }
146 if (lock(s)) {
147 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
148 getLogger().debug("accept(Filter) has locked: " + s);
149 }
150 try {
151 Mail mail = retrieve(s);
152
153
154
155 if (mail == null || !filter.accept (mail.getName(),
156 mail.getState(),
157 mail.getLastUpdated().getTime(),
158 mail.getErrorMessage())) {
159 unlock(s);
160 continue;
161 }
162 return mail;
163 } catch (javax.mail.MessagingException e) {
164 unlock(s);
165 getLogger().error("Exception during retrieve -- skipping item " + s, e);
166 }
167 }
168 }
169
170
171 wait (filter.getWaitTime());
172 } catch (InterruptedException ex) {
173 throw ex;
174 } catch (ConcurrentModificationException cme) {
175
176 getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
177 }
178 throw new InterruptedException();
179 }
180
181 }