1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.socket;
23
24 import org.apache.avalon.cornerstone.services.connection.AbstractHandlerFactory;
25 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
26 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
27 import org.apache.avalon.cornerstone.services.sockets.ServerSocketFactory;
28 import org.apache.avalon.cornerstone.services.sockets.SocketManager;
29 import org.apache.avalon.cornerstone.services.threads.ThreadManager;
30 import org.apache.avalon.excalibur.pool.DefaultPool;
31 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
32 import org.apache.avalon.excalibur.pool.ObjectFactory;
33 import org.apache.avalon.excalibur.pool.Pool;
34 import org.apache.avalon.excalibur.pool.Poolable;
35 import org.apache.avalon.framework.activity.Disposable;
36 import org.apache.avalon.framework.activity.Initializable;
37 import org.apache.avalon.framework.configuration.Configurable;
38 import org.apache.avalon.framework.configuration.Configuration;
39 import org.apache.avalon.framework.configuration.ConfigurationException;
40 import org.apache.avalon.framework.container.ContainerUtil;
41 import org.apache.avalon.framework.logger.Logger;
42 import org.apache.avalon.framework.service.ServiceException;
43 import org.apache.avalon.framework.service.ServiceManager;
44 import org.apache.avalon.framework.service.Serviceable;
45 import org.apache.excalibur.thread.ThreadPool;
46 import org.apache.james.api.dnsservice.DNSService;
47 import org.apache.james.util.watchdog.ThreadPerWatchdogFactory;
48 import org.apache.james.util.watchdog.Watchdog;
49 import org.apache.james.util.watchdog.WatchdogFactory;
50
51 import java.net.BindException;
52 import java.net.InetAddress;
53 import java.net.ServerSocket;
54 import java.net.UnknownHostException;
55 import java.security.Provider;
56 import java.security.Security;
57 import java.util.concurrent.atomic.AtomicLong;
58
59
60
61
62
63
64 public abstract class AbstractJamesService extends AbstractHandlerFactory
65 implements Serviceable, Configurable, Disposable, Initializable, ConnectionHandlerFactory, ObjectFactory {
66
67
68
69
70 protected static final int DEFAULT_TIMEOUT = 5* 60 * 1000;
71
72
73
74
75 protected static final String TIMEOUT_NAME = "connectiontimeout";
76
77
78
79
80 protected static final int DEFAULT_BACKLOG = 5;
81
82
83
84
85 protected static final String BACKLOG_NAME = "connectionBacklog";
86
87
88
89
90 public static final String HELLO_NAME = "helloName";
91
92
93
94
95 private JamesConnectionManager connectionManager;
96
97
98
99
100
101 protected String threadGroup;
102
103
104
105
106
107 protected ThreadPool threadPool = null;
108
109
110
111
112 protected String serverSocketType = "plain";
113
114
115
116
117 protected int port = -1;
118
119
120
121
122
123 protected InetAddress bindTo = null;
124
125
126
127
128 protected ServerSocket serverSocket;
129
130
131
132
133
134
135 protected String connectionName;
136
137
138
139
140 protected Integer connectionLimit;
141
142
143
144
145
146 protected int timeout;
147
148
149
150
151 protected int backlog;
152
153
154
155
156 protected String helloName;
157
158
159
160
161 private ServiceManager componentManager;
162
163
164
165
166 private volatile boolean enabled;
167
168
169
170
171 private boolean m_disposed = false;
172
173
174
175
176
177 protected Pool theHandlerPool = null;
178
179
180
181
182 protected WatchdogFactory theWatchdogFactory = null;
183
184
185
186
187 private DNSService dnsServer = null;
188
189
190
191
192
193
194 private AtomicLong handlerCount = new AtomicLong(0);
195
196 private boolean connPerIPConfigured = false;
197 private int connPerIP = 0;
198
199
200
201
202 private String streamDumpDir = null;
203
204 public void setConnectionManager(JamesConnectionManager connectionManager) {
205 this.connectionManager = connectionManager;
206 }
207
208
209
210
211 public void service(ServiceManager comp) throws ServiceException {
212 super.service( comp );
213 componentManager = comp;
214 JamesConnectionManager connectionManager =
215 (JamesConnectionManager)componentManager.lookup(JamesConnectionManager.ROLE);
216 setConnectionManager(connectionManager);
217 dnsServer = (DNSService) comp.lookup(DNSService.ROLE);
218 }
219
220
221
222
223 public void configure(Configuration conf) throws ConfigurationException {
224 enabled = conf.getAttributeAsBoolean("enabled", true);
225 final Logger logger = getLogger();
226 if (!enabled) {
227 logger.info(getServiceType() + " disabled by configuration");
228 return;
229 }
230
231 Configuration handlerConfiguration = conf.getChild("handler");
232
233
234
235
236
237
238
239
240 super.configure(handlerConfiguration);
241
242
243 boolean streamdump=handlerConfiguration.getChild("streamdump").getAttributeAsBoolean("enabled", false);
244 String streamdumpDir=streamdump ? handlerConfiguration.getChild("streamdump").getAttribute("directory", null) : null;
245 setStreamDumpDir(streamdumpDir);
246
247
248 port = conf.getChild("port").getValueAsInteger(getDefaultPort());
249
250 Configuration serverSocketTypeConf = conf.getChild("serverSocketType", false);
251 String confSocketType = null;
252 if (serverSocketTypeConf != null ) {
253 confSocketType = serverSocketTypeConf.getValue();
254 }
255
256 if (confSocketType == null) {
257
258
259
260
261 final boolean useTLS = conf.getChild("useTLS").getValueAsBoolean(isDefaultTLSEnabled());
262 if (useTLS) {
263 serverSocketType = "ssl";
264 loadJCEProviders(conf, logger);
265 }
266 } else {
267 serverSocketType = confSocketType;
268 }
269
270 StringBuffer infoBuffer;
271 threadGroup = conf.getChild("threadGroup").getValue(null);
272 if (threadGroup != null) {
273 infoBuffer =
274 new StringBuffer(64)
275 .append(getServiceType())
276 .append(" uses thread group: ")
277 .append(threadGroup);
278 logger.info(infoBuffer.toString());
279 }
280 else {
281 logger.info(getServiceType() + " uses default thread group.");
282 }
283
284 try {
285 final String bindAddress = conf.getChild("bind").getValue(null);
286 if( null != bindAddress ) {
287 bindTo = InetAddress.getByName(bindAddress);
288 infoBuffer =
289 new StringBuffer(64)
290 .append(getServiceType())
291 .append(" bound to: ")
292 .append(bindTo);
293 logger.info(infoBuffer.toString());
294 }
295 }
296 catch( final UnknownHostException unhe ) {
297 throw new ConfigurationException( "Malformed bind parameter in configuration of service " + getServiceType(), unhe );
298 }
299
300 configureHelloName(handlerConfiguration);
301
302 timeout = handlerConfiguration.getChild(TIMEOUT_NAME).getValueAsInteger(DEFAULT_TIMEOUT);
303
304 infoBuffer =
305 new StringBuffer(64)
306 .append(getServiceType())
307 .append(" handler connection timeout is: ")
308 .append(timeout);
309 logger.info(infoBuffer.toString());
310
311 backlog = conf.getChild(BACKLOG_NAME).getValueAsInteger(DEFAULT_BACKLOG);
312
313 infoBuffer =
314 new StringBuffer(64)
315 .append(getServiceType())
316 .append(" connection backlog is: ")
317 .append(backlog);
318 logger.info(infoBuffer.toString());
319
320 String connectionLimitString = conf.getChild("connectionLimit").getValue(null);
321 if (connectionLimitString != null) {
322 try {
323 connectionLimit = new Integer(connectionLimitString);
324 } catch (NumberFormatException nfe) {
325 logger.error("Connection limit value is not properly formatted.", nfe);
326 }
327 if (connectionLimit.intValue() < 0) {
328 logger.error("Connection limit value cannot be less than zero.");
329 throw new ConfigurationException("Connection limit value cannot be less than zero.");
330 }
331 } else {
332 connectionLimit = new Integer(connectionManager.getMaximumNumberOfOpenConnections());
333 }
334 infoBuffer = new StringBuffer(128)
335 .append(getServiceType())
336 .append(" will allow a maximum of ")
337 .append(connectionLimit.intValue())
338 .append(" connections.");
339 logger.info(infoBuffer.toString());
340
341 String connectionLimitPerIP = conf.getChild("connectionLimitPerIP").getValue(null);
342 if (connectionLimitPerIP != null) {
343 try {
344 connPerIP = new Integer(connectionLimitPerIP).intValue();
345 connPerIPConfigured = true;
346 } catch (NumberFormatException nfe) {
347 logger.error("Connection limit per IP value is not properly formatted.", nfe);
348 }
349 if (connPerIP < 0) {
350 logger.error("Connection limit per IP value cannot be less than zero.");
351 throw new ConfigurationException("Connection limit value cannot be less than zero.");
352 }
353 } else {
354 connPerIP = connectionManager.getMaximumNumberOfOpenConnectionsPerIP();
355 }
356 infoBuffer = new StringBuffer(128)
357 .append(getServiceType())
358 .append(" will allow a maximum of ")
359 .append(connPerIP)
360 .append(" per IP connections for " +getServiceType());
361 logger.info(infoBuffer.toString());
362
363 }
364
365 private void loadJCEProviders(Configuration conf, final Logger logger) throws ConfigurationException {
366 final Configuration [] providerConfiguration = conf.getChildren("provider");
367 for (int i = 0; i < providerConfiguration.length; i++) {
368 final String providerName = providerConfiguration[i].getValue();
369 loadProvider(logger, providerName);
370 }
371 }
372
373 private void loadProvider(final Logger logger, final String providerName) {
374 if (providerName == null) {
375 logger.warn("Failed to specify provider. Continuing but JCE provider will not be loaded");
376 } else {
377 try {
378 logger.debug("Trying to load JCE provider '" + providerName + "'");
379 Security.addProvider((Provider) Class.forName(providerName).newInstance());
380 logger.info("Load JCE provider '" + providerName + "'");
381 } catch (IllegalAccessException e) {
382 logJCELoadFailure(logger, providerName, e);
383 } catch (InstantiationException e) {
384 logJCELoadFailure(logger, providerName, e);
385 } catch (ClassNotFoundException e) {
386 logJCELoadFailure(logger, providerName, e);
387 } catch (RuntimeException e) {
388 logJCELoadFailure(logger, providerName, e);
389 }
390 }
391 }
392
393 private void logJCELoadFailure(final Logger logger, final String providerName, Exception e) {
394 logger.warn("Cannot load JCE provider" + providerName);
395 logger.debug(e.getMessage(), e);
396 }
397
398 protected void setStreamDumpDir(String streamdumpDir) {
399 this.streamDumpDir = streamdumpDir;
400 }
401
402 protected String getLocalHostName() {
403 String hostName = null;
404 try {
405 hostName = dnsServer.getHostName(dnsServer.getLocalHost());
406 } catch (UnknownHostException ue) {
407 hostName = "localhost";
408 }
409 return hostName;
410 }
411
412 private void configureHelloName(Configuration handlerConfiguration) {
413 StringBuffer infoBuffer;
414 String hostName = null;
415 try {
416 hostName = dnsServer.getHostName(dnsServer.getLocalHost());
417 } catch (UnknownHostException ue) {
418 hostName = "localhost";
419 }
420
421 infoBuffer =
422 new StringBuffer(64)
423 .append(getServiceType())
424 .append(" is running on: ")
425 .append(hostName);
426 getLogger().info(infoBuffer.toString());
427
428 Configuration helloConf = handlerConfiguration.getChild(HELLO_NAME);
429
430 if (helloConf != null) {
431 boolean autodetect = helloConf.getAttributeAsBoolean("autodetect", true);
432 if (autodetect) {
433 helloName = hostName;
434 } else {
435
436 helloName = helloConf.getValue("localhost");
437 }
438 } else {
439 helloName = null;
440 }
441 infoBuffer =
442 new StringBuffer(64)
443 .append(getServiceType())
444 .append(" handler hello name is: ")
445 .append(helloName);
446 getLogger().info(infoBuffer.toString());
447 }
448
449
450
451
452 public void initialize() throws Exception {
453 if (!isEnabled()) {
454 getLogger().info(getServiceType() + " Disabled");
455 System.out.println(getServiceType() + " Disabled");
456 return;
457 }
458 getLogger().debug(getServiceType() + " init...");
459
460
461 ThreadManager threadManager = (ThreadManager) componentManager.lookup(ThreadManager.ROLE);
462 SocketManager socketManager = (SocketManager) componentManager.lookup(SocketManager.ROLE);
463
464 initializeThreadPool(threadManager);
465
466 initializeServerSocket(socketManager);
467
468 getLogger().debug(getServiceType() + " ...init end");
469
470 initializeHandlerPool();
471
472
473 ContainerUtil.enableLogging(theHandlerPool, getLogger());
474 ContainerUtil.initialize(theHandlerPool);
475
476 theWatchdogFactory = getWatchdogFactory();
477
478 }
479
480 private void initializeThreadPool(ThreadManager threadManager) {
481 if (threadGroup != null) {
482 threadPool = threadManager.getThreadPool(threadGroup);
483 } else {
484 threadPool = threadManager.getDefaultThreadPool();
485 }
486 }
487
488 private void initializeServerSocket(SocketManager socketManager) throws Exception {
489 try {
490 initializeServerSocketWorker(socketManager);
491 } catch (BindException e) {
492
493 String errorMessage = getBindingErrorMessage(e);
494 System.out.println("------------------------------");
495 System.out.println(errorMessage);
496 System.out.println("------------------------------");
497 getLogger().fatalError(errorMessage);
498 throw e;
499 }
500 }
501
502 private String getBindingErrorMessage(BindException e) {
503
504 StringBuffer errorMessage = new StringBuffer();
505 errorMessage.append("FATAL ERROR when starting service '").append(getServiceType()).append("'! ");
506 errorMessage.append("could not bind to ");
507 errorMessage.append(bindTo == null ? "0.0.0.0" : bindTo.toString());
508 errorMessage.append(":").append(port).append(". ");
509
510
511 if (e.getMessage().indexOf("Address already in use") != -1) {
512 errorMessage.append("Port is already exclusively in use by another application.");
513 } else if (e.getMessage().indexOf("Permission denied") != -1) {
514 errorMessage.append("The user account James is running under has not enough privileges to bind to this ");
515 if (port < 1024) errorMessage.append("privileged ");
516 errorMessage.append("port.");
517 } else {
518 errorMessage.append(e.getMessage());
519 }
520 return errorMessage.toString();
521 }
522
523 private void initializeServerSocketWorker(SocketManager socketManager) throws Exception {
524 ServerSocketFactory factory = socketManager.getServerSocketFactory(serverSocketType);
525 ServerSocket serverSocket = factory.createServerSocket(port, backlog, bindTo);
526
527 if (null == connectionName) {
528 final StringBuffer sb = new StringBuffer();
529 sb.append(serverSocketType);
530 sb.append(':');
531 sb.append(port);
532
533 if (null != bindTo) {
534 sb.append('/');
535 sb.append(bindTo);
536 }
537 connectionName = sb.toString();
538 }
539
540 if ((connectionLimit != null)) {
541 if (null != threadPool) {
542 if (connPerIPConfigured) {
543 connectionManager.connect(connectionName, serverSocket, this, threadPool, connectionLimit.intValue(),connPerIP);
544 } else {
545 connectionManager.connect(connectionName, serverSocket, this, threadPool, connectionLimit.intValue());
546 }
547 } else {
548 if (connPerIPConfigured) {
549 connectionManager.connect(connectionName, serverSocket, this, connectionLimit.intValue(),connPerIP);
550 } else {
551 connectionManager.connect(connectionName, serverSocket, this, connectionLimit.intValue());
552 }
553 }
554 } else {
555 if (null != threadPool) {
556 if (connPerIPConfigured) {
557 connectionManager.connect(connectionName, serverSocket, this, threadPool);
558 } else {
559 connectionManager.connect(connectionName, serverSocket, this, threadPool, 0, connPerIP);
560 }
561 } else {
562 if (connPerIPConfigured) {
563 connectionManager.connect(connectionName, serverSocket, this);
564 } else {
565 connectionManager.connect(connectionName, serverSocket, this, 0, connPerIP);
566 }
567 }
568 }
569 }
570
571 private void initializeHandlerPool() throws Exception {
572 StringBuffer logBuffer =
573 new StringBuffer(64)
574 .append(getServiceType())
575 .append(" started ")
576 .append(connectionName);
577 String logString = logBuffer.toString();
578 System.out.println(logString);
579 getLogger().info(logString);
580
581 if (connectionLimit != null) {
582 theHandlerPool = new HardResourceLimitingPool(this, 5, connectionLimit.intValue());
583 if (getLogger().isDebugEnabled()) {
584 getLogger().debug("Using a bounded pool for "+getServiceType()+" handlers with upper limit " + connectionLimit.intValue());
585 }
586 } else {
587
588
589 theHandlerPool = new DefaultPool(this, null, 5, 30);
590 getLogger().debug("Using an unbounded pool for "+getServiceType()+" handlers.");
591 }
592 }
593
594
595
596
597 public void dispose() {
598
599 if (!isEnabled()) {
600 return;
601 }
602
603 if( m_disposed )
604 {
605 if( getLogger().isWarnEnabled() )
606 {
607 getLogger().warn( "ignoring disposal request - already disposed" );
608 }
609 return;
610 }
611
612 if( getLogger().isDebugEnabled() )
613 {
614 getLogger().debug( "disposal" );
615 }
616
617 m_disposed = true;
618 if( getLogger().isDebugEnabled() )
619 {
620 StringBuffer infoBuffer =
621 new StringBuffer(64).append(getServiceType()).append(
622 " dispose... ").append(connectionName);
623 getLogger().debug(infoBuffer.toString());
624 }
625
626 try {
627 connectionManager.disconnect(connectionName, true);
628 } catch (final Exception e) {
629 StringBuffer warnBuffer =
630 new StringBuffer(64)
631 .append("Error disconnecting ")
632 .append(getServiceType())
633 .append(": ");
634 getLogger().warn(warnBuffer.toString(), e);
635 }
636
637 componentManager = null;
638
639 connectionManager = null;
640 threadPool = null;
641
642
643
644 System.gc();
645
646 getLogger().debug(getServiceType() + " ...dispose end");
647 }
648
649
650
651
652
653
654
655
656 protected WatchdogFactory getWatchdogFactory() {
657 WatchdogFactory theWatchdogFactory = null;
658 theWatchdogFactory = new ThreadPerWatchdogFactory(threadPool, timeout);
659 ContainerUtil.enableLogging(theWatchdogFactory,getLogger());
660 return theWatchdogFactory;
661 }
662
663
664
665
666
667
668
669 public final boolean isEnabled() {
670 return enabled;
671 }
672
673
674
675
676
677
678 protected ConnectionHandler newHandler()
679 throws Exception {
680 AbstractJamesHandler theHandler = (AbstractJamesHandler)theHandlerPool.get();
681
682 if (getLogger().isDebugEnabled()) {
683 getLogger().debug("Handler [" + theHandler + "] obtained from pool.");
684 }
685
686 Watchdog theWatchdog = theWatchdogFactory.getWatchdog(theHandler.getWatchdogTarget());
687
688 theHandler.setConfigurationData(getConfigurationData());
689 theHandler.setStreamDumpDir(streamDumpDir);
690 theHandler.setWatchdog(theWatchdog);
691 return theHandler;
692 }
693
694 protected abstract Object getConfigurationData();
695
696
697
698
699 public void releaseConnectionHandler( ConnectionHandler connectionHandler ) {
700 if (getLogger().isDebugEnabled()) {
701 getLogger().debug("Returning Handler [" + connectionHandler + "] to pool.");
702 }
703 theHandlerPool.put((Poolable)connectionHandler);
704 }
705
706
707
708
709
710
711
712
713
714
715
716 protected int getDefaultPort() {
717 return 0;
718 }
719
720
721
722
723
724
725 protected boolean isDefaultTLSEnabled() {
726 return false;
727 }
728
729
730
731
732
733
734
735
736
737
738
739 public String getServiceType() {
740 String name = getClass().getName();
741 int p = name.lastIndexOf(".");
742 if (p > 0 && p < name.length() - 2) {
743 name = name.substring(p + 1);
744 }
745 return name;
746 }
747
748
749
750
751
752
753 public int getPort() {
754 return port;
755 }
756
757
758
759
760
761
762 public String getNetworkInterface() {
763 if (bindTo == null) {
764 return "All";
765 } else {
766 return bindTo.getHostAddress();
767 }
768 }
769
770
771
772
773
774
775 public String getSocketType() {
776 return serverSocketType;
777 }
778
779
780
781
782 public void decommission( Object object ) throws Exception {
783 return;
784 }
785
786
787
788
789 public ConnectionHandler createConnectionHandler() throws Exception {
790 ConnectionHandler conn = super.createConnectionHandler();
791 ContainerUtil.service(conn, componentManager);
792 return conn;
793 }
794
795
796
797
798 public Object newInstance() throws Exception {
799 final AbstractJamesHandler delegatingJamesHandler = new DelegatingJamesHandler(newProtocolHandlerInstance());
800 delegatingJamesHandler.setName("Handler-" + handlerCount.getAndAdd(1));
801 return delegatingJamesHandler;
802 }
803
804 protected abstract ProtocolHandler newProtocolHandlerInstance();
805
806
807
808
809 public Class getCreatedClass() {
810 return DelegatingJamesHandler.class;
811 }
812
813 }
814