View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.phoenix.jms.activemq;
21  
22  import java.lang.reflect.InvocationTargetException;
23  
24  import javax.jms.JMSException;
25  
26  import org.apache.activemq.broker.BrokerService;
27  import org.apache.avalon.framework.activity.Initializable;
28  import org.apache.avalon.framework.configuration.Configurable;
29  import org.apache.avalon.framework.configuration.Configuration;
30  import org.apache.avalon.framework.configuration.ConfigurationException;
31  import org.apache.avalon.framework.logger.AbstractLogEnabled;
32  import org.apache.avalon.framework.logger.Logger;
33  import org.apache.avalon.framework.service.ServiceException;
34  import org.apache.avalon.framework.service.ServiceManager;
35  import org.apache.avalon.framework.service.Serviceable;
36  import org.apache.commons.logging.impl.AvalonLogger;
37  import org.apache.james.api.jms.MailBuilder;
38  import org.apache.james.api.jms.MailConsumer;
39  import org.apache.james.jms.activemq.BrokerManager;
40  import org.apache.james.jms.builder.SimpleMailBuilder;
41  import org.apache.james.jms.consumer.SpoolToJamesMailConsumer;
42  import org.apache.james.services.MailServer;
43  
44  public class JMSService extends AbstractLogEnabled implements Configurable, Serviceable, Initializable {
45  
46      private static final String NAME = "JAMES ActiveMQ JMS ";
47      private static final String LOG_MESSAGE_DISABLED = NAME + "is disabled.";
48      private static final String LOG_MESSAGE_DISABLED_BY_CONFIGURATION = NAME + "is disabled by configuration.";
49      
50      private MailServer mailServer;
51      private BrokerManager brokerManager;
52      
53      public void configure(Configuration configuration) throws ConfigurationException {
54          final boolean isEnabled = configuration.getAttributeAsBoolean("enabled", true);
55          if (isEnabled) {
56              BrokerManager brokerManager = configureService(configuration.getChild("activemq-broker"));
57              Configuration[] consumers = configuration.getChildren("consumer");
58              this.brokerManager = configureConsumers(brokerManager, consumers);
59          } else {
60              getLogger().info(LOG_MESSAGE_DISABLED_BY_CONFIGURATION);
61              brokerManager = null;
62          }
63      }
64          
65      private BrokerManager configureConsumers(final BrokerManager broker, 
66              final Configuration[] consumers) throws ConfigurationException {
67          final BrokerManager result = broker;
68          if (broker != null && consumers != null) {
69              final int length = consumers.length;
70              for (int i=0;i<length;i++) {
71                  final Configuration configuration = consumers[i];
72                  final MailConsumer consumer = createConsumer(configuration);
73                  final Configuration builderConfiguration = configuration.getChild("builder");
74                  final MailBuilder builder = createBuilder(builderConfiguration);
75                  final Configuration destination = configuration.getChild("destination");
76                  if (destination == null) {
77                      throw new ConfigurationException("Element 'consumer' must contain one element 'destination'.");
78                  } else {
79                      final Configuration nameConfiguration = destination.getChild("name");
80                      if (nameConfiguration == null) {
81                          throw new ConfigurationException("Element 'destination' must contain one element 'name'.");
82                      } else {    
83                          final String name = nameConfiguration.getValue();
84                          try {
85                              if (destination.getChild("queue") != null) {
86                                  broker.consumeQueue(consumer, builder, name);
87                              } else if (destination.getChild("topic") != null) {
88                                  broker.subscribeToTopic(consumer, builder, name);
89                              } else {
90                                  throw new ConfigurationException("Element 'destination' must contain either 'topic' or 'queue'.");      
91                              }
92                          } catch (JMSException e) {
93                              throw new ConfigurationException("Cannot connect to destination " + name, e);
94                          }
95                      }
96                  }
97              }
98          }
99          return result;
100     }
101     
102     private void setup(final Object subject, final Configuration configuration) throws ConfigurationException {
103         if (subject != null) {
104             setupLogger(subject);
105             if (subject instanceof Configurable && configuration != null) {
106                 final Configurable configurable = (Configurable) subject;
107                 configurable.configure(configuration);
108             }
109         }
110     }
111 
112     protected void setupLogger(Object subject) {
113         super.setupLogger(subject);
114         if (!(subject instanceof AbstractLogEnabled)) {
115             Class[] commonsLog = {org.apache.commons.logging.Log.class};
116             try {
117                 Object[] args = {new AvalonLogger(getLogger())};
118                 subject.getClass().getMethod("setLog", commonsLog).invoke(subject, args);
119             } catch (SecurityException e) {
120                 getLogger().debug("Cannot use reflection to determine whether component uses commons logging", e);
121             } catch (NoSuchMethodException e) {
122                 // ok
123             } catch (IllegalArgumentException e) {
124                 getLogger().debug("Failed to set log on" + subject, e);
125             } catch (IllegalAccessException e) {
126                 getLogger().debug("Failed to set log on" + subject, e);
127             } catch (InvocationTargetException e) {
128                 getLogger().debug("Failed to set log on" + subject, e);
129             }
130         }
131     }
132 
133     private MailBuilder createBuilder(final Configuration configuration) throws ConfigurationException {
134         final MailBuilder result;
135         if (configuration == null) {
136             throw new ConfigurationException("Element 'consumer' must contain one 'builder' element.");
137         } else {
138             String type;
139             try {
140                 type = configuration.getAttribute("type");
141             } catch (ConfigurationException e) {
142                 type = null;
143             }
144             if (type == null || "".equals(type)) {
145                 final String className = configuration.getAttribute("classname");
146                 if (className == null || "".equals(className)) {
147                     throw new ConfigurationException(
148                     "Element 'builder' requires either attribute 'classname' or 'type'.");
149                 } else {
150                     try {
151                         result = (MailBuilder) load(className);
152                     } catch (ClassCastException e) {
153                         throw new ConfigurationException("Class is not a MailConsumer: " + className, e);
154                     }
155                 }
156             } else {
157                 if ("SimpleMailBuilder".equals(type)) {
158                     SimpleMailBuilder.JamesIdGenerator idGenerator = new SimpleMailBuilder.JamesIdGenerator(mailServer);
159                     result = new SimpleMailBuilder(idGenerator);
160                 } else {
161                     throw new ConfigurationException("Unknown standard type: " +type);
162                 }
163             }
164             setup(result, configuration);
165         }
166         return result;
167     }
168     
169     private MailConsumer createConsumer(final Configuration configuration) throws ConfigurationException {
170         String type;
171         try {
172             type = configuration.getAttribute("type");
173         } catch (ConfigurationException e) {
174             type = null;
175         }
176         final MailConsumer consumer;
177         if (type == null || "".equals(type)) {
178             final String className = configuration.getAttribute("classname");
179             if (className == null || "".equals(className)) {
180                 throw new ConfigurationException(
181                         "Element 'consumer' requires either attribute 'classname' or 'type'.");
182             } else {
183                 try {
184                     consumer = (MailConsumer) load(className);
185                 } catch (ClassCastException e) {
186                     throw new ConfigurationException("Class is not a MailConsumer: " + className, e);
187                 }
188             }
189         } else {
190             if ("james-in".equals(type)) {
191                 consumer = new SpoolToJamesMailConsumer(mailServer, new AvalonLogger(getLogger()));
192             } else {
193                 throw new ConfigurationException("Unknown standard type: " +type);
194             }
195         }
196         setup(consumer, configuration);
197         return consumer;
198     }
199     
200     private Object load(String className) throws ConfigurationException {
201         final Object result;
202         Class clazz;
203         try {
204             clazz = Class.forName(className);
205         } catch (ClassNotFoundException e) {
206             getLogger().debug("Trying context classloader", e);
207             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
208             try {
209                 clazz = classLoader.loadClass(className);
210             } catch (ClassNotFoundException e1) {
211                 throw new ConfigurationException("Cannot load type " + className, e);
212             }
213         }
214         
215         try {
216             result = clazz.newInstance();
217         } catch (InstantiationException e) {
218             throw new ConfigurationException("Cannot load type " + className, e);
219         } catch (IllegalAccessException e) {
220             throw new ConfigurationException("Cannot load type " + className, e);
221         }
222         
223         return result;
224     }
225 
226     private BrokerManager configureService(Configuration configuration) throws ConfigurationException {
227         BrokerService broker = new BrokerService();
228         configureJmx(configuration, broker);
229         configurePersistent(configuration, broker);
230         Configuration[] connectors = configuration.getChildren("connector");
231         configureConnectors(broker, connectors);
232         
233         BrokerManager result = new BrokerManager(broker, new AvalonLogger(getLogger()));
234         return result;
235     }
236 
237     private void configurePersistent(Configuration configuration, BrokerService broker) {
238         final boolean persistent = configuration.getAttributeAsBoolean("persistent", false);
239         broker.setPersistent(persistent);
240     }
241 
242     private void configureConnectors(BrokerService broker, Configuration[] connectors) throws ConfigurationException {
243         if (connectors != null) {
244         for (int i=0;i<connectors.length;i++) {
245             final String url = connectors[i].getValue();
246             try {
247                 Logger logger = getLogger();
248                 if (logger.isDebugEnabled()) {
249                     logger.debug("Adding connector URL " + url);
250                 }
251                 broker.addConnector(url);
252             } catch (Exception e) {
253                 throw new ConfigurationException("Cannot add connection " + url, e);
254             }
255         } 
256         }
257     }
258 
259     private void configureJmx(Configuration configuration, BrokerService broker) {
260         final boolean jmx = configuration.getAttributeAsBoolean("jmx", true);
261         broker.setUseJmx(jmx);
262     }
263     
264 
265     public void service(ServiceManager serviceManager) throws ServiceException {
266         mailServer = (MailServer) serviceManager.lookup(MailServer.ROLE);
267     }
268 
269     public void initialize() throws Exception {
270         if (brokerManager != null) {
271             brokerManager.start();
272         } else {
273             getLogger().info(LOG_MESSAGE_DISABLED);
274         }
275     }
276 
277     
278 }