1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.jms.activemq;
21
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25
26 import javax.jms.Connection;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.Queue;
30 import javax.jms.Session;
31 import javax.jms.Topic;
32
33 import org.apache.activemq.ActiveMQConnectionFactory;
34 import org.apache.activemq.broker.BrokerService;
35 import org.apache.commons.logging.Log;
36 import org.apache.james.api.jms.MailBuilder;
37 import org.apache.james.api.jms.MailConsumer;
38 import org.apache.james.jms.MailMessageListener;
39
40
41
42
43
44
45
46
47
48 public class BrokerManager {
49
50 private final BrokerService broker;
51 private final Collection registrations;
52 private final Log log;
53
54 private ActiveMQConnectionFactory factory;
55 private Connection connection;
56 private boolean started;
57
58 public BrokerManager(final BrokerService broker, final Log log) {
59 this.broker = broker;
60 this.registrations = new ArrayList();
61 this.log = log;
62 }
63
64 public void start() throws JMSException {
65
66 synchronized (registrations) {
67 if (!started) {
68 try {
69 broker.start();
70 } catch (Exception e) {
71 throw new ActiveMQException(e);
72 }
73 factory = new ActiveMQConnectionFactory("vm://localhost");
74 connection = factory.createConnection();
75
76 for (final Iterator it=registrations.iterator();it.hasNext();) {
77 final ConsumerRegistration registration = (ConsumerRegistration) it.next();
78 try {
79 registration.register(this);
80 it.remove();
81 } catch (JMSException e) {
82 if (log.isErrorEnabled()) {
83 log.error("Failed to add consumer to " + registration.destination
84 + ": " + e.getMessage());
85 }
86 if (log.isDebugEnabled()) {
87 log.debug("Failed to register " + registration, e);
88 }
89 }
90 }
91
92 started = true;
93 }
94 }
95 }
96
97 public void stop() throws JMSException {
98
99 synchronized (registrations) {
100 if (started) {
101 connection.stop();
102 try {
103 broker.stop();
104 } catch (Exception e) {
105 throw new ActiveMQException(e);
106 }
107 started = false;
108 }
109 }
110 }
111
112
113
114
115
116
117
118
119
120
121 public void consumeQueue(final MailConsumer consumer, final MailBuilder builder,
122 String destination) throws JMSException {
123 if (started) {
124 doListen(consumer, builder, destination);
125 } else {
126 register(consumer, builder, destination, false);
127 }
128 }
129
130 private void doListen(final MailConsumer consumer, final MailBuilder builder, String destination) throws JMSException {
131 final MailMessageListener listener = new MailMessageListener(consumer, builder);
132 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
133 Queue queue = session.createQueue(destination);
134 MessageConsumer messageConsumer = session.createConsumer(queue);
135 messageConsumer.setMessageListener(listener);
136 connection.start();
137 if (log.isTraceEnabled()) {
138 log.trace("Attached " + consumer + " to " + destination);
139 }
140 }
141
142
143
144
145
146
147
148
149
150
151 public void subscribeToTopic(final MailConsumer consumer, final MailBuilder builder,
152 String destination) throws JMSException {
153 if (started) {
154 doSubscribe(consumer, builder, destination);
155 } else {
156 register(consumer, builder, destination, true);
157 }
158 }
159
160 private void doSubscribe(final MailConsumer consumer, final MailBuilder builder, String destination) throws JMSException {
161 final MailMessageListener listener = new MailMessageListener(consumer, builder);
162 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
163 Topic topic = session.createTopic(destination);
164 MessageConsumer messageConsumer = session.createConsumer(topic);
165 messageConsumer.setMessageListener(listener);
166 connection.start();
167 if (log.isTraceEnabled()) {
168 log.trace("Subscribed " + consumer + " to " + destination);
169 }
170 }
171
172 private void register(final MailConsumer consumer, final MailBuilder builder,
173 final String destination, final boolean topic) throws JMSException {
174 final ConsumerRegistration registration = new ConsumerRegistration(consumer, builder, destination, topic);
175
176 synchronized (registrations) {
177
178
179 if (started) {
180
181 registration.register(this);
182 } else {
183
184 if (log.isDebugEnabled()) {
185 log.debug("Registered: " + registration);
186 }
187 registrations.add(registration);
188 }
189 }
190 }
191
192
193
194
195 private static final class ConsumerRegistration {
196 public final MailConsumer consumer;
197 public final MailBuilder builder;
198 public final String destination;
199 public final boolean topic;
200
201 public ConsumerRegistration(final MailConsumer consumer, final MailBuilder builder, final String destination, final boolean topic) {
202 super();
203 this.consumer = consumer;
204 this.builder = builder;
205 this.destination = destination;
206 this.topic = topic;
207 }
208
209 public void register(final BrokerManager brokerManager) throws JMSException {
210 if (topic) {
211 brokerManager.doSubscribe(consumer, builder, destination);
212 } else {
213 brokerManager.doListen(consumer, builder, destination);
214 }
215 }
216
217
218
219
220
221
222
223 public String toString()
224 {
225 final String TAB = " ";
226
227 String retValue = "ConsumerRegistration ( "
228 + super.toString() + TAB
229 + "consumer = " + this.consumer + TAB
230 + "builder = " + this.builder + TAB
231 + "destination = " + this.destination + TAB
232 + "topic = " + this.topic + TAB
233 + " )";
234
235 return retValue;
236 }
237 }
238 }