View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.util.connection;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.net.ServerSocket;
25  import java.net.Socket;
26  import java.net.SocketException;
27  import java.util.ArrayList;
28  import java.util.Iterator;
29  
30  import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
31  import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
32  import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
33  import org.apache.avalon.excalibur.pool.ObjectFactory;
34  import org.apache.avalon.excalibur.pool.Pool;
35  import org.apache.avalon.excalibur.pool.Poolable;
36  import org.apache.excalibur.thread.ThreadPool ;
37  import org.apache.avalon.framework.activity.Initializable;
38  import org.apache.avalon.framework.container.ContainerUtil;
39  import org.apache.avalon.framework.logger.AbstractLogEnabled;
40  
41  
42  /***
43   * Represents a single server socket managed by a connection manager.
44   * The connection manager will spawn a single ServerConnection for each
45   * server socket that the connection manager is managing.
46   *
47   */
48  public class ServerConnection extends AbstractLogEnabled
49      implements Initializable, Runnable {
50  
51      /***
52       * This is a hack to deal with the fact that there appears to be
53       * no platform-independent way to break out of a ServerSocket
54       * accept() call.  On some platforms closing either the ServerSocket
55       * itself, or its associated InputStream, causes the accept
56       * method to exit.  Unfortunately, this behavior is not consistent
57       * across platforms.  The deal with this, we introduce a polling
58       * loop of 20 seconds for the server socket.  This introduces a
59       * cost across platforms, but is necessary to maintain cross-platform
60       * functionality.
61       */
62      private static int POLLING_INTERVAL = 20*1000;
63  
64      /***
65       * The server socket which this connection is managing
66       */
67      private ServerSocket serverSocket;
68  
69      /***
70       * The connection handler factory that generates connection
71       * handlers to manage client connections to this server socket
72       */
73      private ConnectionHandlerFactory handlerFactory;
74  
75      /***
76       * The pool that produces ClientConnectionRunners
77       */
78      private Pool runnerPool;
79  
80      /***
81       * The factory used to provide ClientConnectionRunner objects
82       */
83      private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
84  
85      /***
86       * The thread pool used to spawn individual threads used to manage each
87       * client connection.
88       */
89      private ThreadPool connThreadPool;
90  
91      /***
92       * The timeout for client sockets spawned off this connection.
93       */
94      private int socketTimeout;
95  
96      /***
97       * The maximum number of open client connections that this server
98       * connection will allow.
99       */
100     private int maxOpenConn;
101 
102     /***
103      * A collection of client connection runners.
104      */
105     private final ArrayList clientConnectionRunners = new ArrayList();
106 
107     /***
108      * The thread used to manage this server connection.
109      */
110     private Thread serverConnectionThread;
111 
112     /***
113      * The sole constructor for a ServerConnection.
114      *
115      * @param serverSocket the ServerSocket associated with this ServerConnection
116      * @param handlerFactory the factory that generates ConnectionHandlers for the client
117      *                       connections spawned off this ServerConnection
118      * @param threadPool the ThreadPool used to obtain handler threads
119      * @param timeout the client idle timeout for this ServerConnection's client connections
120      * @param maxOpenConn the maximum number of open client connections allowed for this
121      *                    ServerConnection
122      */
123     public ServerConnection(ServerSocket serverSocket,
124                             ConnectionHandlerFactory handlerFactory,
125                             ThreadPool threadPool,
126                             int timeout,
127                             int maxOpenConn) {
128         this.serverSocket = serverSocket;
129         this.handlerFactory = handlerFactory;
130         connThreadPool = threadPool;
131         socketTimeout = timeout;
132         this.maxOpenConn = maxOpenConn;
133     }
134 
135     /***
136      * @see org.apache.avalon.framework.activity.Initializable#initialize()
137      */
138     public void initialize() throws Exception {
139         runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
140         ContainerUtil.enableLogging(runnerPool,getLogger());
141         ContainerUtil.initialize(runnerPool);
142     }
143 
144     /***
145      * The dispose operation is called by the owning ConnectionManager
146      * at the end of its lifecycle.  Cleans up the server connection, forcing
147      * everything to finish.
148      */
149     public void dispose() {
150         if (getLogger().isDebugEnabled()) {
151             getLogger().debug("Disposing server connection..." + this.toString());
152         }
153         synchronized( this ) {
154             if( null != serverConnectionThread ) {
155                 // Execution of this block means that the run() method
156                 // hasn't finished yet.  So we interrupt the thread
157                 // to terminate run() and wait for the run() method
158                 // to finish.  The notifyAll() at the end of run() will
159                 // wake this thread and allow dispose() to end.
160                 Thread thread = serverConnectionThread;
161                 serverConnectionThread = null;
162                 thread.interrupt();
163                 try {
164                     serverSocket.close();
165                 } catch (IOException ie) {
166                     // Ignored - we're doing this to break out of the
167                     // accept.  This minimizes the time required to
168                     // shutdown the server.  Unfortunately, this is
169                     // not guaranteed to work on all platforms.  See
170                     // the comments for POLLING_INTERVAL
171                 }
172                 try {
173                     if (POLLING_INTERVAL > 0) {
174                         wait(2L*POLLING_INTERVAL);
175                     } else {
176                         wait();
177                     }
178                 } catch (InterruptedException ie) {
179                     // Expected - just complete dispose()
180                 }
181             }
182             ContainerUtil.dispose(runnerPool);
183             runnerPool = null;
184         }
185 
186         getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
187 
188         synchronized (clientConnectionRunners) {
189             Iterator runnerIterator = clientConnectionRunners.iterator();
190             while( runnerIterator.hasNext() ) {
191                 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
192                 runner.dispose();
193                 runner = null;
194             }
195             clientConnectionRunners.clear();
196         }
197 
198         getLogger().debug("Cleaned up clients - " + this.toString());
199 
200     }
201 
202     /***
203      * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
204      *
205      * @param clientConnectionRunner the ClientConnectionRunner to be added
206      */
207     private ClientConnectionRunner addClientConnectionRunner()
208             throws Exception {
209         synchronized (clientConnectionRunners) {
210             ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
211             clientConnectionRunners.add(clientConnectionRunner);
212             if (getLogger().isDebugEnabled()) {
213                 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
214             }
215             return clientConnectionRunner;
216         }
217     }
218 
219     /***
220      * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
221      *
222      * @param clientConnectionRunner the ClientConnectionRunner to be removed
223      */
224     private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
225 
226        /*
227         * checking runnerPool avoids 'dead-lock' when service is disposing :
228         * (dispose() calls dispose all runners)
229         * but runner is 'running' and cleans up on exit
230         * this situation will result in a dead-lock on 'clientConnectionRunners'
231         */
232         if( runnerPool == null ) {
233             getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
234             return;
235         }
236         
237         synchronized (clientConnectionRunners) {
238             if (clientConnectionRunners.remove(clientConnectionRunner)) {
239                 if (getLogger().isDebugEnabled()) {
240                     getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
241                 }
242                 runnerPool.put(clientConnectionRunner);
243             }
244         }
245 
246         synchronized (this) { notify(); } // match the wait(...) in the run() inner loop before accept().
247     }
248 
249     /***
250      * Provides the body for the thread of execution for a ServerConnection.
251      * Connections made to the server socket are passed to an appropriate,
252      * newly created, ClientConnectionRunner
253      */
254     public void run() {
255         serverConnectionThread = Thread.currentThread();
256 
257         int ioExceptionCount = 0;
258         try {
259             serverSocket.setSoTimeout(POLLING_INTERVAL);
260         } catch (SocketException se) {
261             // Ignored - for the moment
262         }
263 
264         if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
265             StringBuffer debugBuffer =
266                 new StringBuffer(128)
267                     .append(serverConnectionThread.getName())
268                     .append(" is listening on ")
269                     .append(serverSocket.toString());
270             getLogger().debug(debugBuffer.toString());
271         }
272         while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
273             try {
274                 Socket clientSocket = null;
275                 try {
276                     while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
277                         getLogger().warn("Maximum number of open connections (" +  clientConnectionRunners.size() + ") in use.");
278                         synchronized (this) { wait(10000); }
279                     }
280 
281                     clientSocket = serverSocket.accept();
282 
283                 } catch( InterruptedIOException iioe ) {
284                     // This exception is expected upon ServerConnection shutdown.
285                     // See the POLLING_INTERVAL comment
286                     continue;
287                 } catch( IOException se ) {
288                     if (ioExceptionCount > 0) {
289                         getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
290                         break;
291                     } else {
292                         continue;
293                     }
294                 } catch( SecurityException se ) {
295                     getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
296                     break;
297                 }
298                 ClientConnectionRunner runner = null;
299                 synchronized (clientConnectionRunners) {
300                     if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
301                         if (getLogger().isWarnEnabled()) {
302                            getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + clientConnectionRunners.size());
303                            if (getLogger().isWarnEnabled()) {
304                                Iterator runnerIterator = clientConnectionRunners.iterator();
305                                getLogger().info("Connections: ");
306                                while( runnerIterator.hasNext() ) {
307                                    getLogger().info("    " + ((ClientConnectionRunner)runnerIterator.next()).toString());
308                                }
309                            }
310                         }
311                         try {
312                             clientSocket.close();
313                         } catch (IOException ignored) {
314                             // We ignore this exception, as we already have an error condition.
315                         }
316                         continue;
317                     } else {
318                         clientSocket.setSoTimeout(socketTimeout);
319                         runner = addClientConnectionRunner();
320                         runner.setSocket(clientSocket);
321                     }
322                 }
323                 setupLogger( runner );
324                 try {
325                     connThreadPool.execute( runner );
326                 } catch (Exception e) {
327                     // This error indicates that the underlying thread pool
328                     // is out of threads.  For robustness, we catch this and
329                     // cleanup
330                     getLogger().error("Internal error - insufficient threads available to service request.  " +
331                                       Thread.activeCount() + " threads in service request pool.", e);
332                     try {
333                         clientSocket.close();
334                     } catch (IOException ignored) {
335                         // We ignore this exception, as we already have an error condition.
336                     }
337                     // In this case, the thread will not remove the client connection runner,
338                     // so we must.
339                     removeClientConnectionRunner(runner);
340                 }
341             } catch( IOException ioe ) {
342                 getLogger().error( "Exception accepting connection", ioe );
343             } catch( Throwable e ) {
344                 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
345             }
346         }
347         synchronized( this ) {
348             serverConnectionThread = null;
349             Thread.currentThread().interrupted();
350             notifyAll();
351         }
352     }
353 
354     /***
355      * An inner class to provide the actual body of the thread of execution
356      * that occurs upon a client connection.
357      *
358      */
359     class ClientConnectionRunner extends AbstractLogEnabled
360         implements Poolable, Runnable  {
361 
362         /***
363          * The Socket that this client connection is using for transport.
364          */
365         private Socket clientSocket;
366 
367         /***
368          * The thread of execution associated with this client connection.
369          */
370         private Thread clientSocketThread;
371 
372         /***
373          * Returns string for diagnostic logging
374          */
375         public String toString() {
376             return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
377         }
378 
379         public ClientConnectionRunner() {
380         }
381 
382         /***
383          * The dispose operation that terminates the runner.  Should only be
384          * called by the ServerConnection that owns the ClientConnectionRunner
385          */
386         public void dispose() {
387             synchronized( this ) {
388                 if (null != clientSocketThread) {
389                     // Execution of this block means that the run() method
390                     // hasn't finished yet.  So we interrupt the thread
391                     // to terminate run() and wait for the run() method
392                     // to finish.  The notifyAll() at the end of run() will
393                     // wake this thread and allow dispose() to end.
394                     clientSocketThread.interrupt();
395                     clientSocketThread = null;
396                     try {
397                         wait();
398                     } catch (InterruptedException ie) {
399                         // Expected - return from the method
400                     }
401                 }
402             }
403         }
404 
405         /***
406          * Sets the socket for a ClientConnectionRunner.
407          *
408          * @param socket the client socket associated with this ClientConnectionRunner
409          */
410         public void setSocket(Socket socket) {
411             clientSocket = socket;
412         }
413 
414         /***
415          * Provides the body for the thread of execution dealing with a particular client
416          * connection.  An appropriate ConnectionHandler is created, applied, executed,
417          * and released.
418          */
419         public void run() {
420             ConnectionHandler handler = null;
421             try {
422                 clientSocketThread = Thread.currentThread();
423 
424                 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
425                 String connectionString = null;
426                 if( getLogger().isDebugEnabled() ) {
427                     connectionString = getConnectionString();
428                     String message = "Starting " + connectionString;
429                     getLogger().debug( message );
430                 }
431 
432                 handler.handleConnection(clientSocket);
433 
434                 if( getLogger().isDebugEnabled() ) {
435                     String message = "Ending " + connectionString;
436                     getLogger().debug( message );
437                 }
438 
439             } catch( Throwable e ) {
440                 getLogger().error( "Error handling connection", e );
441             } finally {
442 
443                 // Close the underlying socket
444                 try {
445                     if (clientSocket != null) {
446                         clientSocket.close();
447                     }
448                 } catch( IOException ioe ) {
449                     getLogger().warn( "Error shutting down connection", ioe );
450                 }
451 
452                 clientSocket = null;
453 
454                 // Null out the thread, notify other threads to encourage
455                 // a context switch
456                 synchronized( this ) {
457                     clientSocketThread = null;
458 
459                     Thread.currentThread().interrupted();
460 
461                     // Release the handler and kill the reference to the handler factory
462                     //
463                     // This needs to be done after the clientSocketThread is nulled out,
464                     // otherwise we could trash a reused ClientConnectionRunner
465                     if (handler != null) {
466                         ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
467                         handler = null;
468                     }
469 
470                     // Remove this runner from the list of active connections.
471                     ServerConnection.this.removeClientConnectionRunner(this);
472 
473                     notifyAll();
474                 }
475             }
476         }
477 
478         /***
479          * Helper method to return a formatted string with connection transport information.
480          *
481          * @return a formatted string
482          */
483         private String getConnectionString() {
484             if (clientSocket == null) {
485                 return "invalid socket";
486             }
487             StringBuffer connectionBuffer
488                 = new StringBuffer(256)
489                     .append("connection on ")
490                     .append(clientSocket.getLocalAddress().getHostAddress().toString())
491                     .append(":")
492                     .append(clientSocket.getLocalPort())
493                     .append(" from ")
494                     .append(clientSocket.getInetAddress().getHostAddress().toString())
495                     .append(":")
496                     .append(clientSocket.getPort());
497             return connectionBuffer.toString();
498         }
499     }
500 
501     /***
502      * The factory for producing handlers.
503      */
504     private class ClientConnectionRunnerFactory
505         implements ObjectFactory {
506 
507         /***
508          * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
509          */
510         public Object newInstance() throws Exception {
511             return new ClientConnectionRunner();
512         }
513 
514         /***
515          * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
516          */
517         public Class getCreatedClass() {
518             return ClientConnectionRunner.class;
519         }
520 
521         /***
522          * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
523          */
524         public void decommission( Object object ) throws Exception {
525             return;
526         }
527     }
528 }
529 
530