1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.mailrepository;
23
24 import org.apache.avalon.cornerstone.services.store.ObjectRepository;
25 import org.apache.avalon.cornerstone.services.store.Store;
26 import org.apache.avalon.cornerstone.services.store.StreamRepository;
27 import org.apache.avalon.framework.configuration.Configuration;
28 import org.apache.avalon.framework.configuration.ConfigurationException;
29 import org.apache.avalon.framework.configuration.DefaultConfiguration;
30 import org.apache.avalon.framework.service.ServiceException;
31 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
32 import org.apache.james.core.MimeMessageWrapper;
33 import org.apache.mailet.Mail;
34
35 import javax.mail.MessagingException;
36 import javax.mail.internet.MimeMessage;
37
38 import java.io.IOException;
39 import java.io.OutputStream;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.HashSet;
44 import java.util.Iterator;
45 import java.util.Set;
46
47
48
49
50
51
52
53
54
55
56
57
58 public class AvalonMailRepository
59 extends AbstractMailRepository {
60
61 private StreamRepository streamRepository;
62 private ObjectRepository objectRepository;
63 private String destination;
64 private Set keys;
65 private boolean fifo;
66 private boolean cacheKeys;
67
68
69
70
71 public void configure(Configuration conf) throws ConfigurationException {
72 destination = conf.getAttribute("destinationURL");
73 if (getLogger().isDebugEnabled()) {
74 getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
75 }
76 String checkType = conf.getAttribute("type");
77 if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
78 String exceptionString = "Attempt to configure AvalonMailRepository as " +
79 checkType;
80 if (getLogger().isWarnEnabled()) {
81 getLogger().warn(exceptionString);
82 }
83 throw new ConfigurationException(exceptionString);
84 }
85 fifo = conf.getAttributeAsBoolean("FIFO", false);
86 cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
87
88 }
89
90
91
92
93 public void initialize()
94 throws Exception {
95 super.initialize();
96 try {
97 objectRepository = (ObjectRepository) selectRepository(store, "OBJECT");
98 streamRepository = (StreamRepository) selectRepository(store, "STREAM");
99
100 if (cacheKeys) keys = Collections.synchronizedSet(new HashSet());
101
102
103 HashSet streamKeys = new HashSet();
104 for (Iterator i = streamRepository.list(); i.hasNext(); ) {
105 streamKeys.add(i.next());
106 }
107 HashSet objectKeys = new HashSet();
108 for (Iterator i = objectRepository.list(); i.hasNext(); ) {
109 objectKeys.add(i.next());
110 }
111
112 Collection strandedStreams = (Collection)streamKeys.clone();
113 strandedStreams.removeAll(objectKeys);
114 for (Iterator i = strandedStreams.iterator(); i.hasNext(); ) {
115 String key = (String)i.next();
116 remove(key);
117 }
118
119 Collection strandedObjects = (Collection)objectKeys.clone();
120 strandedObjects.removeAll(streamKeys);
121 for (Iterator i = strandedObjects.iterator(); i.hasNext(); ) {
122 String key = (String)i.next();
123 remove(key);
124 }
125
126 if (keys != null) {
127
128
129 keys.clear();
130 for (Iterator i = objectRepository.list(); i.hasNext(); ) {
131 keys.add(i.next());
132 }
133 }
134 if (getLogger().isDebugEnabled()) {
135 StringBuffer logBuffer =
136 new StringBuffer(128)
137 .append(getClass().getName())
138 .append(" created in ")
139 .append(destination);
140 getLogger().debug(logBuffer.toString());
141 }
142 } catch (Exception e) {
143 final String message = "Failed to retrieve Store component:" + e.getMessage();
144 getLogger().error( message, e );
145 throw e;
146 }
147 }
148
149 private Object selectRepository(Store store, String type) throws ServiceException {
150 DefaultConfiguration objectConfiguration
151 = new DefaultConfiguration( "repository",
152 "generated:AvalonFileRepository.compose()" );
153
154 objectConfiguration.setAttribute("destinationURL", destination);
155 objectConfiguration.setAttribute("type", type);
156 objectConfiguration.setAttribute("model", "SYNCHRONOUS");
157 return store.select(objectConfiguration);
158 }
159
160
161
162
163 protected void internalStore(Mail mc) throws MessagingException, IOException {
164 String key = mc.getName();
165 if (keys != null && !keys.contains(key)) {
166 keys.add(key);
167 }
168 boolean saveStream = true;
169
170 MimeMessage message = mc.getMessage();
171
172
173 if (message instanceof MimeMessageCopyOnWriteProxy) {
174 MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
175 message = messageCow.getWrappedMessage();
176 }
177 if (message instanceof MimeMessageWrapper) {
178 MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
179 if (DEEP_DEBUG) {
180 System.out.println("Retrieving from: " + wrapper.getSourceId());
181 StringBuffer debugBuffer =
182 new StringBuffer(64)
183 .append("Saving to: ")
184 .append(destination)
185 .append("/")
186 .append(mc.getName());
187 System.out.println(debugBuffer.toString());
188 System.out.println("Modified: " + wrapper.isModified());
189 }
190 StringBuffer destinationBuffer =
191 new StringBuffer(128)
192 .append(destination)
193 .append("/")
194 .append(mc.getName());
195 if (destinationBuffer.toString().equals(wrapper.getSourceId()) && !wrapper.isModified()) {
196
197
198
199 saveStream = false;
200 }
201 }
202 if (saveStream) {
203 OutputStream out = null;
204 try {
205 out = streamRepository.put(key);
206 mc.getMessage().writeTo(out);
207 } finally {
208 if (out != null) out.close();
209 }
210 }
211
212 objectRepository.put(key, mc);
213 }
214
215
216
217
218 public Mail retrieve(String key) throws MessagingException {
219 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
220 getLogger().debug("Retrieving mail: " + key);
221 }
222 try {
223 Mail mc = null;
224 try {
225 mc = (Mail) objectRepository.get(key);
226 }
227 catch (RuntimeException re){
228 StringBuffer exceptionBuffer = new StringBuffer(128);
229 if(re.getCause() instanceof Error){
230 exceptionBuffer.append("Error when retrieving mail, not deleting: ")
231 .append(re.toString());
232 }else{
233 exceptionBuffer.append("Exception retrieving mail: ")
234 .append(re.toString())
235 .append(", so we're deleting it.");
236 remove(key);
237 }
238 final String errorMessage = exceptionBuffer.toString();
239 getLogger().warn(errorMessage);
240 getLogger().debug(errorMessage, re);
241 return null;
242 }
243 MimeMessageAvalonSource source = new MimeMessageAvalonSource(streamRepository, destination, key);
244 mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
245
246 return mc;
247 } catch (Exception me) {
248 getLogger().error("Exception retrieving mail: " + me);
249 throw new MessagingException("Exception while retrieving mail: " + me.getMessage(), me);
250 }
251 }
252
253
254
255
256
257 protected void internalRemove(String key) throws MessagingException {
258 if (keys != null) keys.remove(key);
259 streamRepository.remove(key);
260 objectRepository.remove(key);
261 }
262
263
264
265
266
267 public Iterator list() {
268
269
270 final ArrayList clone;
271 if (keys != null) synchronized(keys) {
272 clone = new ArrayList(keys);
273 } else {
274 clone = new ArrayList();
275 for (Iterator i = objectRepository.list(); i.hasNext(); ) {
276 clone.add(i.next());
277 }
278 }
279 if (fifo) Collections.sort(clone);
280 return clone.iterator();
281 }
282 }