1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20
21
22 package org.apache.james.mailrepository;
23
24 import org.apache.avalon.framework.configuration.Configuration;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26
27 import org.apache.james.services.SpoolRepository;
28 import org.apache.mailet.Mail;
29
30 import java.sql.Connection;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.LinkedList;
35
36 /***
37 * Implementation of a SpoolRepository on a database.
38 *
39 * <p>Requires a configuration element in the .conf.xml file of the form:
 * <br>&lt;repository destinationURL="town://path"
 * <br>            type="MAIL"
 * <br>            model="SYNCHRONOUS"&gt;
 * <br>            &lt;driver&gt;sun.jdbc.odbc.JdbcOdbcDriver&lt;/driver&gt;
 * <br>            &lt;conn&gt;jdbc:odbc:LocalDB&lt;/conn&gt;
 * <br>            &lt;table&gt;Message&lt;/table&gt;
 * <br>&lt;/repository&gt;
47 * <p>destinationURL specifies..(Serge??)
48 * <br>Type can be SPOOL or MAIL
49 * <br>Model is currently not used and may be dropped
50 * <br>conn is the location of the ...(Serge)
51 * <br>table is the name of the table in the Database to be used
52 *
53 * <p>Requires a logger called MailRepository.
54 *
55 * <p>Approach for spool manager:
56 *
57 * PendingMessage inner class
58 *
59 * accept() is called....
60 * checks whether needs to load PendingMessages()
61 * tries to get a message()
62 * if none, wait 60
63 *
64 * accept(long) is called
65 * checks whether needs to load PendingMessages
66 * tries to get a message(long)
67 * if none, wait accordingly
68 *
 * sync checksWhetherNeedsToLoadPendingMessages()
70 * if pending messages has messages in immediate process, return immediately
71 * if run query in last WAIT_LIMIT time, return immediately
72 * query and build 2 vectors of Pending messages.
73 * Ones that need immediate processing
74 * Ones that are delayed. put them in time order
75 * return
76 *
77 * get_a_message()
78 * loop through immediate messages.
79 * - remove top message
80 * - try to lock. if successful, return. otherwise loop.
81 * if nothing, return null
82 *
83 * get_a_message(long)
84 * try get_a_message()
85 * check top message in pending. if ready, then remove, try to lock, return if lock.
86 * return null.
87 *
88 *
89 * @version 1.0.0, 24/04/1999
90 */
91 public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {
92
93 /***
94 * How long a thread should sleep when there are no messages to process.
95 */
96 private static int WAIT_LIMIT = 60000;
97 /***
98 * How long we have to wait before reloading the list of pending messages
99 */
100 private static int LOAD_TIME_MININUM = 1000;
101 /***
102 * A queue in memory of messages that need processing
103 */
104 private LinkedList pendingMessages = new LinkedList();
105 /***
106 * When the queue was last read
107 */
108 private long pendingMessagesLoadTime = 0;
109 /***
110 * Maximum size of the pendingMessages queue
111 */
112 private int maxPendingMessages = 0;
113
114 /***
115 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
116 */
117 public void configure(Configuration conf) throws ConfigurationException {
118 super.configure(conf);
119 maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
120 }
121
122 /***
123 * Return a message to process. This is a message in the spool that is not locked.
124 */
125 public synchronized Mail accept() throws InterruptedException {
126 return accept(new SpoolRepository.AcceptFilter () {
127 public boolean accept (String _, String __, long ___, String ____) {
128 return true;
129 }
130
131 public long getWaitTime () {
132 return 0;
133 }
134 });
135 }
136
137 /***
138 * Return a message that's ready to process. If a message is of type "error"
139 * then check the last updated time, and don't try it until the long 'delay' parameter
140 * milliseconds has passed.
141 */
142 public synchronized Mail accept(final long delay) throws InterruptedException {
143 return accept (new SpoolRepository.AcceptFilter () {
144 long sleepUntil = 0;
145
146 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
147 if (Mail.ERROR.equals(state)) {
148
149 long processingTime = delay + lastUpdated;
150 if (processingTime < System.currentTimeMillis()) {
151
152 return true;
153 } else {
154
155
156 if (sleepUntil == 0 || processingTime < sleepUntil) {
157 sleepUntil = processingTime;
158 }
159 return false;
160 }
161 } else {
162 return true;
163 }
164 }
165
166
167 public long getWaitTime () {
168 if (sleepUntil == 0) {
169
170
171 return 0;
172 }
173 long waitTime = sleepUntil - System.currentTimeMillis();
174 sleepUntil = 0;
175 return waitTime <= 0 ? 1 : waitTime;
176 }
177
178 });
179 }
180
181 /***
182 * Returns an arbitrarily selected mail deposited in this Repository for
183 * which the supplied filter's accept method returns true.
184 * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
185 * based on number of retries if the mail is ready for processing.
186 * If no message is ready the method will block until one is, the amount of time to block is
187 * determined by calling the filters getWaitTime method.
188 *
189 * @return the mail
190 */
191 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
192 while (!Thread.currentThread().isInterrupted()) {
193
194
195 PendingMessage next = null;
196 while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
197
198
199
200
201 if (
202 try {
203 Mail mail = retrieve(next.key);
204
205
206
207 if (mail == null) {
208 unlock(next.key);
209 continue;
210 }
211 return mail;
212 } catch (javax.mail.MessagingException e) {
213 unlock(next.key);
214 getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
215 }
216 }
217 }
218
219 long wait_time = filter.getWaitTime();
220 if (wait_time <= 0) {
221 wait_time = WAIT_LIMIT;
222 }
223 try {
224 wait (wait_time);
225 } catch (InterruptedException ex) {
226 throw ex;
227 }
228 }
229 throw new InterruptedException();
230 }
231
232 /***
233 * Needs to override this method and reset the time to load to zero.
234 * This will force a reload of the pending messages queue once that
235 * is empty... a message that gets added will sit here until that queue
236 * time has passed and the list is then reloaded.
237 */
238 public void store(Mail mc) throws javax.mail.MessagingException {
239 pendingMessagesLoadTime = 0;
240 super.store(mc);
241 }
242
243 /***
244 * If not empty, gets the next pending message. Otherwise checks
245 * checks the last time pending messages was loaded and load if
246 * it's been more than 1 second (should be configurable).
247 */
248 private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
249 synchronized (pendingMessages) {
250 if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
251
252 loadPendingMessages(filter);
253 pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
254 }
255
256 if (pendingMessages.size() == 0) {
257 return null;
258 } else {
259 return (PendingMessage)pendingMessages.removeFirst();
260 }
261 }
262 }
263
264 /***
265 * Retrieves the pending messages that are in the database
266 */
267 private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
268
269 synchronized (pendingMessages) {
270 pendingMessages.clear();
271
272 Connection conn = null;
273 PreparedStatement listMessages = null;
274 ResultSet rsListMessages = null;
275 try {
276 conn = datasource.getConnection();
277 listMessages =
278 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
279 listMessages.setString(1, repositoryName);
280
281
282
283
284
285 rsListMessages = listMessages.executeQuery();
286
287
288
289
290 while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
291 String key = rsListMessages.getString(1);
292 String state = rsListMessages.getString(2);
293 long lastUpdated = rsListMessages.getTimestamp(3).getTime();
294 String errorMessage = rsListMessages.getString(4);
295 if (filter.accept(key, state, lastUpdated, errorMessage)) {
296 pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
297 }
298 }
299 } catch (SQLException sqle) {
300
301 getLogger().error("Error retrieving pending messages", sqle);
302 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
303 } finally {
304 theJDBCUtil.closeJDBCResultSet(rsListMessages);
305 theJDBCUtil.closeJDBCStatement(listMessages);
306 theJDBCUtil.closeJDBCConnection(conn);
307 }
308 }
309 }
310
311 /***
312 * Simple class to hold basic information about a message in the spool
313 */
314 class PendingMessage {
315 protected String key;
316 protected String state;
317 protected long lastUpdated;
318 protected String errorMessage;
319
320 public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
321 this.key = key;
322 this.state = state;
323 this.lastUpdated = lastUpdated;
324 this.errorMessage = errorMessage;
325 }
326 }
327 }