1 /****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20
21
22 package org.apache.james.util.connection;
23
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 import java.net.ServerSocket;
27 import java.net.Socket;
28 import java.net.SocketException;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31
32 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
33 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
34 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
35 import org.apache.avalon.excalibur.pool.ObjectFactory;
36 import org.apache.avalon.excalibur.pool.Pool;
37 import org.apache.avalon.excalibur.pool.Poolable;
38 import org.apache.excalibur.thread.ThreadPool ;
39 import org.apache.avalon.framework.activity.Initializable;
40 import org.apache.avalon.framework.container.ContainerUtil;
41 import org.apache.avalon.framework.logger.AbstractLogEnabled;
42
43
44 /**
45 * Represents a single server socket managed by a connection manager.
46 * The connection manager will spawn a single ServerConnection for each
47 * server socket that the connection manager is managing.
48 *
49 */
50 public class ServerConnection extends AbstractLogEnabled
51 implements Initializable, Runnable {
52
53 /**
54 * This is a hack to deal with the fact that there appears to be
55 * no platform-independent way to break out of a ServerSocket
56 * accept() call. On some platforms closing either the ServerSocket
57 * itself, or its associated InputStream, causes the accept
58 * method to exit. Unfortunately, this behavior is not consistent
59 * across platforms. The deal with this, we introduce a polling
60 * loop of 20 seconds for the server socket. This introduces a
61 * cost across platforms, but is necessary to maintain cross-platform
62 * functionality.
63 */
64 private static int POLLING_INTERVAL = 20*1000;
65
66 /**
67 * The server socket which this connection is managing
68 */
69 private ServerSocket serverSocket;
70
71 /**
72 * The connection handler factory that generates connection
73 * handlers to manage client connections to this server socket
74 */
75 private ConnectionHandlerFactory handlerFactory;
76
77 /**
78 * The pool that produces ClientConnectionRunners
79 */
80 private Pool runnerPool;
81
82 /**
83 * The factory used to provide ClientConnectionRunner objects
84 */
85 private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
86
87 /**
88 * The thread pool used to spawn individual threads used to manage each
89 * client connection.
90 */
91 private ThreadPool connThreadPool;
92
93 /**
94 * The timeout for client sockets spawned off this connection.
95 */
96 private int socketTimeout;
97
98 /**
99 * The maximum number of open client connections that this server
100 * connection will allow.
101 */
102 private int maxOpenConn;
103
104 /**
105 * The maximum number of open client connections per IP that this server
106 * connection will allow.
107 */
108 private int maxOpenConnPerIP;
109
110 private ConnectionPerIpMap connPerIpMap;
111
112 /**
113 * A collection of client connection runners.
114 */
115 private final ArrayList clientConnectionRunners = new ArrayList();
116
117
118 /**
119 * The thread used to manage this server connection.
120 */
121 private Thread serverConnectionThread;
122
123 /**
124 * The sole constructor for a ServerConnection.
125 *
126 * @param serverSocket the ServerSocket associated with this ServerConnection
127 * @param handlerFactory the factory that generates ConnectionHandlers for the client
128 * connections spawned off this ServerConnection
129 * @param threadPool the ThreadPool used to obtain handler threads
130 * @param timeout the client idle timeout for this ServerConnection's client connections
131 * @param maxOpenConn the maximum number of open client connections allowed for this
132 * ServerConnection
133 * @param maxOpenConnPerIP the maximum number of open client connections allowed for this
134 * ServerConnection per IP
135 */
136 public ServerConnection(ServerSocket serverSocket,
137 ConnectionHandlerFactory handlerFactory,
138 ThreadPool threadPool,
139 int timeout,
140 int maxOpenConn, int maxOpenConnPerIP) {
141 this.serverSocket = serverSocket;
142 this.handlerFactory = handlerFactory;
143 connThreadPool = threadPool;
144 socketTimeout = timeout;
145 this.maxOpenConn = maxOpenConn;
146 this.maxOpenConnPerIP = maxOpenConnPerIP;
147 }
148
149 /**
150 * @see org.apache.avalon.framework.activity.Initializable#initialize()
151 */
152 public void initialize() throws Exception {
153 runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
154 connPerIpMap = new ConnectionPerIpMap();
155 ContainerUtil.enableLogging(runnerPool,getLogger());
156 ContainerUtil.initialize(runnerPool);
157 }
158
159 /**
160 * The dispose operation is called by the owning ConnectionManager
161 * at the end of its lifecycle. Cleans up the server connection, forcing
162 * everything to finish.
163 */
164 public void dispose() {
165 if (getLogger().isDebugEnabled()) {
166 getLogger().debug("Disposing server connection..." + this.toString());
167 }
168 synchronized( this ) {
169 if( null != serverConnectionThread ) {
170 // Execution of this block means that the run() method
171 // hasn't finished yet. So we interrupt the thread
172 // to terminate run() and wait for the run() method
173 // to finish. The notifyAll() at the end of run() will
174 // wake this thread and allow dispose() to end.
175 Thread thread = serverConnectionThread;
176 serverConnectionThread = null;
177 thread.interrupt();
178 try {
179
180 serverSocket.close();
181
182 } catch (IOException ie) {
183 // Ignored - we're doing this to break out of the
184 // accept. This minimizes the time required to
185 // shutdown the server. Unfortunately, this is
186 // not guaranteed to work on all platforms. See
187 // the comments for POLLING_INTERVAL
188 }
189 try {
190 if (POLLING_INTERVAL > 0) {
191 wait(2L*POLLING_INTERVAL);
192 } else {
193 wait();
194 }
195 } catch (InterruptedException ie) {
196 // Expected - just complete dispose()
197 }
198 }
199 ContainerUtil.dispose(runnerPool);
200 runnerPool = null;
201 }
202
203 getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
204
205 synchronized (clientConnectionRunners) {
206 Iterator runnerIterator = clientConnectionRunners.iterator();
207 while( runnerIterator.hasNext() ) {
208 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
209 runner.dispose();
210 runner = null;
211 }
212 clientConnectionRunners.clear();
213 connPerIpMap.clearConnectionPerIP();
214
215 }
216
217 getLogger().debug("Cleaned up clients - " + this.toString());
218
219 }
220
221 /**
222 * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
223 *
224 */
225 private ClientConnectionRunner addClientConnectionRunner()
226 throws Exception {
227 synchronized (clientConnectionRunners) {
228 ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
229 clientConnectionRunners.add(clientConnectionRunner);
230 if (getLogger().isDebugEnabled()) {
231 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
232 }
233 return clientConnectionRunner;
234 }
235 }
236
237 /**
238 * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
239 *
240 * @param clientConnectionRunner the ClientConnectionRunner to be removed
241 */
242 private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
243
244 /*
245 * checking runnerPool avoids 'dead-lock' when service is disposing :
246 * (dispose() calls dispose all runners)
247 * but runner is 'running' and cleans up on exit
248 * this situation will result in a dead-lock on 'clientConnectionRunners'
249 */
250 if( runnerPool == null ) {
251 getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
252 return;
253 }
254
255 synchronized (clientConnectionRunners) {
256 if (clientConnectionRunners.remove(clientConnectionRunner)) {
257 if (getLogger().isDebugEnabled()) {
258 getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
259 }
260 runnerPool.put(clientConnectionRunner);
261 }
262 }
263
264 synchronized (this) { notify(); } // match the wait(...) in the run() inner loop before accept().
265 }
266
267
268 /**
269 * Provides the body for the thread of execution for a ServerConnection.
270 * Connections made to the server socket are passed to an appropriate,
271 * newly created, ClientConnectionRunner
272 */
273 public void run() {
274 serverConnectionThread = Thread.currentThread();
275
276 int ioExceptionCount = 0;
277 try {
278 serverSocket.setSoTimeout(POLLING_INTERVAL);
279 } catch (SocketException se) {
280 // Ignored - for the moment
281 }
282
283 if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
284 StringBuffer debugBuffer =
285 new StringBuffer(128)
286 .append(serverConnectionThread.getName())
287 .append(" is listening on ")
288 .append(serverSocket.toString());
289 getLogger().debug(debugBuffer.toString());
290 }
291 while( !Thread.interrupted() && null != serverConnectionThread ) {
292 try {
293 Socket clientSocket = null;
294 try {
295 while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
296 if (getLogger().isInfoEnabled()) {
297 getLogger().info("Maximum number of open connections (" + clientConnectionRunners.size() + ") in use.");
298 }
299 synchronized (this) { wait(10000); }
300 }
301
302 clientSocket = serverSocket.accept();
303
304 } catch( InterruptedIOException iioe ) {
305 // This exception is expected upon ServerConnection shutdown.
306 // See the POLLING_INTERVAL comment
307 continue;
308 } catch( IOException se ) {
309 if (ioExceptionCount > 0) {
310 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
311 break;
312 } else {
313 continue;
314 }
315 } catch( SecurityException se ) {
316 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
317 break;
318 }
319 ClientConnectionRunner runner = null;
320 synchronized (clientConnectionRunners) {
321 if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
322 if (getLogger().isWarnEnabled()) {
323 getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size());
324 if (getLogger().isInfoEnabled()) {
325 Iterator runnerIterator = clientConnectionRunners.iterator();
326 getLogger().info("Connections: ");
327 while( runnerIterator.hasNext() ) {
328 getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString());
329 }
330 }
331 }
332 try {
333 clientSocket.close();
334 } catch (IOException ignored) {
335 // We ignore this exception, as we already have an error condition.
336 }
337 continue;
338 } else if ((maxOpenConnPerIP > 0) && (connPerIpMap.getConnectionPerIP(clientSocket.getInetAddress().getHostAddress()) >= maxOpenConnPerIP)) {
339 if (getLogger().isWarnEnabled()) {
340 getLogger().warn("Maximum number of open connections per IP exceeded - refusing connection. Current number of connections for " + clientSocket.getInetAddress().getHostAddress() + " is " + connPerIpMap.getConnectionPerIP(clientSocket.getInetAddress().getHostAddress()));
341 }
342 try {
343 clientSocket.close();
344 } catch (IOException ignored) {
345 // We ignore this exception, as we already have an error condition.
346 }
347 continue;
348
349 } else {
350 connPerIpMap.addConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
351 clientSocket.setSoTimeout(socketTimeout);
352 runner = addClientConnectionRunner();
353 runner.setSocket(clientSocket);
354
355 }
356 }
357 setupLogger( runner );
358 try {
359 connThreadPool.execute( runner );
360 } catch (Exception e) {
361 // This error indicates that the underlying thread pool
362 // is out of threads. For robustness, we catch this and
363 // cleanup
364 getLogger().error("Internal error - insufficient threads available to service request. " +
365 Thread.activeCount() + " threads in service request pool.", e);
366 try {
367 connPerIpMap.removeConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
368 clientSocket.close();
369 } catch (IOException ignored) {
370 // We ignore this exception, as we already have an error condition.
371 }
372 // In this case, the thread will not remove the client connection runner,
373 // so we must.
374 removeClientConnectionRunner(runner);
375 }
376 } catch( IOException ioe ) {
377 getLogger().error( "Exception accepting connection", ioe );
378 } catch( Throwable e ) {
379 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
380 }
381 }
382 synchronized( this ) {
383 serverConnectionThread = null;
384 Thread.interrupted();
385 notifyAll();
386 }
387 }
388
389 /**
390 * An inner class to provide the actual body of the thread of execution
391 * that occurs upon a client connection.
392 *
393 */
394 class ClientConnectionRunner extends AbstractLogEnabled
395 implements Poolable, Runnable {
396
397 /**
398 * The Socket that this client connection is using for transport.
399 */
400 private Socket clientSocket;
401
402 /**
403 * The thread of execution associated with this client connection.
404 */
405 private Thread clientSocketThread;
406
407 /**
408 * Returns string for diagnostic logging
409 */
410 public String toString() {
411 return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
412 }
413
414 public ClientConnectionRunner() {
415 }
416
417 /**
418 * The dispose operation that terminates the runner. Should only be
419 * called by the ServerConnection that owns the ClientConnectionRunner
420 */
421 public void dispose() {
422 synchronized( this ) {
423 if (null != clientSocketThread) {
424 // Execution of this block means that the run() method
425 // hasn't finished yet. So we interrupt the thread
426 // to terminate run() and wait for the run() method
427 // to finish. The notifyAll() at the end of run() will
428 // wake this thread and allow dispose() to end.
429 clientSocketThread.interrupt();
430 clientSocketThread = null;
431 try {
432 wait();
433 } catch (InterruptedException ie) {
434 // Expected - return from the method
435 }
436 }
437 }
438 }
439
440 /**
441 * Sets the socket for a ClientConnectionRunner.
442 *
443 * @param socket the client socket associated with this ClientConnectionRunner
444 */
445 public void setSocket(Socket socket) {
446 clientSocket = socket;
447 }
448
449 /**
450 * Provides the body for the thread of execution dealing with a particular client
451 * connection. An appropriate ConnectionHandler is created, applied, executed,
452 * and released.
453 */
454 public void run() {
455 ConnectionHandler handler = null;
456 try {
457 clientSocketThread = Thread.currentThread();
458
459 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
460 String connectionString = null;
461 if( getLogger().isDebugEnabled() ) {
462 connectionString = getConnectionString();
463 String message = "Starting " + connectionString;
464 getLogger().debug( message );
465 }
466
467 handler.handleConnection(clientSocket);
468
469 if( getLogger().isDebugEnabled() ) {
470 String message = "Ending " + connectionString;
471 getLogger().debug( message );
472 }
473
474 } catch( Throwable e ) {
475 getLogger().error( "Error handling connection", e );
476 } finally {
477
478 // remove this connection from map!
479 connPerIpMap.removeConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
480
481 // Close the underlying socket
482 try {
483 if (clientSocket != null) {
484 clientSocket.close();
485 }
486 } catch( IOException ioe ) {
487 getLogger().warn( "Error shutting down connection", ioe );
488 }
489
490 clientSocket = null;
491
492 // Null out the thread, notify other threads to encourage
493 // a context switch
494 synchronized( this ) {
495 clientSocketThread = null;
496
497 Thread.interrupted();
498
499 // Release the handler and kill the reference to the handler factory
500 //
501 // This needs to be done after the clientSocketThread is nulled out,
502 // otherwise we could trash a reused ClientConnectionRunner
503 if (handler != null) {
504 ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
505 handler = null;
506 }
507
508 // Remove this runner from the list of active connections.
509 ServerConnection.this.removeClientConnectionRunner(this);
510
511 notifyAll();
512 }
513 }
514 }
515
516 /**
517 * Helper method to return a formatted string with connection transport information.
518 *
519 * @return a formatted string
520 */
521 private String getConnectionString() {
522 if (clientSocket == null) {
523 return "invalid socket";
524 }
525 StringBuffer connectionBuffer
526 = new StringBuffer(256)
527 .append("connection on ")
528 .append(clientSocket.getLocalAddress().getHostAddress().toString())
529 .append(":")
530 .append(clientSocket.getLocalPort())
531 .append(" from ")
532 .append(clientSocket.getInetAddress().getHostAddress().toString())
533 .append(":")
534 .append(clientSocket.getPort());
535 return connectionBuffer.toString();
536 }
537 }
538
539 /**
540 * The factory for producing handlers.
541 */
542 private class ClientConnectionRunnerFactory
543 implements ObjectFactory {
544
545 /**
546 * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
547 */
548 public Object newInstance() throws Exception {
549 return new ClientConnectionRunner();
550 }
551
552 /**
553 * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
554 */
555 public Class getCreatedClass() {
556 return ClientConnectionRunner.class;
557 }
558
559 /**
560 * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommission(Object)
561 */
562 public void decommission( Object object ) throws Exception {
563 return;
564 }
565 }
566 }
567
568