View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.mailrepository;
23  
24  import org.apache.avalon.framework.configuration.Configuration;
25  import org.apache.avalon.framework.configuration.ConfigurationException;
26  
27  import org.apache.james.services.SpoolRepository;
28  import org.apache.mailet.Mail;
29  
30  import java.sql.Connection;
31  import java.sql.PreparedStatement;
32  import java.sql.ResultSet;
33  import java.sql.SQLException;
34  import java.util.LinkedList;
35  
36  /***
37   * Implementation of a SpoolRepository on a database.
38   *
39   * <p>Requires a configuration element in the .conf.xml file of the form:
40   *  <br>&lt;repository destinationURL="town://path"
41   *  <br>            type="MAIL"
42   *  <br>            model="SYNCHRONOUS"/&gt;
43   *  <br>            &lt;driver&gt;sun.jdbc.odbc.JdbcOdbcDriver&lt;/conn&gt;
44   *  <br>            &lt;conn&gt;jdbc:odbc:LocalDB&lt;/conn&gt;
45   *  <br>            &lt;table&gt;Message&lt;/table&gt;
46   *  <br>&lt;/repository&gt;
47   * <p>destinationURL specifies..(Serge??)
48   * <br>Type can be SPOOL or MAIL
49   * <br>Model is currently not used and may be dropped
50   * <br>conn is the location of the ...(Serge)
51   * <br>table is the name of the table in the Database to be used
52   *
53   * <p>Requires a logger called MailRepository.
54   *
55   * <p>Approach for spool manager:
56   *
57   * PendingMessage inner class
58   *
59   * accept() is called....
60   * checks whether needs to load PendingMessages()
61   * tries to get a message()
62   * if none, wait 60
63   *
64   * accept(long) is called
65   * checks whether needs to load PendingMessages
66   * tries to get a message(long)
67   * if none, wait accordingly
68   *
69   * sync checkswhetherneedstoloadPendingMessages()
70   * if pending messages has messages in immediate process, return immediately
71   * if run query in last WAIT_LIMIT time, return immediately
72   * query and build 2 vectors of Pending messages.
73   *  Ones that need immediate processing
74   *  Ones that are delayed.  put them in time order
75   * return
76   *
77   * get_a_message()
78   * loop through immediate messages.
79   *  - remove top message
80   *  - try to lock.  if successful, return.  otherwise loop.
81   * if nothing, return null
82   *
83   * get_a_message(long)
84   * try get_a_message()
85   * check top message in pending.  if ready, then remove, try to lock, return if lock.
86   * return null.
87   *
88   *
89   * @version 1.0.0, 24/04/1999
90   */
91  public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {
92  
93      /***
94       * How long a thread should sleep when there are no messages to process.
95       */
96      private static int WAIT_LIMIT = 60000;
97      /***
98       * How long we have to wait before reloading the list of pending messages
99       */
100     private static int LOAD_TIME_MININUM = 1000;
101     /***
102      * A queue in memory of messages that need processing
103      */
104     private LinkedList pendingMessages = new LinkedList();
105     /***
106      * When the queue was last read
107      */
108     private long pendingMessagesLoadTime = 0;
109     /***
110      * Maximum size of the pendingMessages queue
111      */
112     private int maxPendingMessages = 0;
113 
114     /***
115      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
116      */
117     public void configure(Configuration conf) throws ConfigurationException {
118         super.configure(conf);
119         maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
120     }
121 
122     /***
123      * Return a message to process.  This is a message in the spool that is not locked.
124      */
125     public synchronized Mail accept() throws InterruptedException {
126         return accept(new SpoolRepository.AcceptFilter () {
127             public boolean accept (String _, String __, long ___, String ____) {
128                 return true;
129             }
130 
131             public long getWaitTime () {
132                 return 0;
133             }
134         });
135     }
136 
137     /***
138      * Return a message that's ready to process.  If a message is of type "error"
139      * then check the last updated time, and don't try it until the long 'delay' parameter
140      * milliseconds has passed.
141      */
142     public synchronized Mail accept(final long delay) throws InterruptedException {
143         return accept (new SpoolRepository.AcceptFilter () {
144             long sleepUntil = 0;
145                 
146                 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
147                     if (Mail.ERROR.equals(state)) {
148                         //if it's an error message, test the time
149                         long processingTime = delay + lastUpdated;
150                         if (processingTime < System.currentTimeMillis()) {
151                             //It's time to process
152                             return true;
153                         } else {
154                             //We don't process this, but we want to possibly reduce the amount of time
155                             //  we sleep so we wake when this message is ready.
156                             if (sleepUntil == 0 || processingTime < sleepUntil) {
157                                 sleepUntil = processingTime;
158                             }
159                             return false;
160                         }
161                     } else {
162                         return true;
163                     }
164                 }
165                 
166 
167                 public long getWaitTime () {
168                     if (sleepUntil == 0) {
169                         // in AvalonSpoolRepository we return 0: why do we change sleepUntil?
170                         // sleepUntil = System.currentTimeMillis();
171                         return 0;
172                     }
173                     long waitTime = sleepUntil - System.currentTimeMillis();
174                     sleepUntil = 0;
175                     return waitTime <= 0 ? 1 : waitTime;
176                 }
177                 
178             });
179     }
180 
181     /***
182      * Returns an arbitrarily selected mail deposited in this Repository for
183      * which the supplied filter's accept method returns true.
184      * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
185      * based on number of retries if the mail is ready for processing.
186      * If no message is ready the method will block until one is, the amount of time to block is
187      * determined by calling the filters getWaitTime method.
188      *
189      * @return  the mail
190      */
191     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
192         while (!Thread.currentThread().isInterrupted()) {
193             //Loop through until we are either out of pending messages or have a message
194             // that we can lock
195             PendingMessage next = null;
196             while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
197                 //Check whether this is time to expire
198                 
199                 // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
200                 
201                 if (/*shouldProcess && */ lock(next.key)) {
202                     try {
203                         Mail mail = retrieve(next.key);
204                         // Retrieve can return null if the mail is no longer on the spool
205                         // (i.e. another thread has gotten to it first).
206                         // In this case we simply continue to the next key
207                         if (mail == null) {
208                             unlock(next.key);
209                             continue;
210                         }
211                         return mail;
212                     } catch (javax.mail.MessagingException e) {
213                         unlock(next.key);
214                         getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
215                     }
216                 }
217             }
218             //Nothing to do... sleep!
219             long wait_time = filter.getWaitTime();
220             if (wait_time <= 0) {
221                 wait_time = WAIT_LIMIT;
222             }
223             try {
224                 wait (wait_time);
225             } catch (InterruptedException ex) {
226                 throw ex;
227             }
228         }
229         throw new InterruptedException();
230     }
231 
232     /***
233      * Needs to override this method and reset the time to load to zero.
234      * This will force a reload of the pending messages queue once that
235      * is empty... a message that gets added will sit here until that queue
236      * time has passed and the list is then reloaded.
237      */
238     public void store(Mail mc) throws javax.mail.MessagingException {
239         pendingMessagesLoadTime = 0;
240         super.store(mc);
241     }
242 
243     /***
244      * If not empty, gets the next pending message.  Otherwise checks
245      * checks the last time pending messages was loaded and load if
246      * it's been more than 1 second (should be configurable).
247      */
248     private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
249         synchronized (pendingMessages) {
250             if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
251                 // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
252                 loadPendingMessages(filter);
253                 pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
254             }
255 
256             if (pendingMessages.size() == 0) {
257                 return null;
258             } else {
259                 return (PendingMessage)pendingMessages.removeFirst();
260             }
261         }
262     }
263 
264     /***
265      * Retrieves the pending messages that are in the database
266      */
267     private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
268         //Loads a vector with PendingMessage objects
269         synchronized (pendingMessages) {
270             pendingMessages.clear();
271 
272             Connection conn = null;
273             PreparedStatement listMessages = null;
274             ResultSet rsListMessages = null;
275             try {
276                 conn = datasource.getConnection();
277                 listMessages =
278                     conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
279                 listMessages.setString(1, repositoryName);
280                 // Too simplistic.  When filtering, we may need to see
281                 // more than just maxPendingMessages to load the
282                 // cache, so just hope that the driver and server use
283                 // cursors properly.
284                 // --> listMessages.setMaxRows(maxPendingMessages);
285                 rsListMessages = listMessages.executeQuery();
286                 // Continue to have it loop through the list of messages until we hit
287                 // a possible message, or we retrieve maxPendingMessages messages.
288                 // This maxPendingMessages cap is to avoid loading thousands or
289                 // hundreds of thousands of messages when the spool is enourmous.
290                 while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
291                     String key = rsListMessages.getString(1);
292                     String state = rsListMessages.getString(2);
293                     long lastUpdated = rsListMessages.getTimestamp(3).getTime();
294                     String errorMessage = rsListMessages.getString(4);
295                     if (filter.accept(key, state, lastUpdated, errorMessage)) {
296                         pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
297                     }
298                 }
299             } catch (SQLException sqle) {
300                 //Log it and avoid reloading for a bit
301                 getLogger().error("Error retrieving pending messages", sqle);
302                 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
303             } finally {
304                 theJDBCUtil.closeJDBCResultSet(rsListMessages);
305                 theJDBCUtil.closeJDBCStatement(listMessages);
306                 theJDBCUtil.closeJDBCConnection(conn);
307             }
308         }
309     }
310 
311     /***
312      * Simple class to hold basic information about a message in the spool
313      */
314     class PendingMessage {
315         protected String key;
316         protected String state;
317         protected long lastUpdated;
318         protected String errorMessage;
319 
320         public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
321             this.key = key;
322             this.state = state;
323             this.lastUpdated = lastUpdated;
324             this.errorMessage = errorMessage;
325         }
326     }
327 }