1 package org.apache.james.mailrepository;
2
3 import org.apache.avalon.cornerstone.services.store.StreamRepository;
4 import org.apache.james.core.MimeMessageUtil;
5 import org.apache.mailet.Mail;
6
7 import javax.mail.MessagingException;
8
9 import java.io.ByteArrayInputStream;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.io.OutputStream;
14 import java.io.PipedInputStream;
15 import java.io.PipedOutputStream;
16
17 /***
18 * This class provides an inputStream for a Mail object.
19 * If the Mail is larger than 4KB it uses Piped streams and a worker threads
20 * Otherwise it simply create a temporary byte buffer and does not create
21 * the worker thread.
22 *
23 * Note: Javamail (or the Activation Framework) already uses a worker threads when
24 * asked for an inputstream.
25 */
26 final class MessageInputStream extends InputStream {
27
28 /***
29 * The size of the current message
30 */
31 private long size = -1;
32 /***
33 * The wrapped stream (Piped or Binary)
34 */
35 private InputStream wrapped;
36 /***
37 * If an excaption happens in the worker threads it's stored here
38 */
39 private Exception caughtException;
40 /***
41 * Stream repository used for dbfiles (null otherwise)
42 */
43 private StreamRepository streamRep;
44
45 /***
46 * Main constructor. If srep is not null than we are using dbfiles and we stream
47 * the body to file and only the header to db.
48 */
49 public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
50 super();
51 caughtException = null;
52 streamRep = srep;
53 size = mc.getMessageSize();
54
55
56 if (streamRep == null && size > sizeLimit) {
57 PipedOutputStream headerOut = new PipedOutputStream();
58 new Thread() {
59 private Mail mail;
60
61 private PipedOutputStream out;
62
63 public void run() {
64 try {
65 writeStream(mail,out);
66 } catch (IOException e) {
67 caughtException = e;
68 } catch (MessagingException e) {
69 caughtException = e;
70 }
71 }
72
73 public Thread setParam(Mail mc, PipedOutputStream headerOut) {
74 this.mail = mc;
75 this.out = headerOut;
76 return this;
77 }
78 }.setParam(mc,(PipedOutputStream) headerOut).start();
79 wrapped = new PipedInputStream(headerOut);
80 } else {
81 ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
82 writeStream(mc,headerOut);
83 wrapped = new ByteArrayInputStream(headerOut.toByteArray());
84 size = headerOut.size();
85 }
86 }
87
88 /***
89 * Returns the size of the full message
90 */
91 public long getSize() {
92 return size;
93 }
94
95 /***
96 * write the full mail to the stream
97 * This can be used by this object or by the worker threads.
98 */
99 private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
100 OutputStream bodyOut = null;
101 try {
102 if (streamRep == null) {
103
104
105 bodyOut = out;
106 } else {
107
108 bodyOut = streamRep.put(mail.getName());
109 }
110
111
112 MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
113 out.flush();
114 bodyOut.flush();
115
116 } finally {
117 closeOutputStreams(out, bodyOut);
118 }
119 }
120
121 private void throwException() throws IOException {
122 try {
123 if (wrapped == null) {
124 throw new IOException("wrapped stream does not exists anymore");
125 } else if (caughtException instanceof IOException) {
126 throw (IOException) caughtException;
127 } else {
128 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
129 /***
130 * @see java.lang.Throwable#getCause()
131 */
132 public Throwable getCause() {
133 return caughtException;
134 }
135 };
136 }
137 } finally {
138 caughtException = null;
139 wrapped = null;
140 }
141 }
142
143
144 /***
145 * Closes output streams used to update message
146 *
147 * @param headerStream the stream containing header information - potentially the same
148 * as the body stream
149 * @param bodyStream the stream containing body information
150 * @throws IOException
151 */
152 private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
153 try {
154
155
156 if ((headerStream != null) && (headerStream != bodyStream)) {
157 headerStream.close();
158 }
159 } finally {
160 if (bodyStream != null) {
161 bodyStream.close();
162 }
163 }
164 }
165
166
167
168 /***
169 * @see java.io.InputStream#available()
170 */
171 public int available() throws IOException {
172 if (caughtException != null || wrapped == null) {
173 throwException();
174 }
175 return wrapped.available();
176 }
177
178 /***
179 * @see java.io.Closeable#close()
180 */
181 public void close() throws IOException {
182 if (caughtException != null || wrapped == null) {
183 throwException();
184 }
185 wrapped.close();
186 wrapped = null;
187 }
188
189 /***
190 * @see java.io.InputStream#mark(int)
191 */
192 public synchronized void mark(int arg0) {
193 wrapped.mark(arg0);
194 }
195
196 /***
197 * @see java.io.InputStream#markSupported()
198 */
199 public boolean markSupported() {
200 return wrapped.markSupported();
201 }
202
203 /***
204 * @see java.io.InputStream#read(byte[], int, int)
205 */
206 public int read(byte[] arg0, int arg1, int arg2) throws IOException {
207 if (caughtException != null || wrapped == null) {
208 throwException();
209 }
210 return wrapped.read(arg0, arg1, arg2);
211 }
212
213 /***
214 * @see java.io.InputStream#read(byte[])
215 */
216 public int read(byte[] arg0) throws IOException {
217 if (caughtException != null || wrapped == null) {
218 throwException();
219 }
220 return wrapped.read(arg0);
221 }
222
223 /***
224 * @see java.io.InputStream#reset()
225 */
226 public synchronized void reset() throws IOException {
227 if (caughtException != null || wrapped == null) {
228 throwException();
229 }
230 wrapped.reset();
231 }
232
233 /***
234 * @see java.io.InputStream#skip(long)
235 */
236 public long skip(long arg0) throws IOException {
237 if (caughtException != null || wrapped == null) {
238 throwException();
239 }
240 return wrapped.skip(arg0);
241 }
242
243 /***
244 * @see java.io.InputStream#read()
245 */
246 public int read() throws IOException {
247 if (caughtException != null || wrapped == null) {
248 throwException();
249 }
250 return wrapped.read();
251 }
252
253 }