View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.jms.activemq;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  
26  import javax.jms.Connection;
27  import javax.jms.JMSException;
28  import javax.jms.MessageConsumer;
29  import javax.jms.Queue;
30  import javax.jms.Session;
31  import javax.jms.Topic;
32  
33  import org.apache.activemq.ActiveMQConnectionFactory;
34  import org.apache.activemq.broker.BrokerService;
35  import org.apache.commons.logging.Log;
36  import org.apache.james.api.jms.MailBuilder;
37  import org.apache.james.api.jms.MailConsumer;
38  import org.apache.james.jms.MailMessageListener;
39  
40  /**
41   * <p>Manages the connection to the local ActiveMQ broker.
42   * </p><p>
43   * <strong>Note:</strong> <code>BrokerManager</code> is intended
44   * to allow concurrent access. {@link #start} and {@link stop}
45   * are synchronized on the consumer register. 
46   * </p>
47   */
48  public class BrokerManager {
49  
50      private final BrokerService broker;
51      private final Collection registrations;
52      private final Log log;
53      
54      private ActiveMQConnectionFactory factory;
55      private Connection connection;
56      private boolean started;
57      
58      public BrokerManager(final BrokerService broker, final Log log) {
59          this.broker = broker;
60          this.registrations = new ArrayList();
61          this.log = log;
62      }
63      
64      public void start() throws JMSException {
65          // Prevent concurrent start, stop, registration
66          synchronized (registrations) {
67              if (!started) {
68                  try {
69                      broker.start();
70                  } catch (Exception e) {
71                      throw new ActiveMQException(e);
72                  }
73                  factory = new ActiveMQConnectionFactory("vm://localhost");
74                  connection = factory.createConnection();
75                  
76                  for (final Iterator it=registrations.iterator();it.hasNext();) {
77                      final ConsumerRegistration registration = (ConsumerRegistration) it.next();
78                      try {
79                          registration.register(this);
80                          it.remove();
81                      } catch (JMSException e) {
82                          if (log.isErrorEnabled()) {
83                              log.error("Failed to add consumer to " + registration.destination 
84                                      + ": " + e.getMessage());
85                          }
86                          if (log.isDebugEnabled()) {
87                              log.debug("Failed to register " + registration, e);
88                          }
89                      }
90                  }
91                  
92                  started = true;
93              }
94          }
95      }
96      
97      public void stop() throws JMSException {
98          // Prevent concurrent start, stop, registration
99          synchronized (registrations) {
100           if (started) {
101               connection.stop();
102               try {
103                   broker.stop();
104               } catch (Exception e) {
105                   throw new ActiveMQException(e);
106               }
107               started = false;
108           }
109       }
110   }
111   
112   /**
113     * Sets the consumer as the listener for the queue with the given name.
114     * If the broker has not been started, the consumer will be registered
115     * and added on start.
116     * @param consumer <code>MailConsumer</code>, not null
117     * @param builder <code>MailBuilder</code>, not null
118     * @param destination name of the destination queue, not null
119     * @throws JMSException 
120     */
121   public void consumeQueue(final MailConsumer consumer, final MailBuilder builder, 
122           String destination) throws JMSException {
123       if (started) {
124           doListen(consumer, builder, destination);
125       } else {
126           register(consumer, builder, destination, false);
127       }
128   }
129 
130   private void doListen(final MailConsumer consumer, final MailBuilder builder, String destination) throws JMSException {
131       final MailMessageListener listener = new MailMessageListener(consumer, builder);
132       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
133       Queue queue = session.createQueue(destination);
134       MessageConsumer messageConsumer = session.createConsumer(queue);
135       messageConsumer.setMessageListener(listener);
136       connection.start();
137       if (log.isTraceEnabled()) {
138           log.trace("Attached " + consumer + " to " + destination);
139       }
140   }
141   
142   /**
143     * Subscribes the consumer to the topic with the given name.
144     * If the broker has not been started, the consumer will be registered
145     * and subscribed on start.
146     * @param consumer <code>MailConsumer</code>, not null
147     * @param builder <code>MailBuilder</code>, not null
148     * @param destination name of the destination queue, not null
149     * @throws JMSException 
150     */
151   public void subscribeToTopic(final MailConsumer consumer, final MailBuilder builder, 
152           String destination) throws JMSException {
153       if (started) {
154           doSubscribe(consumer, builder, destination);
155       } else {
156           register(consumer, builder, destination, true);
157       }
158   }
159 
160   private void doSubscribe(final MailConsumer consumer, final MailBuilder builder, String destination) throws JMSException {
161       final MailMessageListener listener = new MailMessageListener(consumer, builder);
162       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
163       Topic topic = session.createTopic(destination);
164       MessageConsumer messageConsumer = session.createConsumer(topic);
165       messageConsumer.setMessageListener(listener);
166       connection.start();
167       if (log.isTraceEnabled()) {
168           log.trace("Subscribed " + consumer + " to " + destination);
169       }
170   }
171 
172   private void register(final MailConsumer consumer, final MailBuilder builder, 
173           final String destination, final boolean topic) throws JMSException {
174       final ConsumerRegistration registration = new ConsumerRegistration(consumer, builder, destination, topic);
175       // Prevent concurrent start, stop, registration
176       synchronized (registrations) {
177           // After gaining the lock for registrations, check whether start
178           // or stop has now completed
179           if (started) {
180               // broker now started so add consumer now
181               registration.register(this);
182           } else {
183               // broker isn't started so add to registrations
184               if (log.isDebugEnabled()) {
185                   log.debug("Registered: " + registration);
186               }
187               registrations.add(registration);
188           }
189       }
190   }
191   
192   /**
193     * Holds a pending registration.
194     */
195   private static final class ConsumerRegistration {
196       public final MailConsumer consumer;
197       public final MailBuilder builder;
198       public final String destination;
199       public final boolean topic;
200       
201       public ConsumerRegistration(final MailConsumer consumer, final MailBuilder builder, final String destination, final boolean topic) {
202           super();
203           this.consumer = consumer;
204           this.builder = builder;
205           this.destination = destination;
206           this.topic = topic;
207       }
208       
209       public void register(final BrokerManager brokerManager) throws JMSException {
210           if (topic) {
211               brokerManager.doSubscribe(consumer, builder, destination);
212           } else {
213               brokerManager.doListen(consumer, builder, destination);
214           }
215       }
216 
217       /**
218         * Renders this object suitably for logging.
219         *
220         * @return a <code>String</code> representation 
221         * of this object.
222         */
223       public String toString()
224       {
225           final String TAB = "  ";
226           
227           String retValue = "ConsumerRegistration ( "
228               + super.toString() + TAB
229               + "consumer = " + this.consumer + TAB
230               + "builder = " + this.builder + TAB
231               + "destination = " + this.destination + TAB
232               + "topic = " + this.topic + TAB
233               + " )";
234       
235           return retValue;
236       }
237   }
238 }