1 /****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20
21
22 package org.apache.james.mailrepository;
23
24 import org.apache.avalon.framework.configuration.Configuration;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26
27 import org.apache.james.services.SpoolRepository;
28 import org.apache.mailet.Mail;
29
30 import java.sql.Connection;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.LinkedList;
35
36 /**
37 * Implementation of a SpoolRepository on a database.
38 *
39 * <p>Requires a configuration element in the .conf.xml file of the form:
40 * <br><repository destinationURL="town://path"
41 * <br> type="MAIL"
42 * <br> model="SYNCHRONOUS"/>
43 * <br> <driver>sun.jdbc.odbc.JdbcOdbcDriver</conn>
44 * <br> <conn>jdbc:odbc:LocalDB</conn>
45 * <br> <table>Message</table>
46 * <br></repository>
47 * <p>destinationURL specifies..(Serge??)
48 * <br>Type can be SPOOL or MAIL
49 * <br>Model is currently not used and may be dropped
50 * <br>conn is the location of the ...(Serge)
51 * <br>table is the name of the table in the Database to be used
52 *
53 * <p>Requires a logger called MailRepository.
54 *
55 * <p>Approach for spool manager:
56 *
57 * PendingMessage inner class
58 *
59 * accept() is called....
60 * checks whether needs to load PendingMessages()
61 * tries to get a message()
62 * if none, wait 60
63 *
64 * accept(long) is called
65 * checks whether needs to load PendingMessages
66 * tries to get a message(long)
67 * if none, wait accordingly
68 *
69 * sync checkswhetherneedstoloadPendingMessages()
70 * if pending messages has messages in immediate process, return immediately
71 * if run query in last WAIT_LIMIT time, return immediately
72 * query and build 2 vectors of Pending messages.
73 * Ones that need immediate processing
74 * Ones that are delayed. put them in time order
75 * return
76 *
77 * get_a_message()
78 * loop through immediate messages.
79 * - remove top message
80 * - try to lock. if successful, return. otherwise loop.
81 * if nothing, return null
82 *
83 * get_a_message(long)
84 * try get_a_message()
85 * check top message in pending. if ready, then remove, try to lock, return if lock.
86 * return null.
87 *
88 *
89 * @version 1.0.0, 24/04/1999
90 */
91 public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {
92
93 /**
94 * How long a thread should sleep when there are no messages to process.
95 */
96 private static int WAIT_LIMIT = 60000;
97 /**
98 * How long we have to wait before reloading the list of pending messages
99 */
100 private static int LOAD_TIME_MININUM = 1000;
101 /**
102 * A queue in memory of messages that need processing
103 */
104 private LinkedList pendingMessages = new LinkedList();
105 /**
106 * When the queue was last read
107 */
108 private long pendingMessagesLoadTime = 0;
109 /**
110 * Maximum size of the pendingMessages queue
111 */
112 private int maxPendingMessages = 0;
113
114 /**
115 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
116 */
117 public void configure(Configuration conf) throws ConfigurationException {
118 super.configure(conf);
119 maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
120 }
121
122 /**
123 * @see org.apache.james.services.SpoolRepository#accept()
124 */
125 public synchronized Mail accept() throws InterruptedException {
126 return accept(new SpoolRepository.AcceptFilter () {
127 public boolean accept (String _, String __, long ___, String ____) {
128 return true;
129 }
130
131 public long getWaitTime () {
132 return 0;
133 }
134 });
135 }
136
137 /**
138 * @see org.apache.james.services.SpoolRepository#accept(long)
139 */
140 public synchronized Mail accept(final long delay) throws InterruptedException {
141 return accept (new SpoolRepository.AcceptFilter () {
142 long sleepUntil = 0;
143
144 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
145 if (Mail.ERROR.equals(state)) {
146 //if it's an error message, test the time
147 long processingTime = delay + lastUpdated;
148 if (processingTime < System.currentTimeMillis()) {
149 //It's time to process
150 return true;
151 } else {
152 //We don't process this, but we want to possibly reduce the amount of time
153 // we sleep so we wake when this message is ready.
154 if (sleepUntil == 0 || processingTime < sleepUntil) {
155 sleepUntil = processingTime;
156 }
157 return false;
158 }
159 } else {
160 return true;
161 }
162 }
163
164
165 public long getWaitTime () {
166 if (sleepUntil == 0) {
167 // in AvalonSpoolRepository we return 0: why do we change sleepUntil?
168 // sleepUntil = System.currentTimeMillis();
169 return 0;
170 }
171 long waitTime = sleepUntil - System.currentTimeMillis();
172 sleepUntil = 0;
173 return waitTime <= 0 ? 1 : waitTime;
174 }
175
176 });
177 }
178
179 /**
180 /**
181 * @see org.apache.james.services.SpoolRepository#accept(org.apache.james.services.SpoolRepository.AcceptFilter)
182 */
183 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
184 while (!Thread.currentThread().isInterrupted()) {
185 //Loop through until we are either out of pending messages or have a message
186 // that we can lock
187 PendingMessage next = null;
188 while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
189 //Check whether this is time to expire
190
191 // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
192
193 if (/*shouldProcess && */ lock(next.key)) {
194 try {
195 Mail mail = retrieve(next.key);
196 // Retrieve can return null if the mail is no longer on the spool
197 // (i.e. another thread has gotten to it first).
198 // In this case we simply continue to the next key
199 if (mail == null) {
200 unlock(next.key);
201 continue;
202 }
203 return mail;
204 } catch (javax.mail.MessagingException e) {
205 unlock(next.key);
206 getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
207 }
208 }
209 }
210 //Nothing to do... sleep!
211 long wait_time = filter.getWaitTime();
212 if (wait_time <= 0) {
213 wait_time = WAIT_LIMIT;
214 }
215 try {
216 wait (wait_time);
217 } catch (InterruptedException ex) {
218 throw ex;
219 }
220 }
221 throw new InterruptedException();
222 }
223
224 /**
225 * Needs to override this method and reset the time to load to zero.
226 * This will force a reload of the pending messages queue once that
227 * is empty... a message that gets added will sit here until that queue
228 * time has passed and the list is then reloaded.
229 *
230 * @see org.apache.james.mailrepository.AbstractMailRepository#store(Mail)
231 */
232 public void store(Mail mc) throws javax.mail.MessagingException {
233 pendingMessagesLoadTime = 0;
234 super.store(mc);
235 }
236
237 /**
238 * If not empty, gets the next pending message. Otherwise checks
239 * checks the last time pending messages was loaded and load if
240 * it's been more than 1 second (should be configurable).
241 */
242 private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
243 synchronized (pendingMessages) {
244 if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
245 // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
246 loadPendingMessages(filter);
247 pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
248 }
249
250 if (pendingMessages.size() == 0) {
251 return null;
252 } else {
253 return (PendingMessage)pendingMessages.removeFirst();
254 }
255 }
256 }
257
258 /**
259 * Retrieves the pending messages that are in the database
260 */
261 private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
262 //Loads a vector with PendingMessage objects
263 synchronized (pendingMessages) {
264 pendingMessages.clear();
265
266 Connection conn = null;
267 PreparedStatement listMessages = null;
268 ResultSet rsListMessages = null;
269 try {
270 conn = datasource.getConnection();
271 listMessages =
272 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
273 listMessages.setString(1, repositoryName);
274 // Too simplistic. When filtering, we may need to see
275 // more than just maxPendingMessages to load the
276 // cache, so just hope that the driver and server use
277 // cursors properly.
278 // --> listMessages.setMaxRows(maxPendingMessages);
279 rsListMessages = listMessages.executeQuery();
280 // Continue to have it loop through the list of messages until we hit
281 // a possible message, or we retrieve maxPendingMessages messages.
282 // This maxPendingMessages cap is to avoid loading thousands or
283 // hundreds of thousands of messages when the spool is enourmous.
284 while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
285 String key = rsListMessages.getString(1);
286 String state = rsListMessages.getString(2);
287 long lastUpdated = rsListMessages.getTimestamp(3).getTime();
288 String errorMessage = rsListMessages.getString(4);
289 if (filter.accept(key, state, lastUpdated, errorMessage)) {
290 pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
291 }
292 }
293 } catch (SQLException sqle) {
294 //Log it and avoid reloading for a bit
295 getLogger().error("Error retrieving pending messages", sqle);
296 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
297 } finally {
298 theJDBCUtil.closeJDBCResultSet(rsListMessages);
299 theJDBCUtil.closeJDBCStatement(listMessages);
300 theJDBCUtil.closeJDBCConnection(conn);
301 }
302 }
303 }
304
305 /**
306 * Simple class to hold basic information about a message in the spool
307 */
308 class PendingMessage {
309 protected String key;
310 protected String state;
311 protected long lastUpdated;
312 protected String errorMessage;
313
314 public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
315 this.key = key;
316 this.state = state;
317 this.lastUpdated = lastUpdated;
318 this.errorMessage = errorMessage;
319 }
320 }
321 }