1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.mailrepository;
23
24 import org.apache.avalon.framework.configuration.Configuration;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26
27 import org.apache.james.services.SpoolRepository;
28 import org.apache.mailet.Mail;
29
30 import java.sql.Connection;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.LinkedList;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {
92
93
94
95
96 private static int WAIT_LIMIT = 60000;
97
98
99
100 private static int LOAD_TIME_MININUM = 1000;
101
102
103
104 private LinkedList pendingMessages = new LinkedList();
105
106
107
108 private long pendingMessagesLoadTime = 0;
109
110
111
112 private int maxPendingMessages = 0;
113
114
115
116
117 public void configure(Configuration conf) throws ConfigurationException {
118 super.configure(conf);
119 maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
120 }
121
122
123
124
125 public synchronized Mail accept() throws InterruptedException {
126 return accept(new SpoolRepository.AcceptFilter () {
127 public boolean accept (String _, String __, long ___, String ____) {
128 return true;
129 }
130
131 public long getWaitTime () {
132 return 0;
133 }
134 });
135 }
136
137
138
139
140 public synchronized Mail accept(final long delay) throws InterruptedException {
141 return accept (new SpoolRepository.AcceptFilter () {
142 long sleepUntil = 0;
143
144 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
145 if (Mail.ERROR.equals(state)) {
146
147 long processingTime = delay + lastUpdated;
148 if (processingTime < System.currentTimeMillis()) {
149
150 return true;
151 } else {
152
153
154 if (sleepUntil == 0 || processingTime < sleepUntil) {
155 sleepUntil = processingTime;
156 }
157 return false;
158 }
159 } else {
160 return true;
161 }
162 }
163
164
165 public long getWaitTime () {
166 if (sleepUntil == 0) {
167
168
169 return 0;
170 }
171 long waitTime = sleepUntil - System.currentTimeMillis();
172 sleepUntil = 0;
173 return waitTime <= 0 ? 1 : waitTime;
174 }
175
176 });
177 }
178
179
180
181
182
183 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
184 while (!Thread.currentThread().isInterrupted()) {
185
186
187 PendingMessage next = null;
188 while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
189
190
191
192
193 if (
194 try {
195 Mail mail = retrieve(next.key);
196
197
198
199 if (mail == null) {
200 unlock(next.key);
201 continue;
202 }
203 return mail;
204 } catch (javax.mail.MessagingException e) {
205 unlock(next.key);
206 getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
207 }
208 }
209 }
210
211 long wait_time = filter.getWaitTime();
212 if (wait_time <= 0) {
213 wait_time = WAIT_LIMIT;
214 }
215 try {
216 wait (wait_time);
217 } catch (InterruptedException ex) {
218 throw ex;
219 }
220 }
221 throw new InterruptedException();
222 }
223
224
225
226
227
228
229
230
231
232 public void store(Mail mc) throws javax.mail.MessagingException {
233 pendingMessagesLoadTime = 0;
234 super.store(mc);
235 }
236
237
238
239
240
241
242 private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
243 synchronized (pendingMessages) {
244 if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
245
246 loadPendingMessages(filter);
247 pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
248 }
249
250 if (pendingMessages.size() == 0) {
251 return null;
252 } else {
253 return (PendingMessage)pendingMessages.removeFirst();
254 }
255 }
256 }
257
258
259
260
261 private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
262
263 synchronized (pendingMessages) {
264 pendingMessages.clear();
265
266 Connection conn = null;
267 PreparedStatement listMessages = null;
268 ResultSet rsListMessages = null;
269 try {
270 conn = datasource.getConnection();
271 listMessages =
272 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
273 listMessages.setString(1, repositoryName);
274
275
276
277
278
279 rsListMessages = listMessages.executeQuery();
280
281
282
283
284 while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
285 String key = rsListMessages.getString(1);
286 String state = rsListMessages.getString(2);
287 long lastUpdated = rsListMessages.getTimestamp(3).getTime();
288 String errorMessage = rsListMessages.getString(4);
289 if (filter.accept(key, state, lastUpdated, errorMessage)) {
290 pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
291 }
292 }
293 } catch (SQLException sqle) {
294
295 getLogger().error("Error retrieving pending messages", sqle);
296 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
297 } finally {
298 theJDBCUtil.closeJDBCResultSet(rsListMessages);
299 theJDBCUtil.closeJDBCStatement(listMessages);
300 theJDBCUtil.closeJDBCConnection(conn);
301 }
302 }
303 }
304
305
306
307
308 class PendingMessage {
309 protected String key;
310 protected String state;
311 protected long lastUpdated;
312 protected String errorMessage;
313
314 public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
315 this.key = key;
316 this.state = state;
317 this.lastUpdated = lastUpdated;
318 this.errorMessage = errorMessage;
319 }
320 }
321 }