1 /************************************************************************
2 * Copyright (c) 2000-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.util.connection;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.ServerSocket;
23 import java.net.Socket;
24 import java.net.SocketException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27
28 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
29 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
30 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
31 import org.apache.avalon.excalibur.pool.ObjectFactory;
32 import org.apache.avalon.excalibur.pool.Pool;
33 import org.apache.avalon.excalibur.pool.Poolable;
34 import org.apache.excalibur.thread.ThreadPool ;
35 import org.apache.avalon.framework.activity.Initializable;
36 import org.apache.avalon.framework.container.ContainerUtil;
37 import org.apache.avalon.framework.logger.AbstractLogEnabled;
38
39
40 /***
41 * Represents a single server socket managed by a connection manager.
42 * The connection manager will spawn a single ServerConnection for each
43 * server socket that the connection manager is managing.
44 *
45 */
46 public class ServerConnection extends AbstractLogEnabled
47 implements Initializable, Runnable {
48
49 /***
50 * This is a hack to deal with the fact that there appears to be
51 * no platform-independent way to break out of a ServerSocket
52 * accept() call. On some platforms closing either the ServerSocket
53 * itself, or its associated InputStream, causes the accept
54 * method to exit. Unfortunately, this behavior is not consistent
55 * across platforms. The deal with this, we introduce a polling
56 * loop of 20 seconds for the server socket. This introduces a
57 * cost across platforms, but is necessary to maintain cross-platform
58 * functionality.
59 */
60 private static int POLLING_INTERVAL = 20*1000;
61
62 /***
63 * The server socket which this connection is managing
64 */
65 private ServerSocket serverSocket;
66
67 /***
68 * The connection handler factory that generates connection
69 * handlers to manage client connections to this server socket
70 */
71 private ConnectionHandlerFactory handlerFactory;
72
73 /***
74 * The pool that produces ClientConnectionRunners
75 */
76 private Pool runnerPool;
77
78 /***
79 * The factory used to provide ClientConnectionRunner objects
80 */
81 private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
82
83 /***
84 * The thread pool used to spawn individual threads used to manage each
85 * client connection.
86 */
87 private ThreadPool connThreadPool;
88
89 /***
90 * The timeout for client sockets spawned off this connection.
91 */
92 private int socketTimeout;
93
94 /***
95 * The maximum number of open client connections that this server
96 * connection will allow.
97 */
98 private int maxOpenConn;
99
100 /***
101 * A collection of client connection runners.
102 */
103 private final ArrayList clientConnectionRunners = new ArrayList();
104
105 /***
106 * The thread used to manage this server connection.
107 */
108 private Thread serverConnectionThread;
109
110 /***
111 * The sole constructor for a ServerConnection.
112 *
113 * @param serverSocket the ServerSocket associated with this ServerConnection
114 * @param handlerFactory the factory that generates ConnectionHandlers for the client
115 * connections spawned off this ServerConnection
116 * @param threadPool the ThreadPool used to obtain handler threads
117 * @param timeout the client idle timeout for this ServerConnection's client connections
118 * @param maxOpenConn the maximum number of open client connections allowed for this
119 * ServerConnection
120 */
121 public ServerConnection(ServerSocket serverSocket,
122 ConnectionHandlerFactory handlerFactory,
123 ThreadPool threadPool,
124 int timeout,
125 int maxOpenConn) {
126 this.serverSocket = serverSocket;
127 this.handlerFactory = handlerFactory;
128 connThreadPool = threadPool;
129 socketTimeout = timeout;
130 this.maxOpenConn = maxOpenConn;
131 }
132
133 /***
134 * @see org.apache.avalon.framework.activity.Initializable#initialize()
135 */
136 public void initialize() throws Exception {
137 runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
138 ContainerUtil.enableLogging(runnerPool,getLogger());
139 ContainerUtil.initialize(runnerPool);
140 }
141
142 /***
143 * The dispose operation is called by the owning ConnectionManager
144 * at the end of its lifecycle. Cleans up the server connection, forcing
145 * everything to finish.
146 */
147 public void dispose() {
148 if (getLogger().isDebugEnabled()) {
149 getLogger().debug("Disposing server connection..." + this.toString());
150 }
151 synchronized( this ) {
152 if( null != serverConnectionThread ) {
153
154
155
156
157
158 Thread thread = serverConnectionThread;
159 serverConnectionThread = null;
160 thread.interrupt();
161 try {
162 serverSocket.close();
163 } catch (IOException ie) {
164
165
166
167
168
169 }
170 try {
171 if (POLLING_INTERVAL > 0) {
172 wait(2L*POLLING_INTERVAL);
173 } else {
174 wait();
175 }
176 } catch (InterruptedException ie) {
177
178 }
179 }
180 ContainerUtil.dispose(runnerPool);
181 runnerPool = null;
182 }
183
184 getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
185
186 synchronized (clientConnectionRunners) {
187 Iterator runnerIterator = clientConnectionRunners.iterator();
188 while( runnerIterator.hasNext() ) {
189 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
190 runner.dispose();
191 runner = null;
192 }
193 clientConnectionRunners.clear();
194 }
195
196 getLogger().debug("Cleaned up clients - " + this.toString());
197
198 }
199
200 /***
201 * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
202 *
203 * @param clientConnectionRunner the ClientConnectionRunner to be added
204 */
205 private ClientConnectionRunner addClientConnectionRunner()
206 throws Exception {
207 synchronized (clientConnectionRunners) {
208 ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
209 clientConnectionRunners.add(clientConnectionRunner);
210 if (getLogger().isDebugEnabled()) {
211 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
212 }
213 return clientConnectionRunner;
214 }
215 }
216
217 /***
218 * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
219 *
220 * @param clientConnectionRunner the ClientConnectionRunner to be removed
221 */
222 private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
223
224
225
226
227
228
229
230 if( runnerPool == null ) {
231 getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
232 return;
233 }
234
235 synchronized (clientConnectionRunners) {
236 if (clientConnectionRunners.remove(clientConnectionRunner)) {
237 if (getLogger().isDebugEnabled()) {
238 getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
239 }
240 runnerPool.put(clientConnectionRunner);
241 }
242 }
243 }
244
245 /***
246 * Provides the body for the thread of execution for a ServerConnection.
247 * Connections made to the server socket are passed to an appropriate,
248 * newly created, ClientConnectionRunner
249 */
250 public void run() {
251 serverConnectionThread = Thread.currentThread();
252
253 int ioExceptionCount = 0;
254 try {
255 serverSocket.setSoTimeout(POLLING_INTERVAL);
256 } catch (SocketException se) {
257
258 }
259
260 if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
261 StringBuffer debugBuffer =
262 new StringBuffer(128)
263 .append(serverConnectionThread.getName())
264 .append(" is listening on ")
265 .append(serverSocket.toString());
266 getLogger().debug(debugBuffer.toString());
267 }
268 while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
269 try {
270 Socket clientSocket = null;
271 try {
272 clientSocket = serverSocket.accept();
273 } catch( InterruptedIOException iioe ) {
274
275
276 continue;
277 } catch( IOException se ) {
278 if (ioExceptionCount > 0) {
279 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
280 break;
281 } else {
282 continue;
283 }
284 } catch( SecurityException se ) {
285 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
286 break;
287 }
288 ClientConnectionRunner runner = null;
289 synchronized (clientConnectionRunners) {
290 if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
291 if (getLogger().isWarnEnabled()) {
292 getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size());
293 if (getLogger().isWarnEnabled()) {
294 Iterator runnerIterator = clientConnectionRunners.iterator();
295 getLogger().info("Connections: ");
296 while( runnerIterator.hasNext() ) {
297 getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString());
298 }
299 }
300 }
301 try {
302 clientSocket.close();
303 } catch (IOException ignored) {
304
305 }
306 continue;
307 } else {
308 clientSocket.setSoTimeout(socketTimeout);
309 runner = addClientConnectionRunner();
310 runner.setSocket(clientSocket);
311 }
312 }
313 setupLogger( runner );
314 try {
315 connThreadPool.execute( runner );
316 } catch (Exception e) {
317
318
319
320 getLogger().error("Internal error - insufficient threads available to service request. " +
321 Thread.activeCount() + " threads in service request pool.", e);
322 try {
323 clientSocket.close();
324 } catch (IOException ignored) {
325
326 }
327
328
329 removeClientConnectionRunner(runner);
330 }
331 } catch( IOException ioe ) {
332 getLogger().error( "Exception accepting connection", ioe );
333 } catch( Throwable e ) {
334 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
335 }
336 }
337 synchronized( this ) {
338 serverConnectionThread = null;
339 Thread.currentThread().interrupted();
340 notifyAll();
341 }
342 }
343
344 /***
345 * An inner class to provide the actual body of the thread of execution
346 * that occurs upon a client connection.
347 *
348 */
349 class ClientConnectionRunner extends AbstractLogEnabled
350 implements Poolable, Runnable {
351
352 /***
353 * The Socket that this client connection is using for transport.
354 */
355 private Socket clientSocket;
356
357 /***
358 * The thread of execution associated with this client connection.
359 */
360 private Thread clientSocketThread;
361
362 /***
363 * Returns string for diagnostic logging
364 */
365 public String toString() {
366 return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
367 }
368
369 public ClientConnectionRunner() {
370 }
371
372 /***
373 * The dispose operation that terminates the runner. Should only be
374 * called by the ServerConnection that owns the ClientConnectionRunner
375 */
376 public void dispose() {
377 synchronized( this ) {
378 if (null != clientSocketThread) {
379
380
381
382
383
384 clientSocketThread.interrupt();
385 clientSocketThread = null;
386 try {
387 wait();
388 } catch (InterruptedException ie) {
389
390 }
391 }
392 }
393 }
394
395 /***
396 * Sets the socket for a ClientConnectionRunner.
397 *
398 * @param socket the client socket associated with this ClientConnectionRunner
399 */
400 public void setSocket(Socket socket) {
401 clientSocket = socket;
402 }
403
404 /***
405 * Provides the body for the thread of execution dealing with a particular client
406 * connection. An appropriate ConnectionHandler is created, applied, executed,
407 * and released.
408 */
409 public void run() {
410 ConnectionHandler handler = null;
411 try {
412 clientSocketThread = Thread.currentThread();
413
414 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
415 String connectionString = null;
416 if( getLogger().isDebugEnabled() ) {
417 connectionString = getConnectionString();
418 String message = "Starting " + connectionString;
419 getLogger().debug( message );
420 }
421
422 handler.handleConnection(clientSocket);
423
424 if( getLogger().isDebugEnabled() ) {
425 String message = "Ending " + connectionString;
426 getLogger().debug( message );
427 }
428
429 } catch( Throwable e ) {
430 getLogger().error( "Error handling connection", e );
431 } finally {
432
433
434 try {
435 if (clientSocket != null) {
436 clientSocket.close();
437 }
438 } catch( IOException ioe ) {
439 getLogger().warn( "Error shutting down connection", ioe );
440 }
441
442 clientSocket = null;
443
444
445
446 synchronized( this ) {
447 clientSocketThread = null;
448
449 Thread.currentThread().interrupted();
450
451
452
453
454
455 if (handler != null) {
456 ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
457 handler = null;
458 }
459
460
461 ServerConnection.this.removeClientConnectionRunner(this);
462
463 notifyAll();
464 }
465 }
466 }
467
468 /***
469 * Helper method to return a formatted string with connection transport information.
470 *
471 * @return a formatted string
472 */
473 private String getConnectionString() {
474 if (clientSocket == null) {
475 return "invalid socket";
476 }
477 StringBuffer connectionBuffer
478 = new StringBuffer(256)
479 .append("connection on ")
480 .append(clientSocket.getLocalAddress().getHostAddress().toString())
481 .append(":")
482 .append(clientSocket.getLocalPort())
483 .append(" from ")
484 .append(clientSocket.getInetAddress().getHostAddress().toString())
485 .append(":")
486 .append(clientSocket.getPort());
487 return connectionBuffer.toString();
488 }
489 }
490
491 /***
492 * The factory for producing handlers.
493 */
494 private class ClientConnectionRunnerFactory
495 implements ObjectFactory {
496
497 /***
498 * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
499 */
500 public Object newInstance() throws Exception {
501 return new ClientConnectionRunner();
502 }
503
504 /***
505 * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
506 */
507 public Class getCreatedClass() {
508 return ClientConnectionRunner.class;
509 }
510
511 /***
512 * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
513 */
514 public void decommission( Object object ) throws Exception {
515 return;
516 }
517 }
518 }
519
520