View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.util.mordred;
21  
22  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
23  import org.apache.avalon.framework.activity.Disposable;
24  import org.apache.avalon.framework.configuration.Configurable;
25  import org.apache.avalon.framework.configuration.Configuration;
26  import org.apache.avalon.framework.configuration.ConfigurationException;
27  import org.apache.avalon.framework.logger.AbstractLogEnabled;
28  
29  import java.io.PrintWriter;
30  import java.io.StringWriter;
31  import java.sql.Connection;
32  import java.sql.SQLException;
33  import java.util.ArrayList;
34  
35  
36  /***
37   * <p>
38   * This is a <b>reliable</b> DataSource implementation, based on the pooling logic written for <a
39   * href="http://share.whichever.com/">Town</a> and the configuration found in Avalon's excalibur
40   * code.
41   * </p>
42   *
43   * <p>
44   * This uses the normal <code>java.sql.Connection</code> object and
45   * <code>java.sql.DriverManager</code>.  The Configuration is like this:
46   * <pre>
47   *   &lt;jdbc&gt;
48   *     &lt;pool-controller min="<i>5</i>" max="<i>10</i>" connection-class="<i>my.overrided.ConnectionClass</i>"&gt;
49   *       &lt;keep-alive&gt;select 1&lt;/keep-alive&gt;
50   *     &lt;/pool-controller&gt;
51   *     &lt;driver&gt;<i>com.database.jdbc.JdbcDriver</i>&lt;/driver&gt;
52   *     &lt;dburl&gt;<i>jdbc:driver://host/mydb</i>&lt;/dburl&gt;
53   *     &lt;user&gt;<i>username</i>&lt;/user&gt;
54   *     &lt;password&gt;<i>password</i>&lt;/password&gt;
55   *   &lt;/jdbc&gt;
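 * </pre>
 * Note: this implementation reads <code>max</code> and <code>keep-alive</code> as direct child
 * elements of the configuration passed to <code>configure()</code> (for example
 * <code>&lt;max&gt;10&lt;/max&gt;</code>); the <code>pool-controller</code> element and its
 * <code>min</code> and <code>connection-class</code> settings are not supported.
 * </p>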
58   *
59   * @version CVS $Revision: 494012 $
60   * @since 4.0
61   */
62  public class JdbcDataSource extends AbstractLogEnabled
63      implements Configurable,
64                 Runnable,
65                 Disposable,
66                 DataSourceComponent {
67      // The limit that an active connection can be running
68      public static final long ACTIVE_CONN_TIME_LIMIT = 60000; // (one minute)
69      public static final long ACTIVE_CONN_HARD_TIME_LIMIT = 2*ACTIVE_CONN_TIME_LIMIT;
70      // How long before you kill off a connection due to inactivity
71      public static final long CONN_IDLE_LIMIT        = 600000; // (10 minutes)
72      private static final boolean DEEP_DEBUG         = false;
73      private static int total_served                 = 0;
74      // This is a temporary variable used to track how many active threads
75      // are in createConnection().  This is to prevent to many connections
76      // from being opened at once.
77      private int connCreationsInProgress             = 0;
78      // The error message is the conn pooler cannot serve connections for whatever reason
79      private String connErrorMessage                 = null;
80      // the last time a connection was created...
81      private long connLastCreated                    = 0;
82      // connection number for like of this broker
83      private int connectionCount;
84      // Driver class
85      private String jdbcDriver;
86      // Password to login to database
87      private String jdbcPassword;
88      // Server to connect to database (this really is the jdbc URL)
89      private String jdbcURL;
90      // Username to login to database
91      private String jdbcUsername;
92      // Maximum number of connections to have open at any point
93      private int maxConn                             = 0;
94      // collection of connection objects
95      private ArrayList pool;
96      // Thread that checks for dead/aged connections and removes them from pool
97      private Thread reaper;
98      // Flag to indicate whether reaper thread should run
99      private boolean reaperActive                    = false;
100     // a SQL command to execute to see if the connection is still ok
101     private String verifyConnSQL;
102 
103     /***
104      * Implements the ConnDefinition behavior when a connection is needed. Checks the pool of
105      * connections to see if there is one available.  If there is not and we are below the max
106      * number of connections limit, it tries to create another connection.  It retries this 10
107      * times until a connection is available or can be created
108      *
109      * @return java.sql.Connection
110      * @throws SQLException Document throws!
111      */
112     public Connection getConnection() throws SQLException {
113         //If the conn definition has a fatal connection problem, need to return that error
114         if(connErrorMessage != null) {
115             throw new SQLException(connErrorMessage);
116         }
117         //Look through our list of open connections right now, starting from beginning.
118         //If we find one, book it.
119         int count                                   = total_served++;
120         if(DEEP_DEBUG) {
121             StringBuffer deepDebugBuffer =
122                 new StringBuffer(128)
123                         .append((new java.util.Date()).toString())
124                         .append(" trying to get a connection (")
125                         .append(count)
126                         .append(")");
127             System.out.println(deepDebugBuffer.toString());
128         }
129         for(int attempts = 1; attempts <= 100; attempts++) {
130             synchronized(pool) {
131                 for(int i = 0; i < pool.size(); i++) {
132                     PoolConnEntry entry = (PoolConnEntry)pool.get(i);
133                     //Set the appropriate flags to make this connection
134                     //marked as in use
135                     try {
136                         if(entry.lock()) {
137                             if(DEEP_DEBUG) {
138                                 StringBuffer deepDebugBuffer =
139                                     new StringBuffer(128)
140                                             .append((new java.util.Date()).toString())
141                                             .append(" return a connection (")
142                                             .append(count)
143                                             .append(")");
144                                 System.out.println(deepDebugBuffer.toString());
145                             }
146                             return entry;
147                         }
148                     } catch(SQLException se) {
149                         //Somehow a closed connection appeared in our pool.
150                         //Remove it immediately.
151                         finalizeEntry(entry);
152                         continue;
153                     }
154                     //we were unable to get a lock on this entry.. so continue through list
155                 } //loop through existing connections
156                 //If we have 0, create another
157                 if(DEEP_DEBUG) {
158                     System.out.println(pool.size());
159                 }
160                 try {
161                     if(pool.size() == 0) {
162                         //create a connection
163                         PoolConnEntry entry = createConn();
164                         if(entry != null) {
165                             if(DEEP_DEBUG) {
166                                 StringBuffer deepDebugBuffer =
167                                     new StringBuffer(128)
168                                             .append((new java.util.Date()).toString())
169                                             .append(" returning new connection (")
170                                             .append(count)
171                                             .append(")");
172                                 System.out.println(deepDebugBuffer.toString());
173                             }
174                             return entry;
175                         }
176                         //looks like a connection was already created
177                     } else {
178                         //Since we didn't find one, and we have < max connections, then consider whether
179                         //  we create another
180                         //if we've hit the 3rd attempt without getting a connection,
181                         //  let's create another to anticipate a slow down
182                         if((attempts == 2) && (pool.size() < maxConn || maxConn == 0)) {
183                             PoolConnEntry entry = createConn();
184                             if(entry != null) {
185                                 if(DEEP_DEBUG) {
186                                     StringBuffer deepDebugBuffer =
187                                         new StringBuffer(32)
188                                                 .append(" returning new connection (")
189                                                 .append(count)
190                                                 .append(")");
191                                     System.out.println(deepDebugBuffer.toString());
192                                 }
193                                 return entry;
194                             } else {
195                                 attempts = 1;
196                             }
197                         }
198                     }
199                 } catch(SQLException sqle) {
200                     //Ignore... error creating the connection
201                     StringWriter sout = new StringWriter();
202                     PrintWriter pout  = new PrintWriter(sout, true);
203                     pout.println("Error creating connection: ");
204                     sqle.printStackTrace(pout);
205                     if (getLogger().isErrorEnabled()) {
206                         getLogger().error(sout.toString());
207                     }
208                 }
209             }
210             //otherwise sleep 50ms 10 times, then create a connection
211             try {
212                 Thread.currentThread().sleep(50);
213             } catch(InterruptedException ie) {
214             }
215         }
216         // Give up... no connections available
217         throw new SQLException("Giving up... no connections available.");
218     }
219 
220     /***
221      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
222      */
223     public void configure(final Configuration configuration)
224                    throws ConfigurationException {
225         try {
226             jdbcDriver    = configuration.getChild("driver").getValue(null);
227             jdbcURL       = configuration.getChild("dburl").getValue(null);
228             jdbcUsername  = configuration.getChild("user").getValue(null);
229             jdbcPassword  = configuration.getChild("password").getValue(null);
230             maxConn       = configuration.getChild("max").getValueAsInteger(2);
231             //logfilename?
232             verifyConnSQL = configuration.getChild("keep-alive").getValue(null);
233             //Not support from Town: logfilename
234             //Not supporting from Excalibur: pool-controller, min, auto-commit, oradb, connection-class
235             if(jdbcDriver == null) {
236                 throw new ConfigurationException("You need to specify a valid driver, e.g., <driver>my.class</driver>");
237             }
238             try {
239                 if (getLogger().isDebugEnabled()) {
240                     getLogger().debug("Loading new driver: " + jdbcDriver);
241                 }
242                 // TODO: Figure out why this breaks when we change the Class.forName to
243                 //       a loadClass method call on the class loader.
244                 // DO NOT MESS WITH THIS UNLESS YOU ARE WILLING TO TEST
245                 // AND FIX THE PROBLEMS!
246                 Class.forName(jdbcDriver, true, Thread.currentThread().getContextClassLoader());
247                 // These variations do NOT work:
248                 // getClass().getClassLoader().loadClass(jdbcDriver);                    -- DON'T USE -- BROKEN!!
249                 // Thread.currentThread().getContextClassLoader().loadClass(jdbcDriver); -- DON'T USE -- BROKEN!!
250             } catch(ClassNotFoundException cnfe) {
251                 StringBuffer exceptionBuffer =
252                     new StringBuffer(128)
253                             .append("'")
254                             .append(jdbcDriver)
255                             .append("' could not be found in classloader.  Please specify a valid JDBC driver");
256                 String exceptionMessage = exceptionBuffer.toString();
257                 getLogger().error(exceptionMessage);
258                 throw new ConfigurationException(exceptionMessage);
259             }
260             if(jdbcURL == null) {
261                 throw new ConfigurationException("You need to specify a valid JDBC connection string, e.g., <dburl>jdbc:driver:database</dburl>");
262             }
263             if(maxConn < 0) {
264                 throw new ConfigurationException("Maximum number of connections specified must be at least 1 (0 means no limit).");
265             }
266             if (getLogger().isDebugEnabled()) {
267                 getLogger().debug("Starting connection pooler");
268                 getLogger().debug("driver = " + jdbcDriver);
269                 getLogger().debug("dburl = " + jdbcURL);
270                 getLogger().debug("username = " + jdbcUsername);
271                 //We don't show the password
272                 getLogger().debug("max connections = " + maxConn);
273             }
274             pool         = new ArrayList();
275             reaperActive = true;
276             reaper       = new Thread(this);
277             reaper.setDaemon(true);
278             reaper.start();
279         } catch(ConfigurationException ce) {
280             //Let this pass through...
281             throw ce;
282         }
283          catch(Exception e) {
284             throw new ConfigurationException("Error configuring JdbcDataSource", e);
285         }
286     }
287 
288     /***
289      * The dispose operation is called at the end of a components lifecycle.
290      * Cleans up all JDBC connections.
291      *
292      * @throws Exception if an error is encountered during shutdown
293      */
294     public void dispose() {
295         // Stop the background monitoring thread
296         if(reaper != null) {
297             reaperActive = false;
298             //In case it's sleeping, help it quit faster
299             reaper.interrupt();
300             reaper = null;
301         }
302         // The various entries will finalize themselves once the reference
303         // is removed, so no need to do it here
304     }
305 
306     /***
307      * Implements the ConnDefinition behavior when something bad has happened to a connection. If a
308      * sql command was provided in the properties file, it will run this and attempt to determine
309      * whether the connection is still valid.  If it is, it recycles this connection back into the
310      * pool.  If it is not, it closes the connection.
311      *
312      * @param entry the connection that had problems
313      * @deprecated - No longer used in the new approach.
314      */
315     public void killConnection(PoolConnEntry entry) {
316         if(entry != null) {
317             // if we were provided SQL to test the connection with, we will use
318             // this and possibly just release the connection after clearing warnings
319             if(verifyConnSQL != null) {
320                 try {
321                     // Test this connection
322                     java.sql.Statement stmt = null;
323                     try {
324                         stmt = entry.createStatement();
325                         stmt.execute(verifyConnSQL);
326                     } finally {
327                         try {
328                             if (stmt != null) {
329                                 stmt.close();
330                             }
331                         } catch (SQLException sqle) {
332                             // Failure to close ignored on test connection
333                         }
334                     }
335                     // Passed test... recycle the entry
336                     entry.unlock();
337                 } catch(SQLException e1) {
338                     // Failed test... close the entry
339                     finalizeEntry(entry);
340                 }
341             } else {
342                 // No SQL was provided... we have to kill this entry to be sure
343                 finalizeEntry(entry);
344             }
345             return;
346         } else {
347             if (getLogger().isWarnEnabled()) {
348                 getLogger().warn("----> Could not find connection to kill!!!");
349             }
350             return;
351         }
352     }
353 
354     /***
355      * Implements the ConnDefinition behavior when a connection is no longer needed. This resets
356      * flags on the wrapper of the connection to allow others to use this connection.
357      *
358      * @param entry
359      */
360     public void releaseConnection(PoolConnEntry entry) {
361         //PoolConnEntry entry = findEntry(conn);
362         if(entry != null) {
363             entry.unlock();
364             return;
365         } else {
366             if (getLogger().isWarnEnabled()) {
367                 getLogger().warn("----> Could not find the connection to free!!!");
368             }
369             return;
370         }
371     }
372 
373     /***
374      * Background thread that checks if there are fewer connections open than minConn specifies,
375      * and checks whether connections have been checked out for too long, killing them.
376      */
377     public void run() {
378         try {
379             while(reaperActive) {
380                 synchronized(pool) {
381                     for(int i = 0; i < pool.size(); i++) try {
382                         PoolConnEntry entry = (PoolConnEntry)pool.get(i);
383                         long age            = System.currentTimeMillis() - entry.getLastActivity();
384                         synchronized(entry) {
385                             if((entry.getStatus() == PoolConnEntry.ACTIVE) &&
386                                (age > ACTIVE_CONN_HARD_TIME_LIMIT)) {
387                                 StringBuffer logBuffer =
388                                     new StringBuffer(128)
389                                             .append(" ***** connection ")
390                                             .append(entry.getId())
391                                             .append(" is way too old: ")
392                                             .append(age)
393                                             .append(" > ")
394                                             .append(ACTIVE_CONN_HARD_TIME_LIMIT)
395                                             .append(" and will be closed.");
396                                 getLogger().info(logBuffer.toString());
397                                 // This connection is way too old...
398                                 // kill it no matter what
399                                 finalizeEntry(entry);
400                                 continue;
401                             }
402                             if((entry.getStatus() == PoolConnEntry.ACTIVE) &&
403                                (age > ACTIVE_CONN_TIME_LIMIT)) {
404                                 StringBuffer logBuffer =
405                                     new StringBuffer(128)
406                                             .append(" ***** connection ")
407                                             .append(entry.getId())
408                                             .append(" is way too old: ")
409                                             .append(age)
410                                             .append(" > ")
411                                             .append(ACTIVE_CONN_TIME_LIMIT);
412                                 getLogger().info(logBuffer.toString());
413                                 // This connection is way too old...
414                                 // just log it for now.
415                                 continue;
416                             }
417                             if((entry.getStatus() == PoolConnEntry.AVAILABLE) &&
418                                (age > CONN_IDLE_LIMIT)) {
419                                 //We've got a connection that's too old... kill it
420                                 finalizeEntry(entry);
421                                 continue;
422                             }
423                         }
424                     }
425                     catch (Throwable ex)
426                     {
427                         StringWriter sout = new StringWriter();
428                         PrintWriter pout = new PrintWriter(sout, true);
429                         pout.println("Reaper Error: ");
430                         ex.printStackTrace(pout);
431                         if (getLogger().isErrorEnabled()) {
432                             getLogger().error(sout.toString());
433                         }
434                     }
435                 }
436                 try {
437                     // Check for activity every 5 seconds
438                     Thread.sleep(5000L);
439                 } catch(InterruptedException ex) {
440                 }
441             }
442         } finally {
443             Thread.currentThread().interrupted();
444         }
445     }
446 
    /**
     * Writes a message to this component's logger at debug level.
     *
     * @param message the text to log
     */
    protected void debug(String message) {
        getLogger().debug(message);
    }
450 
    /**
     * Writes a message to this component's logger at info level.
     *
     * @param message the text to log
     */
    protected void info(String message) {
        getLogger().info(message);
    }
454 
    /*
     * Writes a message to this component's logger at warn level.
     * This is a real hack, but oh well for now
     */
    protected void warn(String message) {
        getLogger().warn(message);
    }
461 
462     /***
463      * Creates a new connection as per these properties, adds it to the pool, and logs the creation.
464      *
465      * @return PoolConnEntry the new connection wrapped as an entry
466      * @throws SQLException
467      */
468     private PoolConnEntry createConn() throws SQLException {
469         PoolConnEntry entry = null;
470         synchronized(pool) {
471             if(connCreationsInProgress > 0) {
472                 //We are already creating one in another place
473                 return null;
474             }
475             long now = System.currentTimeMillis();
476             if((now - connLastCreated) < (1000 * pool.size())) {
477                 //We don't want to scale up too quickly...
478                 if(DEEP_DEBUG) {
479                     System.err.println("We don't want to scale up too quickly");
480                 }
481                 return null;
482             }
483             if((maxConn == 0) || (pool.size() < maxConn)) {
484                 connCreationsInProgress++;
485                 connLastCreated = now;
486             } else {
487                 // We've already hit a limit... fail silently
488                 if (getLogger().isDebugEnabled())
489                 {
490                     StringBuffer logBuffer =
491                         new StringBuffer(128)
492                                 .append("Connection limit hit... ")
493                                 .append(pool.size())
494                                 .append(" in pool and ")
495                                 .append(connCreationsInProgress)
496                                 .append(" + on the way.");
497                     getLogger().debug(logBuffer.toString());
498                 }
499                 return null;
500             }
501             try {
502                 entry = new PoolConnEntry(this,
503                                           java.sql.DriverManager.getConnection(jdbcURL, jdbcUsername,
504                                                                                jdbcPassword),
505                                           ++connectionCount);
506                 if (getLogger().isDebugEnabled())
507                 {
508                     getLogger().debug("Opening connection " + entry);
509                 }
510                 entry.lock();
511                 pool.add(entry);
512                 return entry;
513             } catch(SQLException sqle) {
514                 //Shouldn't ever happen, but it did, just return null.
515                 // Exception from DriverManager.getConnection() - log it, and return null
516                 StringWriter sout = new StringWriter();
517                 PrintWriter pout = new PrintWriter(sout, true);
518                 pout.println("Error creating connection: ");
519                 sqle.printStackTrace(pout);
520                 if (getLogger().isErrorEnabled()) {
521                     getLogger().error(sout.toString());
522                 }
523                 return null;
524             } finally {
525                     connCreationsInProgress--;
526             }
527         }
528     }
529 
530     /***
531      * Closes a connection and removes it from the pool.
532      *
533      * @param entry entry
534      */
535     private void finalizeEntry(PoolConnEntry entry) {
536         synchronized(pool) {
537             try {
538                 entry.finalize();
539             } catch(Exception fe) {
540             }
541             pool.remove(entry);
542         }
543     }
544 }