1 /****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.mailrepository;
21
22 import org.apache.avalon.cornerstone.services.store.StreamRepository;
23 import org.apache.james.core.MimeMessageUtil;
24 import org.apache.mailet.Mail;
25
26 import javax.mail.MessagingException;
27
28 import java.io.ByteArrayInputStream;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.OutputStream;
33 import java.io.PipedInputStream;
34 import java.io.PipedOutputStream;
35
36 /**
37 * This class provides an inputStream for a Mail object.
38 * If the Mail is larger than 4KB it uses Piped streams and a worker threads
39 * Otherwise it simply create a temporary byte buffer and does not create
40 * the worker thread.
41 *
42 * Note: Javamail (or the Activation Framework) already uses a worker threads when
43 * asked for an inputstream.
44 */
45 final class MessageInputStream extends InputStream {
46
47 /**
48 * The size of the current message
49 */
50 private long size = -1;
51 /**
52 * The wrapped stream (Piped or Binary)
53 */
54 private InputStream wrapped;
55 /**
56 * If an excaption happens in the worker threads it's stored here
57 */
58 private Exception caughtException;
59 /**
60 * Stream repository used for dbfiles (null otherwise)
61 */
62 private StreamRepository streamRep;
63
64
65 /**
66 * Main constructor. If srep is not null than we are using dbfiles and we stream
67 * the body to file and only the header to db.
68 *
69 * @param mc the Mail
70 * @param srep the StreamRepository the StreamRepository used for dbfiles.
71 * @param sizeLimit the sizeLimit which set the limit after which the streaming will be disabled
72 * @throws IOException get thrown if an IO error detected
73 * @throws MessagingException get thrown if an error detected while reading informations of the mail
74 */
75 public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
76 super();
77 caughtException = null;
78 streamRep = srep;
79 size = mc.getMessageSize();
80 // we use the pipes only when streamRep is null and the message size is greater than 4096
81 // Otherwise we should calculate the header size and not the message size when streamRep is not null (JAMES-475)
82 if (streamRep == null && size > sizeLimit) {
83 PipedOutputStream headerOut = new PipedOutputStream();
84 new Thread() {
85 private Mail mail;
86
87 private PipedOutputStream out;
88
89 public void run() {
90 try {
91 writeStream(mail,out);
92 } catch (IOException e) {
93 caughtException = e;
94 } catch (MessagingException e) {
95 caughtException = e;
96 }
97 }
98
99 public Thread setParam(Mail mc, PipedOutputStream headerOut) {
100 this.mail = mc;
101 this.out = headerOut;
102 return this;
103 }
104 }.setParam(mc,(PipedOutputStream) headerOut).start();
105 wrapped = new PipedInputStream(headerOut);
106 } else {
107 ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
108 writeStream(mc,headerOut);
109 wrapped = new ByteArrayInputStream(headerOut.toByteArray());
110 size = headerOut.size();
111 }
112 }
113
114
115 /**
116 * Returns the size of the full message
117 *
118 * @return size the full message size
119 */
120 public long getSize() {
121 return size;
122 }
123
124
125 /**
126 * Write the full mail to the stream
127 * This can be used by this object or by the worker threads.
128 *
129 * @param mail the Mail used as source
130 * @param out the OutputStream writting the mail to
131 * @throws IOException get thrown if an IO error detected
132 * @throws MessagingException get thrown if an error detected while reading informations of the mail
133 */
134 private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
135 OutputStream bodyOut = null;
136 try {
137 if (streamRep == null) {
138 //If there is no filestore, use the byte array to store headers
139 // and the body
140 bodyOut = out;
141 } else {
142 //Store the body in the stream repository
143 bodyOut = streamRep.put(mail.getName());
144 }
145
146 //Write the message to the headerOut and bodyOut. bodyOut goes straight to the file
147 MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
148 out.flush();
149 bodyOut.flush();
150
151 } finally {
152 closeOutputStreams(out, bodyOut);
153 }
154 }
155
156 private void throwException() throws IOException {
157 try {
158 if (wrapped == null) {
159 throw new IOException("wrapped stream does not exists anymore");
160 } else if (caughtException instanceof IOException) {
161 throw (IOException) caughtException;
162 } else {
163 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
164 /**
165 * @see java.lang.Throwable#getCause()
166 */
167 public Throwable getCause() {
168 return caughtException;
169 }
170 };
171 }
172 } finally {
173 caughtException = null;
174 wrapped = null;
175 }
176 }
177
178
179 /**
180 * Closes output streams used to update message
181 *
182 * @param headerStream the stream containing header information - potentially the same
183 * as the body stream
184 * @param bodyStream the stream containing body information
185 * @throws IOException
186 */
187 private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
188 try {
189 // If the header stream is not the same as the body stream,
190 // close the header stream here.
191 if ((headerStream != null) && (headerStream != bodyStream)) {
192 headerStream.close();
193 }
194 } finally {
195 if (bodyStream != null) {
196 bodyStream.close();
197 }
198 }
199 }
200
201 // wrapper methods
202
203 /**
204 * @see java.io.InputStream#available()
205 */
206 public int available() throws IOException {
207 if (caughtException != null || wrapped == null) {
208 throwException();
209 }
210 return wrapped.available();
211 }
212
213 /**
214 * @see java.io.InputStream#close()
215 */
216 public void close() throws IOException {
217 if (caughtException != null || wrapped == null) {
218 throwException();
219 }
220 wrapped.close();
221 wrapped = null;
222 }
223
224 /**
225 * @see java.io.InputStream#mark(int)
226 */
227 public synchronized void mark(int readLimit) {
228 wrapped.mark(readLimit);
229 }
230
231 /**
232 * @see java.io.InputStream#markSupported()
233 */
234 public boolean markSupported() {
235 return wrapped.markSupported();
236 }
237
238 /**
239 * @see java.io.InputStream#read(byte[], int, int)
240 */
241 public int read(byte[] b, int off, int len) throws IOException {
242 if (caughtException != null || wrapped == null) {
243 throwException();
244 }
245 return wrapped.read(b, off, len);
246 }
247
248 /**
249 * @see java.io.InputStream#read(byte[])
250 */
251 public int read(byte[] b) throws IOException {
252 if (caughtException != null || wrapped == null) {
253 throwException();
254 }
255 return wrapped.read(b);
256 }
257
258 /**
259 * @see java.io.InputStream#reset()
260 */
261 public synchronized void reset() throws IOException {
262 if (caughtException != null || wrapped == null) {
263 throwException();
264 }
265 wrapped.reset();
266 }
267
268 /**
269 * @see java.io.InputStream#skip(long)
270 */
271 public long skip(long n) throws IOException {
272 if (caughtException != null || wrapped == null) {
273 throwException();
274 }
275 return wrapped.skip(n);
276 }
277
278 /**
279 * @see java.io.InputStream#read()
280 */
281 public int read() throws IOException {
282 if (caughtException != null || wrapped == null) {
283 throwException();
284 }
285 return wrapped.read();
286 }
287
288 }