View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.util.connection;
23  
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.net.ServerSocket;
27  import java.net.Socket;
28  import java.net.SocketException;
29  import java.util.ArrayList;
30  import java.util.Iterator;
31  
32  import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
33  import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
34  import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
35  import org.apache.avalon.excalibur.pool.ObjectFactory;
36  import org.apache.avalon.excalibur.pool.Pool;
37  import org.apache.avalon.excalibur.pool.Poolable;
38  import org.apache.excalibur.thread.ThreadPool ;
39  import org.apache.avalon.framework.activity.Initializable;
40  import org.apache.avalon.framework.container.ContainerUtil;
41  import org.apache.avalon.framework.logger.AbstractLogEnabled;
42  
43  
44  /**
45   * Represents a single server socket managed by a connection manager.
46   * The connection manager will spawn a single ServerConnection for each
47   * server socket that the connection manager is managing.
48   *
49   */
50  public class ServerConnection extends AbstractLogEnabled
51      implements Initializable, Runnable {
52  
53      /**
54       * This is a hack to deal with the fact that there appears to be
55       * no platform-independent way to break out of a ServerSocket
56       * accept() call.  On some platforms closing either the ServerSocket
57       * itself, or its associated InputStream, causes the accept
58       * method to exit.  Unfortunately, this behavior is not consistent
59       * across platforms.  The deal with this, we introduce a polling
60       * loop of 20 seconds for the server socket.  This introduces a
61       * cost across platforms, but is necessary to maintain cross-platform
62       * functionality.
63       */
64      private static int POLLING_INTERVAL = 20*1000;
65  
66      /**
67       * The server socket which this connection is managing
68       */
69      private ServerSocket serverSocket;
70  
71      /**
72       * The connection handler factory that generates connection
73       * handlers to manage client connections to this server socket
74       */
75      private ConnectionHandlerFactory handlerFactory;
76  
77      /**
78       * The pool that produces ClientConnectionRunners
79       */
80      private Pool runnerPool;
81  
82      /**
83       * The factory used to provide ClientConnectionRunner objects
84       */
85      private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
86  
87      /**
88       * The thread pool used to spawn individual threads used to manage each
89       * client connection.
90       */
91      private ThreadPool connThreadPool;
92  
93      /**
94       * The timeout for client sockets spawned off this connection.
95       */
96      private int socketTimeout;
97  
98      /**
99       * The maximum number of open client connections that this server
100      * connection will allow.
101      */
102     private int maxOpenConn;
103     
104     /**
105      * The maximum number of open client connections per IP that this server
106      * connection will allow.
107      */
108     private int maxOpenConnPerIP;
109     
110     private ConnectionPerIpMap connPerIpMap;
111 
112     /**
113      * A collection of client connection runners.
114      */
115     private final ArrayList clientConnectionRunners = new ArrayList();
116     
117 
118     /**
119      * The thread used to manage this server connection.
120      */
121     private Thread serverConnectionThread;
122 
123     /**
124      * The sole constructor for a ServerConnection.
125      *
126      * @param serverSocket the ServerSocket associated with this ServerConnection
127      * @param handlerFactory the factory that generates ConnectionHandlers for the client
128      *                       connections spawned off this ServerConnection
129      * @param threadPool the ThreadPool used to obtain handler threads
130      * @param timeout the client idle timeout for this ServerConnection's client connections
131      * @param maxOpenConn the maximum number of open client connections allowed for this
132      *                    ServerConnection
133      * @param maxOpenConnPerIP the maximum number of open client connections allowed for this
134      *                    ServerConnection per IP
135      */
136     public ServerConnection(ServerSocket serverSocket,
137                             ConnectionHandlerFactory handlerFactory,
138                             ThreadPool threadPool,
139                             int timeout,
140                             int maxOpenConn, int maxOpenConnPerIP) {
141         this.serverSocket = serverSocket;
142         this.handlerFactory = handlerFactory;
143         connThreadPool = threadPool;
144         socketTimeout = timeout;
145         this.maxOpenConn = maxOpenConn;
146         this.maxOpenConnPerIP = maxOpenConnPerIP;
147     }
148 
149     /**
150      * @see org.apache.avalon.framework.activity.Initializable#initialize()
151      */
152     public void initialize() throws Exception {
153         runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
154         connPerIpMap = new ConnectionPerIpMap();
155         ContainerUtil.enableLogging(runnerPool,getLogger());
156         ContainerUtil.initialize(runnerPool);
157     }
158 
159     /**
160      * The dispose operation is called by the owning ConnectionManager
161      * at the end of its lifecycle.  Cleans up the server connection, forcing
162      * everything to finish.
163      */
164     public void dispose() {
165         if (getLogger().isDebugEnabled()) {
166             getLogger().debug("Disposing server connection..." + this.toString());
167         }
168         synchronized( this ) {
169             if( null != serverConnectionThread ) {
170                 // Execution of this block means that the run() method
171                 // hasn't finished yet.  So we interrupt the thread
172                 // to terminate run() and wait for the run() method
173                 // to finish.  The notifyAll() at the end of run() will
174                 // wake this thread and allow dispose() to end.
175                 Thread thread = serverConnectionThread;
176                 serverConnectionThread = null;
177                 thread.interrupt();
178                 try {
179                     
180                     serverSocket.close();
181                     
182                 } catch (IOException ie) {
183                     // Ignored - we're doing this to break out of the
184                     // accept.  This minimizes the time required to
185                     // shutdown the server.  Unfortunately, this is
186                     // not guaranteed to work on all platforms.  See
187                     // the comments for POLLING_INTERVAL
188                 }
189                 try {
190                     if (POLLING_INTERVAL > 0) {
191                         wait(2L*POLLING_INTERVAL);
192                     } else {
193                         wait();
194                     }
195                 } catch (InterruptedException ie) {
196                     // Expected - just complete dispose()
197                 }
198             }
199             ContainerUtil.dispose(runnerPool);
200             runnerPool = null;
201         }
202 
203         getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
204 
205         synchronized (clientConnectionRunners) {
206             Iterator runnerIterator = clientConnectionRunners.iterator();
207             while( runnerIterator.hasNext() ) {
208                 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
209                 runner.dispose();
210                 runner = null;
211             }
212             clientConnectionRunners.clear();
213             connPerIpMap.clearConnectionPerIP();
214             
215         }
216 
217         getLogger().debug("Cleaned up clients - " + this.toString());
218 
219     }
220 
221     /**
222      * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
223      *
224      */
225     private ClientConnectionRunner addClientConnectionRunner()
226             throws Exception {
227         synchronized (clientConnectionRunners) {
228             ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
229             clientConnectionRunners.add(clientConnectionRunner);
230             if (getLogger().isDebugEnabled()) {
231                 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
232             }
233             return clientConnectionRunner;
234         }
235     }
236 
237     /**
238      * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
239      *
240      * @param clientConnectionRunner the ClientConnectionRunner to be removed
241      */
242     private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
243 
244        /*
245         * checking runnerPool avoids 'dead-lock' when service is disposing :
246         * (dispose() calls dispose all runners)
247         * but runner is 'running' and cleans up on exit
248         * this situation will result in a dead-lock on 'clientConnectionRunners'
249         */
250         if( runnerPool == null ) {
251             getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
252             return;
253         }
254         
255         synchronized (clientConnectionRunners) {
256             if (clientConnectionRunners.remove(clientConnectionRunner)) {
257                 if (getLogger().isDebugEnabled()) {
258                     getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
259                 }
260                 runnerPool.put(clientConnectionRunner);
261             }
262         }
263 
264         synchronized (this) { notify(); } // match the wait(...) in the run() inner loop before accept().
265     }
266 
267 
268     /**
269      * Provides the body for the thread of execution for a ServerConnection.
270      * Connections made to the server socket are passed to an appropriate,
271      * newly created, ClientConnectionRunner
272      */
273     public void run() {
274         serverConnectionThread = Thread.currentThread();
275 
276         int ioExceptionCount = 0;
277         try {
278             serverSocket.setSoTimeout(POLLING_INTERVAL);
279         } catch (SocketException se) {
280             // Ignored - for the moment
281         }
282 
283         if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
284             StringBuffer debugBuffer =
285                 new StringBuffer(128)
286                     .append(serverConnectionThread.getName())
287                     .append(" is listening on ")
288                     .append(serverSocket.toString());
289             getLogger().debug(debugBuffer.toString());
290         }
291         while( !Thread.interrupted() && null != serverConnectionThread ) {
292             try {
293                 Socket clientSocket = null;
294                 try {
295                     while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
296                         if (getLogger().isInfoEnabled()) {
297                             getLogger().info("Maximum number of open connections (" +  clientConnectionRunners.size() + ") in use.");
298                         }
299                         synchronized (this) { wait(10000); }
300                     }
301 
302                     clientSocket = serverSocket.accept();
303 
304                 } catch( InterruptedIOException iioe ) {
305                     // This exception is expected upon ServerConnection shutdown.
306                     // See the POLLING_INTERVAL comment
307                     continue;
308                 } catch( IOException se ) {
309                     if (ioExceptionCount > 0) {
310                         getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
311                         break;
312                     } else {
313                         continue;
314                     }
315                 } catch( SecurityException se ) {
316                     getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
317                     break;
318                 }
319                 ClientConnectionRunner runner = null;
320                 synchronized (clientConnectionRunners) {
321                     if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
322                         if (getLogger().isWarnEnabled()) {
323                            getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + clientConnectionRunners.size());
324                            if (getLogger().isInfoEnabled()) {
325                                Iterator runnerIterator = clientConnectionRunners.iterator();
326                                getLogger().info("Connections: ");
327                                while( runnerIterator.hasNext() ) {
328                                    getLogger().info("    " + ((ClientConnectionRunner)runnerIterator.next()).toString());
329                                }
330                            }
331                         }
332                         try {
333                             clientSocket.close();
334                         } catch (IOException ignored) {
335                             // We ignore this exception, as we already have an error condition.
336                         }
337                         continue;
338                     } else if ((maxOpenConnPerIP > 0) && (connPerIpMap.getConnectionPerIP(clientSocket.getInetAddress().getHostAddress()) >= maxOpenConnPerIP)) {
339                         if (getLogger().isWarnEnabled()) {
340                             getLogger().warn("Maximum number of open connections per IP exceeded - refusing connection.  Current number of connections for " + clientSocket.getInetAddress().getHostAddress() + " is " + connPerIpMap.getConnectionPerIP(clientSocket.getInetAddress().getHostAddress()));
341                         }
342                         try {
343                             clientSocket.close();
344                         } catch (IOException ignored) {
345                             // We ignore this exception, as we already have an error condition.
346                         }
347                         continue;
348                         
349                     } else {
350                     connPerIpMap.addConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
351                         clientSocket.setSoTimeout(socketTimeout);
352                         runner = addClientConnectionRunner();
353                         runner.setSocket(clientSocket);
354                         
355                     }
356                 }
357                 setupLogger( runner );
358                 try {
359                     connThreadPool.execute( runner );
360                 } catch (Exception e) {
361                     // This error indicates that the underlying thread pool
362                     // is out of threads.  For robustness, we catch this and
363                     // cleanup
364                     getLogger().error("Internal error - insufficient threads available to service request.  " +
365                                       Thread.activeCount() + " threads in service request pool.", e);
366                     try {
367                     connPerIpMap.removeConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
368                         clientSocket.close();
369                     } catch (IOException ignored) {
370                         // We ignore this exception, as we already have an error condition.
371                     }
372                     // In this case, the thread will not remove the client connection runner,
373                     // so we must.
374                     removeClientConnectionRunner(runner);
375                 }
376             } catch( IOException ioe ) {
377                 getLogger().error( "Exception accepting connection", ioe );
378             } catch( Throwable e ) {
379                 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
380             }
381         }
382         synchronized( this ) {
383             serverConnectionThread = null;
384             Thread.interrupted();
385             notifyAll();
386         }
387     }
388 
389     /**
390      * An inner class to provide the actual body of the thread of execution
391      * that occurs upon a client connection.
392      *
393      */
394     class ClientConnectionRunner extends AbstractLogEnabled
395         implements Poolable, Runnable  {
396 
397         /**
398          * The Socket that this client connection is using for transport.
399          */
400         private Socket clientSocket;
401 
402         /**
403          * The thread of execution associated with this client connection.
404          */
405         private Thread clientSocketThread;
406 
407         /**
408          * Returns string for diagnostic logging
409          */
410         public String toString() {
411             return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
412         }
413 
414         public ClientConnectionRunner() {
415         }
416         
417         /**
418          * The dispose operation that terminates the runner.  Should only be
419          * called by the ServerConnection that owns the ClientConnectionRunner
420          */
421         public void dispose() {
422             synchronized( this ) {
423                 if (null != clientSocketThread) {
424                     // Execution of this block means that the run() method
425                     // hasn't finished yet.  So we interrupt the thread
426                     // to terminate run() and wait for the run() method
427                     // to finish.  The notifyAll() at the end of run() will
428                     // wake this thread and allow dispose() to end.
429                     clientSocketThread.interrupt();
430                     clientSocketThread = null;
431                     try {
432                         wait();
433                     } catch (InterruptedException ie) {
434                         // Expected - return from the method
435                     }
436                 }
437             }
438         }
439 
440         /**
441          * Sets the socket for a ClientConnectionRunner.
442          *
443          * @param socket the client socket associated with this ClientConnectionRunner
444          */
445         public void setSocket(Socket socket) {
446             clientSocket = socket;
447         }
448 
449         /**
450          * Provides the body for the thread of execution dealing with a particular client
451          * connection.  An appropriate ConnectionHandler is created, applied, executed,
452          * and released.
453          */
454         public void run() {
455             ConnectionHandler handler = null;
456             try {
457                 clientSocketThread = Thread.currentThread();
458 
459                 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
460                 String connectionString = null;
461                 if( getLogger().isDebugEnabled() ) {
462                     connectionString = getConnectionString();
463                     String message = "Starting " + connectionString;
464                     getLogger().debug( message );
465                 }
466 
467                 handler.handleConnection(clientSocket);
468 
469                 if( getLogger().isDebugEnabled() ) {
470                     String message = "Ending " + connectionString;
471                     getLogger().debug( message );
472                 }
473 
474             } catch( Throwable e ) {
475                 getLogger().error( "Error handling connection", e );
476             } finally {
477 
478                 // remove this connection from map!
479             connPerIpMap.removeConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
480                 
481                 // Close the underlying socket
482                 try {
483                     if (clientSocket != null) {
484                         clientSocket.close();
485                     }
486                 } catch( IOException ioe ) {
487                     getLogger().warn( "Error shutting down connection", ioe );
488                 }
489 
490                 clientSocket = null;
491 
492                 // Null out the thread, notify other threads to encourage
493                 // a context switch
494                 synchronized( this ) {
495                     clientSocketThread = null;
496 
497                     Thread.interrupted();
498 
499                     // Release the handler and kill the reference to the handler factory
500                     //
501                     // This needs to be done after the clientSocketThread is nulled out,
502                     // otherwise we could trash a reused ClientConnectionRunner
503                     if (handler != null) {
504                         ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
505                         handler = null;
506                     }
507 
508                     // Remove this runner from the list of active connections.
509                     ServerConnection.this.removeClientConnectionRunner(this);
510 
511                     notifyAll();
512                 }
513             }
514         }
515 
516         /**
517          * Helper method to return a formatted string with connection transport information.
518          *
519          * @return a formatted string
520          */
521         private String getConnectionString() {
522             if (clientSocket == null) {
523                 return "invalid socket";
524             }
525             StringBuffer connectionBuffer
526                 = new StringBuffer(256)
527                     .append("connection on ")
528                     .append(clientSocket.getLocalAddress().getHostAddress().toString())
529                     .append(":")
530                     .append(clientSocket.getLocalPort())
531                     .append(" from ")
532                     .append(clientSocket.getInetAddress().getHostAddress().toString())
533                     .append(":")
534                     .append(clientSocket.getPort());
535             return connectionBuffer.toString();
536         }
537     }
538 
539     /**
540      * The factory for producing handlers.
541      */
542     private class ClientConnectionRunnerFactory
543         implements ObjectFactory {
544 
545         /**
546          * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
547          */
548         public Object newInstance() throws Exception {
549             return new ClientConnectionRunner();
550         }
551 
552         /**
553          * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
554          */
555         public Class getCreatedClass() {
556             return ClientConnectionRunner.class;
557         }
558 
559         /**
560          * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommission(Object)
561          */
562         public void decommission( Object object ) throws Exception {
563             return;
564         }
565     }
566 }
567 
568