1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.util.connection;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Iterator;
29
30 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
31 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
32 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
33 import org.apache.avalon.excalibur.pool.ObjectFactory;
34 import org.apache.avalon.excalibur.pool.Pool;
35 import org.apache.avalon.excalibur.pool.Poolable;
36 import org.apache.excalibur.thread.ThreadPool ;
37 import org.apache.avalon.framework.activity.Initializable;
38 import org.apache.avalon.framework.container.ContainerUtil;
39 import org.apache.avalon.framework.logger.AbstractLogEnabled;
40
41
42 /***
43 * Represents a single server socket managed by a connection manager.
44 * The connection manager will spawn a single ServerConnection for each
45 * server socket that the connection manager is managing.
46 *
47 */
48 public class ServerConnection extends AbstractLogEnabled
49 implements Initializable, Runnable {
50
51 /***
52 * This is a hack to deal with the fact that there appears to be
53 * no platform-independent way to break out of a ServerSocket
54 * accept() call. On some platforms closing either the ServerSocket
55 * itself, or its associated InputStream, causes the accept
56 * method to exit. Unfortunately, this behavior is not consistent
57 * across platforms. The deal with this, we introduce a polling
58 * loop of 20 seconds for the server socket. This introduces a
59 * cost across platforms, but is necessary to maintain cross-platform
60 * functionality.
61 */
62 private static int POLLING_INTERVAL = 20*1000;
63
64 /***
65 * The server socket which this connection is managing
66 */
67 private ServerSocket serverSocket;
68
69 /***
70 * The connection handler factory that generates connection
71 * handlers to manage client connections to this server socket
72 */
73 private ConnectionHandlerFactory handlerFactory;
74
75 /***
76 * The pool that produces ClientConnectionRunners
77 */
78 private Pool runnerPool;
79
80 /***
81 * The factory used to provide ClientConnectionRunner objects
82 */
83 private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
84
85 /***
86 * The thread pool used to spawn individual threads used to manage each
87 * client connection.
88 */
89 private ThreadPool connThreadPool;
90
91 /***
92 * The timeout for client sockets spawned off this connection.
93 */
94 private int socketTimeout;
95
96 /***
97 * The maximum number of open client connections that this server
98 * connection will allow.
99 */
100 private int maxOpenConn;
101
102 /***
103 * A collection of client connection runners.
104 */
105 private final ArrayList clientConnectionRunners = new ArrayList();
106
107 /***
108 * The thread used to manage this server connection.
109 */
110 private Thread serverConnectionThread;
111
112 /***
113 * The sole constructor for a ServerConnection.
114 *
115 * @param serverSocket the ServerSocket associated with this ServerConnection
116 * @param handlerFactory the factory that generates ConnectionHandlers for the client
117 * connections spawned off this ServerConnection
118 * @param threadPool the ThreadPool used to obtain handler threads
119 * @param timeout the client idle timeout for this ServerConnection's client connections
120 * @param maxOpenConn the maximum number of open client connections allowed for this
121 * ServerConnection
122 */
123 public ServerConnection(ServerSocket serverSocket,
124 ConnectionHandlerFactory handlerFactory,
125 ThreadPool threadPool,
126 int timeout,
127 int maxOpenConn) {
128 this.serverSocket = serverSocket;
129 this.handlerFactory = handlerFactory;
130 connThreadPool = threadPool;
131 socketTimeout = timeout;
132 this.maxOpenConn = maxOpenConn;
133 }
134
135 /***
136 * @see org.apache.avalon.framework.activity.Initializable#initialize()
137 */
138 public void initialize() throws Exception {
139 runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
140 ContainerUtil.enableLogging(runnerPool,getLogger());
141 ContainerUtil.initialize(runnerPool);
142 }
143
144 /***
145 * The dispose operation is called by the owning ConnectionManager
146 * at the end of its lifecycle. Cleans up the server connection, forcing
147 * everything to finish.
148 */
149 public void dispose() {
150 if (getLogger().isDebugEnabled()) {
151 getLogger().debug("Disposing server connection..." + this.toString());
152 }
153 synchronized( this ) {
154 if( null != serverConnectionThread ) {
155
156
157
158
159
160 Thread thread = serverConnectionThread;
161 serverConnectionThread = null;
162 thread.interrupt();
163 try {
164 serverSocket.close();
165 } catch (IOException ie) {
166
167
168
169
170
171 }
172 try {
173 if (POLLING_INTERVAL > 0) {
174 wait(2L*POLLING_INTERVAL);
175 } else {
176 wait();
177 }
178 } catch (InterruptedException ie) {
179
180 }
181 }
182 ContainerUtil.dispose(runnerPool);
183 runnerPool = null;
184 }
185
186 getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
187
188 synchronized (clientConnectionRunners) {
189 Iterator runnerIterator = clientConnectionRunners.iterator();
190 while( runnerIterator.hasNext() ) {
191 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
192 runner.dispose();
193 runner = null;
194 }
195 clientConnectionRunners.clear();
196 }
197
198 getLogger().debug("Cleaned up clients - " + this.toString());
199
200 }
201
202 /***
203 * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
204 *
205 * @param clientConnectionRunner the ClientConnectionRunner to be added
206 */
207 private ClientConnectionRunner addClientConnectionRunner()
208 throws Exception {
209 synchronized (clientConnectionRunners) {
210 ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
211 clientConnectionRunners.add(clientConnectionRunner);
212 if (getLogger().isDebugEnabled()) {
213 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
214 }
215 return clientConnectionRunner;
216 }
217 }
218
219 /***
220 * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
221 *
222 * @param clientConnectionRunner the ClientConnectionRunner to be removed
223 */
224 private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
225
226
227
228
229
230
231
232 if( runnerPool == null ) {
233 getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
234 return;
235 }
236
237 synchronized (clientConnectionRunners) {
238 if (clientConnectionRunners.remove(clientConnectionRunner)) {
239 if (getLogger().isDebugEnabled()) {
240 getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
241 }
242 runnerPool.put(clientConnectionRunner);
243 }
244 }
245
246 synchronized (this) { notify(); }
247 }
248
249 /***
250 * Provides the body for the thread of execution for a ServerConnection.
251 * Connections made to the server socket are passed to an appropriate,
252 * newly created, ClientConnectionRunner
253 */
254 public void run() {
255 serverConnectionThread = Thread.currentThread();
256
257 int ioExceptionCount = 0;
258 try {
259 serverSocket.setSoTimeout(POLLING_INTERVAL);
260 } catch (SocketException se) {
261
262 }
263
264 if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
265 StringBuffer debugBuffer =
266 new StringBuffer(128)
267 .append(serverConnectionThread.getName())
268 .append(" is listening on ")
269 .append(serverSocket.toString());
270 getLogger().debug(debugBuffer.toString());
271 }
272 while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
273 try {
274 Socket clientSocket = null;
275 try {
276 while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
277 getLogger().warn("Maximum number of open connections (" + clientConnectionRunners.size() + ") in use.");
278 synchronized (this) { wait(10000); }
279 }
280
281 clientSocket = serverSocket.accept();
282
283 } catch( InterruptedIOException iioe ) {
284
285
286 continue;
287 } catch( IOException se ) {
288 if (ioExceptionCount > 0) {
289 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
290 break;
291 } else {
292 continue;
293 }
294 } catch( SecurityException se ) {
295 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
296 break;
297 }
298 ClientConnectionRunner runner = null;
299 synchronized (clientConnectionRunners) {
300 if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
301 if (getLogger().isWarnEnabled()) {
302 getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size());
303 if (getLogger().isWarnEnabled()) {
304 Iterator runnerIterator = clientConnectionRunners.iterator();
305 getLogger().info("Connections: ");
306 while( runnerIterator.hasNext() ) {
307 getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString());
308 }
309 }
310 }
311 try {
312 clientSocket.close();
313 } catch (IOException ignored) {
314
315 }
316 continue;
317 } else {
318 clientSocket.setSoTimeout(socketTimeout);
319 runner = addClientConnectionRunner();
320 runner.setSocket(clientSocket);
321 }
322 }
323 setupLogger( runner );
324 try {
325 connThreadPool.execute( runner );
326 } catch (Exception e) {
327
328
329
330 getLogger().error("Internal error - insufficient threads available to service request. " +
331 Thread.activeCount() + " threads in service request pool.", e);
332 try {
333 clientSocket.close();
334 } catch (IOException ignored) {
335
336 }
337
338
339 removeClientConnectionRunner(runner);
340 }
341 } catch( IOException ioe ) {
342 getLogger().error( "Exception accepting connection", ioe );
343 } catch( Throwable e ) {
344 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
345 }
346 }
347 synchronized( this ) {
348 serverConnectionThread = null;
349 Thread.currentThread().interrupted();
350 notifyAll();
351 }
352 }
353
354 /***
355 * An inner class to provide the actual body of the thread of execution
356 * that occurs upon a client connection.
357 *
358 */
359 class ClientConnectionRunner extends AbstractLogEnabled
360 implements Poolable, Runnable {
361
362 /***
363 * The Socket that this client connection is using for transport.
364 */
365 private Socket clientSocket;
366
367 /***
368 * The thread of execution associated with this client connection.
369 */
370 private Thread clientSocketThread;
371
372 /***
373 * Returns string for diagnostic logging
374 */
375 public String toString() {
376 return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
377 }
378
379 public ClientConnectionRunner() {
380 }
381
382 /***
383 * The dispose operation that terminates the runner. Should only be
384 * called by the ServerConnection that owns the ClientConnectionRunner
385 */
386 public void dispose() {
387 synchronized( this ) {
388 if (null != clientSocketThread) {
389
390
391
392
393
394 clientSocketThread.interrupt();
395 clientSocketThread = null;
396 try {
397 wait();
398 } catch (InterruptedException ie) {
399
400 }
401 }
402 }
403 }
404
405 /***
406 * Sets the socket for a ClientConnectionRunner.
407 *
408 * @param socket the client socket associated with this ClientConnectionRunner
409 */
410 public void setSocket(Socket socket) {
411 clientSocket = socket;
412 }
413
414 /***
415 * Provides the body for the thread of execution dealing with a particular client
416 * connection. An appropriate ConnectionHandler is created, applied, executed,
417 * and released.
418 */
419 public void run() {
420 ConnectionHandler handler = null;
421 try {
422 clientSocketThread = Thread.currentThread();
423
424 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
425 String connectionString = null;
426 if( getLogger().isDebugEnabled() ) {
427 connectionString = getConnectionString();
428 String message = "Starting " + connectionString;
429 getLogger().debug( message );
430 }
431
432 handler.handleConnection(clientSocket);
433
434 if( getLogger().isDebugEnabled() ) {
435 String message = "Ending " + connectionString;
436 getLogger().debug( message );
437 }
438
439 } catch( Throwable e ) {
440 getLogger().error( "Error handling connection", e );
441 } finally {
442
443
444 try {
445 if (clientSocket != null) {
446 clientSocket.close();
447 }
448 } catch( IOException ioe ) {
449 getLogger().warn( "Error shutting down connection", ioe );
450 }
451
452 clientSocket = null;
453
454
455
456 synchronized( this ) {
457 clientSocketThread = null;
458
459 Thread.currentThread().interrupted();
460
461
462
463
464
465 if (handler != null) {
466 ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
467 handler = null;
468 }
469
470
471 ServerConnection.this.removeClientConnectionRunner(this);
472
473 notifyAll();
474 }
475 }
476 }
477
478 /***
479 * Helper method to return a formatted string with connection transport information.
480 *
481 * @return a formatted string
482 */
483 private String getConnectionString() {
484 if (clientSocket == null) {
485 return "invalid socket";
486 }
487 StringBuffer connectionBuffer
488 = new StringBuffer(256)
489 .append("connection on ")
490 .append(clientSocket.getLocalAddress().getHostAddress().toString())
491 .append(":")
492 .append(clientSocket.getLocalPort())
493 .append(" from ")
494 .append(clientSocket.getInetAddress().getHostAddress().toString())
495 .append(":")
496 .append(clientSocket.getPort());
497 return connectionBuffer.toString();
498 }
499 }
500
501 /***
502 * The factory for producing handlers.
503 */
504 private class ClientConnectionRunnerFactory
505 implements ObjectFactory {
506
507 /***
508 * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
509 */
510 public Object newInstance() throws Exception {
511 return new ClientConnectionRunner();
512 }
513
514 /***
515 * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
516 */
517 public Class getCreatedClass() {
518 return ClientConnectionRunner.class;
519 }
520
521 /***
522 * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
523 */
524 public void decommission( Object object ) throws Exception {
525 return;
526 }
527 }
528 }
529
530