1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.mailrepository;
21
22 import org.apache.avalon.cornerstone.services.store.StreamRepository;
23 import org.apache.james.core.MimeMessageUtil;
24 import org.apache.mailet.Mail;
25
26 import javax.mail.MessagingException;
27
28 import java.io.ByteArrayInputStream;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.OutputStream;
33 import java.io.PipedInputStream;
34 import java.io.PipedOutputStream;
35
36
37
38
39
40
41
42
43
44
45 final class MessageInputStream extends InputStream {
46
47
48
49
50 private long size = -1;
51
52
53
54 private InputStream wrapped;
55
56
57
58 private Exception caughtException;
59
60
61
62 private StreamRepository streamRep;
63
64
65
66
67
68
69
70
71
72
73
74
75 public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
76 super();
77 caughtException = null;
78 streamRep = srep;
79 size = mc.getMessageSize();
80
81
82 if (streamRep == null && size > sizeLimit) {
83 PipedOutputStream headerOut = new PipedOutputStream();
84 new Thread() {
85 private Mail mail;
86
87 private PipedOutputStream out;
88
89 public void run() {
90 try {
91 writeStream(mail,out);
92 } catch (IOException e) {
93 caughtException = e;
94 } catch (MessagingException e) {
95 caughtException = e;
96 }
97 }
98
99 public Thread setParam(Mail mc, PipedOutputStream headerOut) {
100 this.mail = mc;
101 this.out = headerOut;
102 return this;
103 }
104 }.setParam(mc,(PipedOutputStream) headerOut).start();
105 wrapped = new PipedInputStream(headerOut);
106 } else {
107 ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
108 writeStream(mc,headerOut);
109 wrapped = new ByteArrayInputStream(headerOut.toByteArray());
110 size = headerOut.size();
111 }
112 }
113
114
115
116
117
118
119
120 public long getSize() {
121 return size;
122 }
123
124
125
126
127
128
129
130
131
132
133
134 private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
135 OutputStream bodyOut = null;
136 try {
137 if (streamRep == null) {
138
139
140 bodyOut = out;
141 } else {
142
143 bodyOut = streamRep.put(mail.getName());
144 }
145
146
147 MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
148 out.flush();
149 bodyOut.flush();
150
151 } finally {
152 closeOutputStreams(out, bodyOut);
153 }
154 }
155
156 private void throwException() throws IOException {
157 try {
158 if (wrapped == null) {
159 throw new IOException("wrapped stream does not exists anymore");
160 } else if (caughtException instanceof IOException) {
161 throw (IOException) caughtException;
162 } else {
163 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
164
165
166
167 public Throwable getCause() {
168 return caughtException;
169 }
170 };
171 }
172 } finally {
173 caughtException = null;
174 wrapped = null;
175 }
176 }
177
178
179
180
181
182
183
184
185
186
187 private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
188 try {
189
190
191 if ((headerStream != null) && (headerStream != bodyStream)) {
192 headerStream.close();
193 }
194 } finally {
195 if (bodyStream != null) {
196 bodyStream.close();
197 }
198 }
199 }
200
201
202
203
204
205
206 public int available() throws IOException {
207 if (caughtException != null || wrapped == null) {
208 throwException();
209 }
210 return wrapped.available();
211 }
212
213
214
215
216 public void close() throws IOException {
217 if (caughtException != null || wrapped == null) {
218 throwException();
219 }
220 wrapped.close();
221 wrapped = null;
222 }
223
224
225
226
227 public synchronized void mark(int readLimit) {
228 wrapped.mark(readLimit);
229 }
230
231
232
233
234 public boolean markSupported() {
235 return wrapped.markSupported();
236 }
237
238
239
240
241 public int read(byte[] b, int off, int len) throws IOException {
242 if (caughtException != null || wrapped == null) {
243 throwException();
244 }
245 return wrapped.read(b, off, len);
246 }
247
248
249
250
251 public int read(byte[] b) throws IOException {
252 if (caughtException != null || wrapped == null) {
253 throwException();
254 }
255 return wrapped.read(b);
256 }
257
258
259
260
261 public synchronized void reset() throws IOException {
262 if (caughtException != null || wrapped == null) {
263 throwException();
264 }
265 wrapped.reset();
266 }
267
268
269
270
271 public long skip(long n) throws IOException {
272 if (caughtException != null || wrapped == null) {
273 throwException();
274 }
275 return wrapped.skip(n);
276 }
277
278
279
280
281 public int read() throws IOException {
282 if (caughtException != null || wrapped == null) {
283 throwException();
284 }
285 return wrapped.read();
286 }
287
288 }