View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.mailrepository;
23  
24  import org.apache.avalon.framework.configuration.Configuration;
25  import org.apache.avalon.framework.configuration.ConfigurationException;
26  
27  import org.apache.james.services.SpoolRepository;
28  import org.apache.mailet.Mail;
29  
30  import java.sql.Connection;
31  import java.sql.PreparedStatement;
32  import java.sql.ResultSet;
33  import java.sql.SQLException;
34  import java.util.LinkedList;
35  
36  /**
37   * Implementation of a SpoolRepository on a database.
38   *
39   * <p>Requires a configuration element in the .conf.xml file of the form:
40   *  <br>&lt;repository destinationURL="town://path"
41   *  <br>            type="MAIL"
42   *  <br>            model="SYNCHRONOUS"/&gt;
43   *  <br>            &lt;driver&gt;sun.jdbc.odbc.JdbcOdbcDriver&lt;/conn&gt;
44   *  <br>            &lt;conn&gt;jdbc:odbc:LocalDB&lt;/conn&gt;
45   *  <br>            &lt;table&gt;Message&lt;/table&gt;
46   *  <br>&lt;/repository&gt;
47   * <p>destinationURL specifies..(Serge??)
48   * <br>Type can be SPOOL or MAIL
49   * <br>Model is currently not used and may be dropped
50   * <br>conn is the location of the ...(Serge)
51   * <br>table is the name of the table in the Database to be used
52   *
53   * <p>Requires a logger called MailRepository.
54   *
55   * <p>Approach for spool manager:
56   *
57   * PendingMessage inner class
58   *
59   * accept() is called....
60   * checks whether needs to load PendingMessages()
61   * tries to get a message()
62   * if none, wait 60
63   *
64   * accept(long) is called
65   * checks whether needs to load PendingMessages
66   * tries to get a message(long)
67   * if none, wait accordingly
68   *
69   * sync checkswhetherneedstoloadPendingMessages()
70   * if pending messages has messages in immediate process, return immediately
71   * if run query in last WAIT_LIMIT time, return immediately
72   * query and build 2 vectors of Pending messages.
73   *  Ones that need immediate processing
74   *  Ones that are delayed.  put them in time order
75   * return
76   *
77   * get_a_message()
78   * loop through immediate messages.
79   *  - remove top message
80   *  - try to lock.  if successful, return.  otherwise loop.
81   * if nothing, return null
82   *
83   * get_a_message(long)
84   * try get_a_message()
85   * check top message in pending.  if ready, then remove, try to lock, return if lock.
86   * return null.
87   *
88   *
89   * @version 1.0.0, 24/04/1999
90   */
91  public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {
92  
93      /**
94       * How long a thread should sleep when there are no messages to process.
95       */
96      private static int WAIT_LIMIT = 60000;
97      /**
98       * How long we have to wait before reloading the list of pending messages
99       */
100     private static int LOAD_TIME_MININUM = 1000;
101     /**
102      * A queue in memory of messages that need processing
103      */
104     private LinkedList pendingMessages = new LinkedList();
105     /**
106      * When the queue was last read
107      */
108     private long pendingMessagesLoadTime = 0;
109     /**
110      * Maximum size of the pendingMessages queue
111      */
112     private int maxPendingMessages = 0;
113 
114     /**
115      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
116      */
117     public void configure(Configuration conf) throws ConfigurationException {
118         super.configure(conf);
119         maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
120     }
121 
122     /**
123      * @see org.apache.james.services.SpoolRepository#accept()
124      */
125     public synchronized Mail accept() throws InterruptedException {
126         return accept(new SpoolRepository.AcceptFilter () {
127             public boolean accept (String _, String __, long ___, String ____) {
128                 return true;
129             }
130 
131             public long getWaitTime () {
132                 return 0;
133             }
134         });
135     }
136 
137     /**
138      * @see org.apache.james.services.SpoolRepository#accept(long)
139      */
140     public synchronized Mail accept(final long delay) throws InterruptedException {
141         return accept (new SpoolRepository.AcceptFilter () {
142             long sleepUntil = 0;
143                 
144                 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
145                     if (Mail.ERROR.equals(state)) {
146                         //if it's an error message, test the time
147                         long processingTime = delay + lastUpdated;
148                         if (processingTime < System.currentTimeMillis()) {
149                             //It's time to process
150                             return true;
151                         } else {
152                             //We don't process this, but we want to possibly reduce the amount of time
153                             //  we sleep so we wake when this message is ready.
154                             if (sleepUntil == 0 || processingTime < sleepUntil) {
155                                 sleepUntil = processingTime;
156                             }
157                             return false;
158                         }
159                     } else {
160                         return true;
161                     }
162                 }
163                 
164 
165                 public long getWaitTime () {
166                     if (sleepUntil == 0) {
167                         // in AvalonSpoolRepository we return 0: why do we change sleepUntil?
168                         // sleepUntil = System.currentTimeMillis();
169                         return 0;
170                     }
171                     long waitTime = sleepUntil - System.currentTimeMillis();
172                     sleepUntil = 0;
173                     return waitTime <= 0 ? 1 : waitTime;
174                 }
175                 
176             });
177     }
178 
179     /**
180     /**
181      * @see org.apache.james.services.SpoolRepository#accept(org.apache.james.services.SpoolRepository.AcceptFilter)
182      */
183     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
184         while (!Thread.currentThread().isInterrupted()) {
185             //Loop through until we are either out of pending messages or have a message
186             // that we can lock
187             PendingMessage next = null;
188             while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
189                 //Check whether this is time to expire
190                 
191                 // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
192                 
193                 if (/*shouldProcess && */ lock(next.key)) {
194                     try {
195                         Mail mail = retrieve(next.key);
196                         // Retrieve can return null if the mail is no longer on the spool
197                         // (i.e. another thread has gotten to it first).
198                         // In this case we simply continue to the next key
199                         if (mail == null) {
200                             unlock(next.key);
201                             continue;
202                         }
203                         return mail;
204                     } catch (javax.mail.MessagingException e) {
205                         unlock(next.key);
206                         getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
207                     }
208                 }
209             }
210             //Nothing to do... sleep!
211             long wait_time = filter.getWaitTime();
212             if (wait_time <= 0) {
213                 wait_time = WAIT_LIMIT;
214             }
215             try {
216                 wait (wait_time);
217             } catch (InterruptedException ex) {
218                 throw ex;
219             }
220         }
221         throw new InterruptedException();
222     }
223 
224     /**
225      * Needs to override this method and reset the time to load to zero.
226      * This will force a reload of the pending messages queue once that
227      * is empty... a message that gets added will sit here until that queue
228      * time has passed and the list is then reloaded.
229      * 
230      * @see org.apache.james.mailrepository.AbstractMailRepository#store(Mail)
231      */
232     public void store(Mail mc) throws javax.mail.MessagingException {
233         pendingMessagesLoadTime = 0;
234         super.store(mc);
235     }
236 
237     /**
238      * If not empty, gets the next pending message.  Otherwise checks
239      * checks the last time pending messages was loaded and load if
240      * it's been more than 1 second (should be configurable).
241      */
242     private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
243         synchronized (pendingMessages) {
244             if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
245                 // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
246                 loadPendingMessages(filter);
247                 pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
248             }
249 
250             if (pendingMessages.size() == 0) {
251                 return null;
252             } else {
253                 return (PendingMessage)pendingMessages.removeFirst();
254             }
255         }
256     }
257 
258     /**
259      * Retrieves the pending messages that are in the database
260      */
261     private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
262         //Loads a vector with PendingMessage objects
263         synchronized (pendingMessages) {
264             pendingMessages.clear();
265 
266             Connection conn = null;
267             PreparedStatement listMessages = null;
268             ResultSet rsListMessages = null;
269             try {
270                 conn = datasource.getConnection();
271                 listMessages =
272                     conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
273                 listMessages.setString(1, repositoryName);
274                 // Too simplistic.  When filtering, we may need to see
275                 // more than just maxPendingMessages to load the
276                 // cache, so just hope that the driver and server use
277                 // cursors properly.
278                 // --> listMessages.setMaxRows(maxPendingMessages);
279                 rsListMessages = listMessages.executeQuery();
280                 // Continue to have it loop through the list of messages until we hit
281                 // a possible message, or we retrieve maxPendingMessages messages.
282                 // This maxPendingMessages cap is to avoid loading thousands or
283                 // hundreds of thousands of messages when the spool is enourmous.
284                 while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
285                     String key = rsListMessages.getString(1);
286                     String state = rsListMessages.getString(2);
287                     long lastUpdated = rsListMessages.getTimestamp(3).getTime();
288                     String errorMessage = rsListMessages.getString(4);
289                     if (filter.accept(key, state, lastUpdated, errorMessage)) {
290                         pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
291                     }
292                 }
293             } catch (SQLException sqle) {
294                 //Log it and avoid reloading for a bit
295                 getLogger().error("Error retrieving pending messages", sqle);
296                 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
297             } finally {
298                 theJDBCUtil.closeJDBCResultSet(rsListMessages);
299                 theJDBCUtil.closeJDBCStatement(listMessages);
300                 theJDBCUtil.closeJDBCConnection(conn);
301             }
302         }
303     }
304 
305     /**
306      * Simple class to hold basic information about a message in the spool
307      */
308     class PendingMessage {
309         protected String key;
310         protected String state;
311         protected long lastUpdated;
312         protected String errorMessage;
313 
314         public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
315             this.key = key;
316             this.state = state;
317             this.lastUpdated = lastUpdated;
318             this.errorMessage = errorMessage;
319         }
320     }
321 }