1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.util.connection;
23
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 import java.net.ServerSocket;
27 import java.net.Socket;
28 import java.net.SocketException;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31
32 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
33 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
34 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
35 import org.apache.avalon.excalibur.pool.ObjectFactory;
36 import org.apache.avalon.excalibur.pool.Pool;
37 import org.apache.avalon.excalibur.pool.Poolable;
38 import org.apache.excalibur.thread.ThreadPool ;
39 import org.apache.avalon.framework.activity.Initializable;
40 import org.apache.avalon.framework.container.ContainerUtil;
41 import org.apache.avalon.framework.logger.AbstractLogEnabled;
42
43
44
45
46
47
48
49
50 public class ServerConnection extends AbstractLogEnabled
51 implements Initializable, Runnable {
52
53
54
55
56
57
58
59
60
61
62
63
64 private static int POLLING_INTERVAL = 20*1000;
65
66
67
68
69 private ServerSocket serverSocket;
70
71
72
73
74
75 private ConnectionHandlerFactory handlerFactory;
76
77
78
79
80 private Pool runnerPool;
81
82
83
84
85 private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
86
87
88
89
90
91 private ThreadPool connThreadPool;
92
93
94
95
96 private int socketTimeout;
97
98
99
100
101
102 private int maxOpenConn;
103
104
105
106
107
108 private int maxOpenConnPerIP;
109
110 private ConnectionPerIpMap connPerIpMap;
111
112
113
114
115 private final ArrayList clientConnectionRunners = new ArrayList();
116
117
118
119
120
121 private Thread serverConnectionThread;
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 public ServerConnection(ServerSocket serverSocket,
137 ConnectionHandlerFactory handlerFactory,
138 ThreadPool threadPool,
139 int timeout,
140 int maxOpenConn, int maxOpenConnPerIP) {
141 this.serverSocket = serverSocket;
142 this.handlerFactory = handlerFactory;
143 connThreadPool = threadPool;
144 socketTimeout = timeout;
145 this.maxOpenConn = maxOpenConn;
146 this.maxOpenConnPerIP = maxOpenConnPerIP;
147 }
148
149
150
151
152 public void initialize() throws Exception {
153 runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
154 connPerIpMap = new ConnectionPerIpMap();
155 ContainerUtil.enableLogging(runnerPool,getLogger());
156 ContainerUtil.initialize(runnerPool);
157 }
158
159
160
161
162
163
164 public void dispose() {
165 if (getLogger().isDebugEnabled()) {
166 getLogger().debug("Disposing server connection..." + this.toString());
167 }
168 synchronized( this ) {
169 if( null != serverConnectionThread ) {
170
171
172
173
174
175 Thread thread = serverConnectionThread;
176 serverConnectionThread = null;
177 thread.interrupt();
178 try {
179
180 serverSocket.close();
181
182 } catch (IOException ie) {
183
184
185
186
187
188 }
189 try {
190 if (POLLING_INTERVAL > 0) {
191 wait(2L*POLLING_INTERVAL);
192 } else {
193 wait();
194 }
195 } catch (InterruptedException ie) {
196
197 }
198 }
199 ContainerUtil.dispose(runnerPool);
200 runnerPool = null;
201 }
202
203 getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
204
205 synchronized (clientConnectionRunners) {
206 Iterator runnerIterator = clientConnectionRunners.iterator();
207 while( runnerIterator.hasNext() ) {
208 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
209 runner.dispose();
210 runner = null;
211 }
212 clientConnectionRunners.clear();
213 connPerIpMap.clearConnectionPerIP();
214
215 }
216
217 getLogger().debug("Cleaned up clients - " + this.toString());
218
219 }
220
221
222
223
224
225 private ClientConnectionRunner addClientConnectionRunner()
226 throws Exception {
227 synchronized (clientConnectionRunners) {
228 ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
229 clientConnectionRunners.add(clientConnectionRunner);
230 if (getLogger().isDebugEnabled()) {
231 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
232 }
233 return clientConnectionRunner;
234 }
235 }
236
237
238
239
240
241
242 private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
243
244
245
246
247
248
249
250 if( runnerPool == null ) {
251 getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
252 return;
253 }
254
255 synchronized (clientConnectionRunners) {
256 if (clientConnectionRunners.remove(clientConnectionRunner)) {
257 if (getLogger().isDebugEnabled()) {
258 getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
259 }
260 runnerPool.put(clientConnectionRunner);
261 }
262 }
263
264 synchronized (this) { notify(); }
265 }
266
267
268
269
270
271
272
273 public void run() {
274 serverConnectionThread = Thread.currentThread();
275
276 int ioExceptionCount = 0;
277 try {
278 serverSocket.setSoTimeout(POLLING_INTERVAL);
279 } catch (SocketException se) {
280
281 }
282
283 if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
284 StringBuffer debugBuffer =
285 new StringBuffer(128)
286 .append(serverConnectionThread.getName())
287 .append(" is listening on ")
288 .append(serverSocket.toString());
289 getLogger().debug(debugBuffer.toString());
290 }
291 while( !Thread.interrupted() && null != serverConnectionThread ) {
292 try {
293 Socket clientSocket = null;
294 try {
295 while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
296 if (getLogger().isInfoEnabled()) {
297 getLogger().info("Maximum number of open connections (" + clientConnectionRunners.size() + ") in use.");
298 }
299 synchronized (this) { wait(10000); }
300 }
301
302 clientSocket = serverSocket.accept();
303
304 } catch( InterruptedIOException iioe ) {
305
306
307 continue;
308 } catch( IOException se ) {
309 if (ioExceptionCount > 0) {
310 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
311 break;
312 } else {
313 continue;
314 }
315 } catch( SecurityException se ) {
316 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
317 break;
318 }
319 ClientConnectionRunner runner = null;
320 synchronized (clientConnectionRunners) {
321 if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
322 if (getLogger().isWarnEnabled()) {
323 getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size());
324 if (getLogger().isInfoEnabled()) {
325 Iterator runnerIterator = clientConnectionRunners.iterator();
326 getLogger().info("Connections: ");
327 while( runnerIterator.hasNext() ) {
328 getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString());
329 }
330 }
331 }
332 try {
333 clientSocket.close();
334 } catch (IOException ignored) {
335
336 }
337 continue;
338 } else if ((maxOpenConnPerIP > 0) && (connPerIpMap.getConnectionPerIP(clientSocket.getInetAddress().getHostAddress()) >= maxOpenConnPerIP)) {
339 if (getLogger().isWarnEnabled()) {
340 getLogger().warn("Maximum number of open connections per IP exceeded - refusing connection. Current number of connections for " + clientSocket.getInetAddress().getHostAddress() + " is " + connPerIpMap.getConnectionPerIP(clientSocket.getInetAddress().getHostAddress()));
341 }
342 try {
343 clientSocket.close();
344 } catch (IOException ignored) {
345
346 }
347 continue;
348
349 } else {
350 connPerIpMap.addConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
351 clientSocket.setSoTimeout(socketTimeout);
352 runner = addClientConnectionRunner();
353 runner.setSocket(clientSocket);
354
355 }
356 }
357 setupLogger( runner );
358 try {
359 connThreadPool.execute( runner );
360 } catch (Exception e) {
361
362
363
364 getLogger().error("Internal error - insufficient threads available to service request. " +
365 Thread.activeCount() + " threads in service request pool.", e);
366 try {
367 connPerIpMap.removeConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
368 clientSocket.close();
369 } catch (IOException ignored) {
370
371 }
372
373
374 removeClientConnectionRunner(runner);
375 }
376 } catch( IOException ioe ) {
377 getLogger().error( "Exception accepting connection", ioe );
378 } catch( Throwable e ) {
379 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
380 }
381 }
382 synchronized( this ) {
383 serverConnectionThread = null;
384 Thread.interrupted();
385 notifyAll();
386 }
387 }
388
389
390
391
392
393
394 class ClientConnectionRunner extends AbstractLogEnabled
395 implements Poolable, Runnable {
396
397
398
399
400 private Socket clientSocket;
401
402
403
404
405 private Thread clientSocketThread;
406
407
408
409
410 public String toString() {
411 return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
412 }
413
414 public ClientConnectionRunner() {
415 }
416
417
418
419
420
421 public void dispose() {
422 synchronized( this ) {
423 if (null != clientSocketThread) {
424
425
426
427
428
429 clientSocketThread.interrupt();
430 clientSocketThread = null;
431 try {
432 wait();
433 } catch (InterruptedException ie) {
434
435 }
436 }
437 }
438 }
439
440
441
442
443
444
445 public void setSocket(Socket socket) {
446 clientSocket = socket;
447 }
448
449
450
451
452
453
454 public void run() {
455 ConnectionHandler handler = null;
456 try {
457 clientSocketThread = Thread.currentThread();
458
459 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
460 String connectionString = null;
461 if( getLogger().isDebugEnabled() ) {
462 connectionString = getConnectionString();
463 String message = "Starting " + connectionString;
464 getLogger().debug( message );
465 }
466
467 handler.handleConnection(clientSocket);
468
469 if( getLogger().isDebugEnabled() ) {
470 String message = "Ending " + connectionString;
471 getLogger().debug( message );
472 }
473
474 } catch( Throwable e ) {
475 getLogger().error( "Error handling connection", e );
476 } finally {
477
478
479 connPerIpMap.removeConnectionPerIP(clientSocket.getInetAddress().getHostAddress());
480
481
482 try {
483 if (clientSocket != null) {
484 clientSocket.close();
485 }
486 } catch( IOException ioe ) {
487 getLogger().warn( "Error shutting down connection", ioe );
488 }
489
490 clientSocket = null;
491
492
493
494 synchronized( this ) {
495 clientSocketThread = null;
496
497 Thread.interrupted();
498
499
500
501
502
503 if (handler != null) {
504 ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
505 handler = null;
506 }
507
508
509 ServerConnection.this.removeClientConnectionRunner(this);
510
511 notifyAll();
512 }
513 }
514 }
515
516
517
518
519
520
521 private String getConnectionString() {
522 if (clientSocket == null) {
523 return "invalid socket";
524 }
525 StringBuffer connectionBuffer
526 = new StringBuffer(256)
527 .append("connection on ")
528 .append(clientSocket.getLocalAddress().getHostAddress().toString())
529 .append(":")
530 .append(clientSocket.getLocalPort())
531 .append(" from ")
532 .append(clientSocket.getInetAddress().getHostAddress().toString())
533 .append(":")
534 .append(clientSocket.getPort());
535 return connectionBuffer.toString();
536 }
537 }
538
539
540
541
542 private class ClientConnectionRunnerFactory
543 implements ObjectFactory {
544
545
546
547
548 public Object newInstance() throws Exception {
549 return new ClientConnectionRunner();
550 }
551
552
553
554
555 public Class getCreatedClass() {
556 return ClientConnectionRunner.class;
557 }
558
559
560
561
562 public void decommission( Object object ) throws Exception {
563 return;
564 }
565 }
566 }
567
568