View Javadoc

1   /************************************************************************
2    * Copyright (c) 2000-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.util.connection;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.net.ServerSocket;
23  import java.net.Socket;
24  import java.net.SocketException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  
28  import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
29  import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
30  import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
31  import org.apache.avalon.excalibur.pool.ObjectFactory;
32  import org.apache.avalon.excalibur.pool.Pool;
33  import org.apache.avalon.excalibur.pool.Poolable;
34  import org.apache.excalibur.thread.ThreadPool ;
35  import org.apache.avalon.framework.activity.Initializable;
36  import org.apache.avalon.framework.container.ContainerUtil;
37  import org.apache.avalon.framework.logger.AbstractLogEnabled;
38  
39  
40  /***
41   * Represents a single server socket managed by a connection manager.
42   * The connection manager will spawn a single ServerConnection for each
43   * server socket that the connection manager is managing.
44   *
45   */
46  public class ServerConnection extends AbstractLogEnabled
47      implements Initializable, Runnable {
48  
49      /***
50       * This is a hack to deal with the fact that there appears to be
51       * no platform-independent way to break out of a ServerSocket
52       * accept() call.  On some platforms closing either the ServerSocket
53       * itself, or its associated InputStream, causes the accept
54       * method to exit.  Unfortunately, this behavior is not consistent
55       * across platforms.  The deal with this, we introduce a polling
56       * loop of 20 seconds for the server socket.  This introduces a
57       * cost across platforms, but is necessary to maintain cross-platform
58       * functionality.
59       */
60      private static int POLLING_INTERVAL = 20*1000;
61  
62      /***
63       * The server socket which this connection is managing
64       */
65      private ServerSocket serverSocket;
66  
67      /***
68       * The connection handler factory that generates connection
69       * handlers to manage client connections to this server socket
70       */
71      private ConnectionHandlerFactory handlerFactory;
72  
73      /***
74       * The pool that produces ClientConnectionRunners
75       */
76      private Pool runnerPool;
77  
78      /***
79       * The factory used to provide ClientConnectionRunner objects
80       */
81      private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
82  
83      /***
84       * The thread pool used to spawn individual threads used to manage each
85       * client connection.
86       */
87      private ThreadPool connThreadPool;
88  
89      /***
90       * The timeout for client sockets spawned off this connection.
91       */
92      private int socketTimeout;
93  
94      /***
95       * The maximum number of open client connections that this server
96       * connection will allow.
97       */
98      private int maxOpenConn;
99  
100     /***
101      * A collection of client connection runners.
102      */
103     private final ArrayList clientConnectionRunners = new ArrayList();
104 
105     /***
106      * The thread used to manage this server connection.
107      */
108     private Thread serverConnectionThread;
109 
110     /***
111      * The sole constructor for a ServerConnection.
112      *
113      * @param serverSocket the ServerSocket associated with this ServerConnection
114      * @param handlerFactory the factory that generates ConnectionHandlers for the client
115      *                       connections spawned off this ServerConnection
116      * @param threadPool the ThreadPool used to obtain handler threads
117      * @param timeout the client idle timeout for this ServerConnection's client connections
118      * @param maxOpenConn the maximum number of open client connections allowed for this
119      *                    ServerConnection
120      */
121     public ServerConnection(ServerSocket serverSocket,
122                             ConnectionHandlerFactory handlerFactory,
123                             ThreadPool threadPool,
124                             int timeout,
125                             int maxOpenConn) {
126         this.serverSocket = serverSocket;
127         this.handlerFactory = handlerFactory;
128         connThreadPool = threadPool;
129         socketTimeout = timeout;
130         this.maxOpenConn = maxOpenConn;
131     }
132 
133     /***
134      * @see org.apache.avalon.framework.activity.Initializable#initialize()
135      */
136     public void initialize() throws Exception {
137         runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
138         ContainerUtil.enableLogging(runnerPool,getLogger());
139         ContainerUtil.initialize(runnerPool);
140     }
141 
142     /***
143      * The dispose operation is called by the owning ConnectionManager
144      * at the end of its lifecycle.  Cleans up the server connection, forcing
145      * everything to finish.
146      */
147     public void dispose() {
148         if (getLogger().isDebugEnabled()) {
149             getLogger().debug("Disposing server connection..." + this.toString());
150         }
151         synchronized( this ) {
152             if( null != serverConnectionThread ) {
153                 // Execution of this block means that the run() method
154                 // hasn't finished yet.  So we interrupt the thread
155                 // to terminate run() and wait for the run() method
156                 // to finish.  The notifyAll() at the end of run() will
157                 // wake this thread and allow dispose() to end.
158                 Thread thread = serverConnectionThread;
159                 serverConnectionThread = null;
160                 thread.interrupt();
161                 try {
162                     serverSocket.close();
163                 } catch (IOException ie) {
164                     // Ignored - we're doing this to break out of the
165                     // accept.  This minimizes the time required to
166                     // shutdown the server.  Unfortunately, this is
167                     // not guaranteed to work on all platforms.  See
168                     // the comments for POLLING_INTERVAL
169                 }
170                 try {
171                     if (POLLING_INTERVAL > 0) {
172                         wait(2L*POLLING_INTERVAL);
173                     } else {
174                         wait();
175                     }
176                 } catch (InterruptedException ie) {
177                     // Expected - just complete dispose()
178                 }
179             }
180             ContainerUtil.dispose(runnerPool);
181             runnerPool = null;
182         }
183 
184         getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
185 
186         synchronized (clientConnectionRunners) {
187             Iterator runnerIterator = clientConnectionRunners.iterator();
188             while( runnerIterator.hasNext() ) {
189                 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
190                 runner.dispose();
191                 runner = null;
192             }
193             clientConnectionRunners.clear();
194         }
195 
196         getLogger().debug("Cleaned up clients - " + this.toString());
197 
198     }
199 
200     /***
201      * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
202      *
203      * @param clientConnectionRunner the ClientConnectionRunner to be added
204      */
205     private ClientConnectionRunner addClientConnectionRunner()
206             throws Exception {
207         synchronized (clientConnectionRunners) {
208             ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
209             clientConnectionRunners.add(clientConnectionRunner);
210             if (getLogger().isDebugEnabled()) {
211                 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
212             }
213             return clientConnectionRunner;
214         }
215     }
216 
217     /***
218      * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
219      *
220      * @param clientConnectionRunner the ClientConnectionRunner to be removed
221      */
222     private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
223 
224        /*
225         * checking runnerPool avoids 'dead-lock' when service is disposing :
226         * (dispose() calls dispose all runners)
227         * but runner is 'running' and cleans up on exit
228         * this situation will result in a dead-lock on 'clientConnectionRunners'
229         */
230         if( runnerPool == null ) {
231             getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
232             return;
233         }
234         
235         synchronized (clientConnectionRunners) {
236             if (clientConnectionRunners.remove(clientConnectionRunner)) {
237                 if (getLogger().isDebugEnabled()) {
238                     getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
239                 }
240                 runnerPool.put(clientConnectionRunner);
241             }
242         }
243     }
244 
245     /***
246      * Provides the body for the thread of execution for a ServerConnection.
247      * Connections made to the server socket are passed to an appropriate,
248      * newly created, ClientConnectionRunner
249      */
250     public void run() {
251         serverConnectionThread = Thread.currentThread();
252 
253         int ioExceptionCount = 0;
254         try {
255             serverSocket.setSoTimeout(POLLING_INTERVAL);
256         } catch (SocketException se) {
257             // Ignored - for the moment
258         }
259 
260         if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
261             StringBuffer debugBuffer =
262                 new StringBuffer(128)
263                     .append(serverConnectionThread.getName())
264                     .append(" is listening on ")
265                     .append(serverSocket.toString());
266             getLogger().debug(debugBuffer.toString());
267         }
268         while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
269             try {
270                 Socket clientSocket = null;
271                 try {
272                     clientSocket = serverSocket.accept();
273                 } catch( InterruptedIOException iioe ) {
274                     // This exception is expected upon ServerConnection shutdown.
275                     // See the POLLING_INTERVAL comment
276                     continue;
277                 } catch( IOException se ) {
278                     if (ioExceptionCount > 0) {
279                         getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
280                         break;
281                     } else {
282                         continue;
283                     }
284                 } catch( SecurityException se ) {
285                     getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
286                     break;
287                 }
288                 ClientConnectionRunner runner = null;
289                 synchronized (clientConnectionRunners) {
290                     if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
291                         if (getLogger().isWarnEnabled()) {
292                            getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + clientConnectionRunners.size());
293                            if (getLogger().isWarnEnabled()) {
294                                Iterator runnerIterator = clientConnectionRunners.iterator();
295                                getLogger().info("Connections: ");
296                                while( runnerIterator.hasNext() ) {
297                                    getLogger().info("    " + ((ClientConnectionRunner)runnerIterator.next()).toString());
298                                }
299                            }
300                         }
301                         try {
302                             clientSocket.close();
303                         } catch (IOException ignored) {
304                             // We ignore this exception, as we already have an error condition.
305                         }
306                         continue;
307                     } else {
308                         clientSocket.setSoTimeout(socketTimeout);
309                         runner = addClientConnectionRunner();
310                         runner.setSocket(clientSocket);
311                     }
312                 }
313                 setupLogger( runner );
314                 try {
315                     connThreadPool.execute( runner );
316                 } catch (Exception e) {
317                     // This error indicates that the underlying thread pool
318                     // is out of threads.  For robustness, we catch this and
319                     // cleanup
320                     getLogger().error("Internal error - insufficient threads available to service request.  " +
321                                       Thread.activeCount() + " threads in service request pool.", e);
322                     try {
323                         clientSocket.close();
324                     } catch (IOException ignored) {
325                         // We ignore this exception, as we already have an error condition.
326                     }
327                     // In this case, the thread will not remove the client connection runner,
328                     // so we must.
329                     removeClientConnectionRunner(runner);
330                 }
331             } catch( IOException ioe ) {
332                 getLogger().error( "Exception accepting connection", ioe );
333             } catch( Throwable e ) {
334                 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
335             }
336         }
337         synchronized( this ) {
338             serverConnectionThread = null;
339             Thread.currentThread().interrupted();
340             notifyAll();
341         }
342     }
343 
344     /***
345      * An inner class to provide the actual body of the thread of execution
346      * that occurs upon a client connection.
347      *
348      */
349     class ClientConnectionRunner extends AbstractLogEnabled
350         implements Poolable, Runnable  {
351 
352         /***
353          * The Socket that this client connection is using for transport.
354          */
355         private Socket clientSocket;
356 
357         /***
358          * The thread of execution associated with this client connection.
359          */
360         private Thread clientSocketThread;
361 
362         /***
363          * Returns string for diagnostic logging
364          */
365         public String toString() {
366             return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
367         }
368 
369         public ClientConnectionRunner() {
370         }
371 
372         /***
373          * The dispose operation that terminates the runner.  Should only be
374          * called by the ServerConnection that owns the ClientConnectionRunner
375          */
376         public void dispose() {
377             synchronized( this ) {
378                 if (null != clientSocketThread) {
379                     // Execution of this block means that the run() method
380                     // hasn't finished yet.  So we interrupt the thread
381                     // to terminate run() and wait for the run() method
382                     // to finish.  The notifyAll() at the end of run() will
383                     // wake this thread and allow dispose() to end.
384                     clientSocketThread.interrupt();
385                     clientSocketThread = null;
386                     try {
387                         wait();
388                     } catch (InterruptedException ie) {
389                         // Expected - return from the method
390                     }
391                 }
392             }
393         }
394 
395         /***
396          * Sets the socket for a ClientConnectionRunner.
397          *
398          * @param socket the client socket associated with this ClientConnectionRunner
399          */
400         public void setSocket(Socket socket) {
401             clientSocket = socket;
402         }
403 
404         /***
405          * Provides the body for the thread of execution dealing with a particular client
406          * connection.  An appropriate ConnectionHandler is created, applied, executed,
407          * and released.
408          */
409         public void run() {
410             ConnectionHandler handler = null;
411             try {
412                 clientSocketThread = Thread.currentThread();
413 
414                 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
415                 String connectionString = null;
416                 if( getLogger().isDebugEnabled() ) {
417                     connectionString = getConnectionString();
418                     String message = "Starting " + connectionString;
419                     getLogger().debug( message );
420                 }
421 
422                 handler.handleConnection(clientSocket);
423 
424                 if( getLogger().isDebugEnabled() ) {
425                     String message = "Ending " + connectionString;
426                     getLogger().debug( message );
427                 }
428 
429             } catch( Throwable e ) {
430                 getLogger().error( "Error handling connection", e );
431             } finally {
432 
433                 // Close the underlying socket
434                 try {
435                     if (clientSocket != null) {
436                         clientSocket.close();
437                     }
438                 } catch( IOException ioe ) {
439                     getLogger().warn( "Error shutting down connection", ioe );
440                 }
441 
442                 clientSocket = null;
443 
444                 // Null out the thread, notify other threads to encourage
445                 // a context switch
446                 synchronized( this ) {
447                     clientSocketThread = null;
448 
449                     Thread.currentThread().interrupted();
450 
451                     // Release the handler and kill the reference to the handler factory
452                     //
453                     // This needs to be done after the clientSocketThread is nulled out,
454                     // otherwise we could trash a reused ClientConnectionRunner
455                     if (handler != null) {
456                         ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
457                         handler = null;
458                     }
459 
460                     // Remove this runner from the list of active connections.
461                     ServerConnection.this.removeClientConnectionRunner(this);
462 
463                     notifyAll();
464                 }
465             }
466         }
467 
468         /***
469          * Helper method to return a formatted string with connection transport information.
470          *
471          * @return a formatted string
472          */
473         private String getConnectionString() {
474             if (clientSocket == null) {
475                 return "invalid socket";
476             }
477             StringBuffer connectionBuffer
478                 = new StringBuffer(256)
479                     .append("connection on ")
480                     .append(clientSocket.getLocalAddress().getHostAddress().toString())
481                     .append(":")
482                     .append(clientSocket.getLocalPort())
483                     .append(" from ")
484                     .append(clientSocket.getInetAddress().getHostAddress().toString())
485                     .append(":")
486                     .append(clientSocket.getPort());
487             return connectionBuffer.toString();
488         }
489     }
490 
491     /***
492      * The factory for producing handlers.
493      */
494     private class ClientConnectionRunnerFactory
495         implements ObjectFactory {
496 
497         /***
498          * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
499          */
500         public Object newInstance() throws Exception {
501             return new ClientConnectionRunner();
502         }
503 
504         /***
505          * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
506          */
507         public Class getCreatedClass() {
508             return ClientConnectionRunner.class;
509         }
510 
511         /***
512          * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
513          */
514         public void decommission( Object object ) throws Exception {
515             return;
516         }
517     }
518 }
519 
520