1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.transport;
23
24 import org.apache.avalon.framework.activity.Disposable;
25 import org.apache.avalon.framework.activity.Initializable;
26 import org.apache.avalon.framework.configuration.Configurable;
27 import org.apache.avalon.framework.configuration.Configuration;
28 import org.apache.avalon.framework.configuration.ConfigurationException;
29 import org.apache.avalon.framework.container.ContainerUtil;
30 import org.apache.avalon.framework.logger.AbstractLogEnabled;
31 import org.apache.avalon.framework.service.ServiceException;
32 import org.apache.avalon.framework.service.ServiceManager;
33 import org.apache.avalon.framework.service.Serviceable;
34 import org.apache.mailet.Mail;
35 import org.apache.mailet.MailetException;
36 import javax.mail.MessagingException;
37
38 import java.util.HashMap;
39 import java.util.Iterator;
40 import java.util.Map;
41
42
43
44
45
46
47
48 public class StateAwareProcessorList
49 extends AbstractLogEnabled
50 implements Serviceable, Configurable, Initializable, Disposable, MailProcessor, ProcessorList {
51
52
53
54
55 private ServiceManager compMgr;
56
57
58
59
60 private Configuration conf;
61
62
63
64
65 private final Map<String, MailProcessor> processors;
66
67 public StateAwareProcessorList() {
68 super();
69 this.processors = new HashMap<String, MailProcessor>();
70 }
71
72
73
74
75 public void service(ServiceManager comp) throws ServiceException {
76 compMgr = comp;
77 }
78
79
80
81
82 public void configure(Configuration conf) throws ConfigurationException {
83 this.conf = conf;
84 }
85
86
87
88
89 public void initialize() throws Exception {
90
91 final Configuration[] processorConfs = conf.getChildren( "processor" );
92 for ( int i = 0; i < processorConfs.length; i++ )
93 {
94 Configuration processorConf = processorConfs[i];
95 String processorName = processorConf.getAttribute("name");
96 String processorClass = processorConf.getAttribute("class","org.apache.james.transport.LinearProcessor");
97
98 try {
99 MailProcessor processor = (MailProcessor) Thread.currentThread().getContextClassLoader().loadClass(processorClass).newInstance();
100 processors.put(processorName, processor);
101
102 setupLogger(processor, processorName);
103 ContainerUtil.service(processor, compMgr);
104 ContainerUtil.configure(processor, processorConf);
105
106 if (getLogger().isInfoEnabled()) {
107 StringBuffer infoBuffer =
108 new StringBuffer(64)
109 .append("Processor ")
110 .append(processorName)
111 .append(" instantiated.");
112 getLogger().info(infoBuffer.toString());
113 }
114 } catch (Exception ex) {
115 if (getLogger().isErrorEnabled()) {
116 StringBuffer errorBuffer =
117 new StringBuffer(256)
118 .append("Unable to init processor ")
119 .append(processorName)
120 .append(": ")
121 .append(ex.toString());
122 getLogger().error( errorBuffer.toString(), ex );
123 }
124 throw ex;
125 }
126 }
127 }
128
129
130
131
132
133
134
135
136
137 public void service(Mail mail) {
138 while (true) {
139 String processorName = mail.getState();
140 if (processorName.equals(Mail.GHOST)) {
141
142 return;
143 }
144 try {
145 MailProcessor processor
146 = (MailProcessor)processors.get(processorName);
147 if (processor == null) {
148 StringBuffer exceptionMessageBuffer =
149 new StringBuffer(128)
150 .append("Unable to find processor ")
151 .append(processorName)
152 .append(" requested for processing of ")
153 .append(mail.getName());
154 String exceptionMessage = exceptionMessageBuffer.toString();
155 getLogger().debug(exceptionMessage);
156 mail.setState(Mail.ERROR);
157 throw new MailetException(exceptionMessage);
158 }
159 StringBuffer logMessageBuffer = null;
160 if (getLogger().isDebugEnabled()) {
161 logMessageBuffer =
162 new StringBuffer(64)
163 .append("Processing ")
164 .append(mail.getName())
165 .append(" through ")
166 .append(processorName);
167 getLogger().debug(logMessageBuffer.toString());
168 }
169 processor.service(mail);
170 if (getLogger().isDebugEnabled()) {
171 logMessageBuffer =
172 new StringBuffer(128)
173 .append("Processed ")
174 .append(mail.getName())
175 .append(" through ")
176 .append(processorName);
177 getLogger().debug(logMessageBuffer.toString());
178 getLogger().debug("Result was " + mail.getState());
179 }
180 return;
181 } catch (Throwable e) {
182
183
184 StringBuffer exceptionBuffer =
185 new StringBuffer(64)
186 .append("Exception in processor <")
187 .append(processorName)
188 .append(">");
189 getLogger().error(exceptionBuffer.toString(), e);
190 if (processorName.equals(Mail.ERROR)) {
191
192
193 mail.setState(Mail.GHOST);
194 mail.setErrorMessage(e.getMessage());
195 } else {
196
197 if (!(e instanceof MessagingException)) {
198
199 mail.setState(Mail.ERROR);
200 }
201 mail.setErrorMessage(e.getMessage());
202 }
203 }
204 if (getLogger().isErrorEnabled()) {
205 StringBuffer logMessageBuffer =
206 new StringBuffer(128)
207 .append("An error occurred processing ")
208 .append(mail.getName())
209 .append(" through ")
210 .append(processorName);
211 getLogger().error(logMessageBuffer.toString());
212 getLogger().error("Result was " + mail.getState());
213 }
214 }
215 }
216
217
218
219
220
221
222
223
224
225
226
227 public void dispose() {
228 Iterator it = processors.keySet().iterator();
229 while (it.hasNext()) {
230 String processorName = (String)it.next();
231 if (getLogger().isDebugEnabled()) {
232 getLogger().debug("Processor " + processorName);
233 }
234 Object processor = processors.get(processorName);
235 ContainerUtil.dispose(processor);
236 processors.remove(processor);
237 }
238 }
239
240
241
242
243 public String[] getProcessorNames() {
244 return (String[]) processors.keySet().toArray(new String[]{});
245 }
246
247 public MailProcessor getProcessor(String name) {
248 return (MailProcessor) processors.get(name);
249 }
250
251 }