1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.phoenix.jms.activemq;
21
22 import java.lang.reflect.InvocationTargetException;
23
24 import javax.jms.JMSException;
25
26 import org.apache.activemq.broker.BrokerService;
27 import org.apache.avalon.framework.activity.Initializable;
28 import org.apache.avalon.framework.configuration.Configurable;
29 import org.apache.avalon.framework.configuration.Configuration;
30 import org.apache.avalon.framework.configuration.ConfigurationException;
31 import org.apache.avalon.framework.logger.AbstractLogEnabled;
32 import org.apache.avalon.framework.logger.Logger;
33 import org.apache.avalon.framework.service.ServiceException;
34 import org.apache.avalon.framework.service.ServiceManager;
35 import org.apache.avalon.framework.service.Serviceable;
36 import org.apache.commons.logging.impl.AvalonLogger;
37 import org.apache.james.api.jms.MailBuilder;
38 import org.apache.james.api.jms.MailConsumer;
39 import org.apache.james.jms.activemq.BrokerManager;
40 import org.apache.james.jms.builder.SimpleMailBuilder;
41 import org.apache.james.jms.consumer.SpoolToJamesMailConsumer;
42 import org.apache.james.services.MailServer;
43
44 public class JMSService extends AbstractLogEnabled implements Configurable, Serviceable, Initializable {
45
46 private static final String NAME = "JAMES ActiveMQ JMS ";
47 private static final String LOG_MESSAGE_DISABLED = NAME + "is disabled.";
48 private static final String LOG_MESSAGE_DISABLED_BY_CONFIGURATION = NAME + "is disabled by configuration.";
49
50 private MailServer mailServer;
51 private BrokerManager brokerManager;
52
53 public void configure(Configuration configuration) throws ConfigurationException {
54 final boolean isEnabled = configuration.getAttributeAsBoolean("enabled", true);
55 if (isEnabled) {
56 BrokerManager brokerManager = configureService(configuration.getChild("activemq-broker"));
57 Configuration[] consumers = configuration.getChildren("consumer");
58 this.brokerManager = configureConsumers(brokerManager, consumers);
59 } else {
60 getLogger().info(LOG_MESSAGE_DISABLED_BY_CONFIGURATION);
61 brokerManager = null;
62 }
63 }
64
65 private BrokerManager configureConsumers(final BrokerManager broker,
66 final Configuration[] consumers) throws ConfigurationException {
67 final BrokerManager result = broker;
68 if (broker != null && consumers != null) {
69 final int length = consumers.length;
70 for (int i=0;i<length;i++) {
71 final Configuration configuration = consumers[i];
72 final MailConsumer consumer = createConsumer(configuration);
73 final Configuration builderConfiguration = configuration.getChild("builder");
74 final MailBuilder builder = createBuilder(builderConfiguration);
75 final Configuration destination = configuration.getChild("destination");
76 if (destination == null) {
77 throw new ConfigurationException("Element 'consumer' must contain one element 'destination'.");
78 } else {
79 final Configuration nameConfiguration = destination.getChild("name");
80 if (nameConfiguration == null) {
81 throw new ConfigurationException("Element 'destination' must contain one element 'name'.");
82 } else {
83 final String name = nameConfiguration.getValue();
84 try {
85 if (destination.getChild("queue") != null) {
86 broker.consumeQueue(consumer, builder, name);
87 } else if (destination.getChild("topic") != null) {
88 broker.subscribeToTopic(consumer, builder, name);
89 } else {
90 throw new ConfigurationException("Element 'destination' must contain either 'topic' or 'queue'.");
91 }
92 } catch (JMSException e) {
93 throw new ConfigurationException("Cannot connect to destination " + name, e);
94 }
95 }
96 }
97 }
98 }
99 return result;
100 }
101
102 private void setup(final Object subject, final Configuration configuration) throws ConfigurationException {
103 if (subject != null) {
104 setupLogger(subject);
105 if (subject instanceof Configurable && configuration != null) {
106 final Configurable configurable = (Configurable) subject;
107 configurable.configure(configuration);
108 }
109 }
110 }
111
112 protected void setupLogger(Object subject) {
113 super.setupLogger(subject);
114 if (!(subject instanceof AbstractLogEnabled)) {
115 Class[] commonsLog = {org.apache.commons.logging.Log.class};
116 try {
117 Object[] args = {new AvalonLogger(getLogger())};
118 subject.getClass().getMethod("setLog", commonsLog).invoke(subject, args);
119 } catch (SecurityException e) {
120 getLogger().debug("Cannot use reflection to determine whether component uses commons logging", e);
121 } catch (NoSuchMethodException e) {
122
123 } catch (IllegalArgumentException e) {
124 getLogger().debug("Failed to set log on" + subject, e);
125 } catch (IllegalAccessException e) {
126 getLogger().debug("Failed to set log on" + subject, e);
127 } catch (InvocationTargetException e) {
128 getLogger().debug("Failed to set log on" + subject, e);
129 }
130 }
131 }
132
133 private MailBuilder createBuilder(final Configuration configuration) throws ConfigurationException {
134 final MailBuilder result;
135 if (configuration == null) {
136 throw new ConfigurationException("Element 'consumer' must contain one 'builder' element.");
137 } else {
138 String type;
139 try {
140 type = configuration.getAttribute("type");
141 } catch (ConfigurationException e) {
142 type = null;
143 }
144 if (type == null || "".equals(type)) {
145 final String className = configuration.getAttribute("classname");
146 if (className == null || "".equals(className)) {
147 throw new ConfigurationException(
148 "Element 'builder' requires either attribute 'classname' or 'type'.");
149 } else {
150 try {
151 result = (MailBuilder) load(className);
152 } catch (ClassCastException e) {
153 throw new ConfigurationException("Class is not a MailConsumer: " + className, e);
154 }
155 }
156 } else {
157 if ("SimpleMailBuilder".equals(type)) {
158 SimpleMailBuilder.JamesIdGenerator idGenerator = new SimpleMailBuilder.JamesIdGenerator(mailServer);
159 result = new SimpleMailBuilder(idGenerator);
160 } else {
161 throw new ConfigurationException("Unknown standard type: " +type);
162 }
163 }
164 setup(result, configuration);
165 }
166 return result;
167 }
168
169 private MailConsumer createConsumer(final Configuration configuration) throws ConfigurationException {
170 String type;
171 try {
172 type = configuration.getAttribute("type");
173 } catch (ConfigurationException e) {
174 type = null;
175 }
176 final MailConsumer consumer;
177 if (type == null || "".equals(type)) {
178 final String className = configuration.getAttribute("classname");
179 if (className == null || "".equals(className)) {
180 throw new ConfigurationException(
181 "Element 'consumer' requires either attribute 'classname' or 'type'.");
182 } else {
183 try {
184 consumer = (MailConsumer) load(className);
185 } catch (ClassCastException e) {
186 throw new ConfigurationException("Class is not a MailConsumer: " + className, e);
187 }
188 }
189 } else {
190 if ("james-in".equals(type)) {
191 consumer = new SpoolToJamesMailConsumer(mailServer, new AvalonLogger(getLogger()));
192 } else {
193 throw new ConfigurationException("Unknown standard type: " +type);
194 }
195 }
196 setup(consumer, configuration);
197 return consumer;
198 }
199
200 private Object load(String className) throws ConfigurationException {
201 final Object result;
202 Class clazz;
203 try {
204 clazz = Class.forName(className);
205 } catch (ClassNotFoundException e) {
206 getLogger().debug("Trying context classloader", e);
207 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
208 try {
209 clazz = classLoader.loadClass(className);
210 } catch (ClassNotFoundException e1) {
211 throw new ConfigurationException("Cannot load type " + className, e);
212 }
213 }
214
215 try {
216 result = clazz.newInstance();
217 } catch (InstantiationException e) {
218 throw new ConfigurationException("Cannot load type " + className, e);
219 } catch (IllegalAccessException e) {
220 throw new ConfigurationException("Cannot load type " + className, e);
221 }
222
223 return result;
224 }
225
226 private BrokerManager configureService(Configuration configuration) throws ConfigurationException {
227 BrokerService broker = new BrokerService();
228 configureJmx(configuration, broker);
229 configurePersistent(configuration, broker);
230 Configuration[] connectors = configuration.getChildren("connector");
231 configureConnectors(broker, connectors);
232
233 BrokerManager result = new BrokerManager(broker, new AvalonLogger(getLogger()));
234 return result;
235 }
236
237 private void configurePersistent(Configuration configuration, BrokerService broker) {
238 final boolean persistent = configuration.getAttributeAsBoolean("persistent", false);
239 broker.setPersistent(persistent);
240 }
241
242 private void configureConnectors(BrokerService broker, Configuration[] connectors) throws ConfigurationException {
243 if (connectors != null) {
244 for (int i=0;i<connectors.length;i++) {
245 final String url = connectors[i].getValue();
246 try {
247 Logger logger = getLogger();
248 if (logger.isDebugEnabled()) {
249 logger.debug("Adding connector URL " + url);
250 }
251 broker.addConnector(url);
252 } catch (Exception e) {
253 throw new ConfigurationException("Cannot add connection " + url, e);
254 }
255 }
256 }
257 }
258
259 private void configureJmx(Configuration configuration, BrokerService broker) {
260 final boolean jmx = configuration.getAttributeAsBoolean("jmx", true);
261 broker.setUseJmx(jmx);
262 }
263
264
265 public void service(ServiceManager serviceManager) throws ServiceException {
266 mailServer = (MailServer) serviceManager.lookup(MailServer.ROLE);
267 }
268
269 public void initialize() throws Exception {
270 if (brokerManager != null) {
271 brokerManager.start();
272 } else {
273 getLogger().info(LOG_MESSAGE_DISABLED);
274 }
275 }
276
277
278 }