1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.transport;
23
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.container.ContainerUtil;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.james.services.SpoolRepository;
import org.apache.james.services.SpoolManager;
import org.apache.mailet.Mail;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PostConstruct;
44
45
46
47
48
49
50
51
52
53 public class JamesSpoolManager
54 extends AbstractLogEnabled
55 implements Serviceable, Configurable, Runnable, Disposable, SpoolManager {
56
57
58
59
60 private ServiceManager compMgr;
61
62
63
64
65 private SpoolRepository spool;
66
67
68
69
70 private int numThreads;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 private int numActive;
93
94
95
96
97 private boolean active;
98
99
100
101
102 private Collection spoolThreads;
103
104
105
106
107 private MailProcessor processorList;
108
109
110
111
112
113
114 public void setSpool(SpoolRepository spool) {
115 this.spool = spool;
116 }
117
118
119
120
121 public void service(ServiceManager comp) throws ServiceException {
122 compMgr = comp;
123 setSpool((SpoolRepository) compMgr.lookup(SpoolRepository.ROLE));
124 }
125
126
127
128
129 public void configure(Configuration conf) throws ConfigurationException {
130 numThreads = conf.getChild("threads").getValueAsInteger(1);
131
132 String processorClass = conf.getChild("processorClass").getValue("org.apache.james.transport.StateAwareProcessorList");
133 try {
134 processorList = (MailProcessor) Thread.currentThread().getContextClassLoader().loadClass(processorClass).newInstance();
135 } catch (Exception e1) {
136 getLogger().error("Unable to instantiate spoolmanager processor: "+processorClass, e1);
137 throw new ConfigurationException("Instantiation exception: "+processorClass, e1);
138 }
139
140 try {
141 ContainerUtil.enableLogging(processorList, getLogger());
142 ContainerUtil.service(processorList, compMgr);
143 } catch (ServiceException e) {
144 getLogger().error(e.getMessage(), e);
145 throw new ConfigurationException("Servicing failed with error: "+e.getMessage(),e);
146 }
147
148 ContainerUtil.configure(processorList, conf);
149 }
150
151
152
153
154 @PostConstruct
155 public void initialize() throws Exception {
156
157 getLogger().info("JamesSpoolManager init...");
158
159 ContainerUtil.initialize(processorList);
160
161 if (getLogger().isInfoEnabled()) {
162 StringBuffer infoBuffer =
163 new StringBuffer(64)
164 .append("Spooler Manager uses ")
165 .append(numThreads)
166 .append(" Thread(s)");
167 getLogger().info(infoBuffer.toString());
168 }
169
170 active = true;
171 numActive = 0;
172 spoolThreads = new java.util.ArrayList(numThreads);
173 for ( int i = 0 ; i < numThreads ; i++ ) {
174 Thread reader = new Thread(this, "Spool Thread #" + i);
175 spoolThreads.add(reader);
176 reader.start();
177 }
178 }
179
180
181
182
183
184 public void run() {
185
186 if (getLogger().isInfoEnabled())
187 {
188 getLogger().info("Run JamesSpoolManager: "
189 + Thread.currentThread().getName());
190 getLogger().info("Spool=" + spool.getClass().getName());
191 }
192
193 numActive++;
194 while(active) {
195 String key = null;
196 try {
197 Mail mail = (Mail)spool.accept();
198 key = mail.getName();
199 if (getLogger().isDebugEnabled()) {
200 StringBuffer debugBuffer =
201 new StringBuffer(64)
202 .append("==== Begin processing mail ")
203 .append(mail.getName())
204 .append("====");
205 getLogger().debug(debugBuffer.toString());
206 }
207
208 processorList.service(mail);
209
210
211
212 if ((Mail.GHOST.equals(mail.getState())) ||
213 (mail.getRecipients() == null) ||
214 (mail.getRecipients().size() == 0)) {
215 ContainerUtil.dispose(mail);
216 spool.remove(key);
217 if (getLogger().isDebugEnabled()) {
218 StringBuffer debugBuffer =
219 new StringBuffer(64)
220 .append("==== Removed from spool mail ")
221 .append(key)
222 .append("====");
223 getLogger().debug(debugBuffer.toString());
224 }
225 }
226 else {
227
228
229
230 spool.store(mail);
231 ContainerUtil.dispose(mail);
232 spool.unlock(key);
233
234
235 }
236 mail = null;
237 } catch (InterruptedException ie) {
238 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
239 } catch (Throwable e) {
240 if (getLogger().isErrorEnabled()) {
241 getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
242 + e.getMessage(), e);
243 }
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258 }
259 }
260 if (getLogger().isInfoEnabled())
261 {
262 getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
263 }
264 numActive--;
265 }
266
267
268
269
270
271
272
273
274
275
276
277 public void dispose() {
278 getLogger().info("JamesSpoolManager dispose...");
279 active = false;
280 for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
281 ((Thread) it.next()).interrupt();
282 }
283
284 long stop = System.currentTimeMillis() + 60000;
285
286 while (numActive != 0 && stop > System.currentTimeMillis()) {
287 try {
288 Thread.sleep(1000);
289 } catch (Exception ignored) {}
290 }
291 getLogger().info("JamesSpoolManager thread shutdown completed.");
292
293 ContainerUtil.dispose(processorList);
294 }
295
296 public String[] getProcessorNames() {
297 if (!(processorList instanceof ProcessorList)) {
298 return new String[0];
299 }
300 String[] processorNames = ((ProcessorList) processorList).getProcessorNames();
301 return processorNames;
302 }
303
304 public List getMailetConfigs(String processorName) {
305 MailetContainer mailetContainer = getMailetContainerByName(processorName);
306 if (mailetContainer == null) return new ArrayList();
307 return mailetContainer.getMailetConfigs();
308 }
309
310 public List getMatcherConfigs(String processorName) {
311 MailetContainer mailetContainer = getMailetContainerByName(processorName);
312 if (mailetContainer == null) return new ArrayList();
313 return mailetContainer.getMatcherConfigs();
314 }
315
316 private MailetContainer getMailetContainerByName(String processorName) {
317 if (!(processorList instanceof ProcessorList)) return null;
318
319 MailProcessor processor = ((ProcessorList) processorList).getProcessor(processorName);
320 if (!(processor instanceof MailetContainer)) return null;
321
322
323 return (MailetContainer)processor;
324 }
325 }