1 package org.apache.james.mailrepository;
2
3 import org.apache.avalon.cornerstone.services.store.StreamRepository;
4 import org.apache.james.core.MimeMessageUtil;
5 import org.apache.mailet.Mail;
6
7 import javax.mail.MessagingException;
8
9 import java.io.ByteArrayInputStream;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.io.OutputStream;
14 import java.io.PipedInputStream;
15 import java.io.PipedOutputStream;
16
17 /*****************************************************************
18 * Licensed to the Apache Software Foundation (ASF) under one *
19 * or more contributor license agreements. See the NOTICE file *
20 * distributed with this work for additional information *
21 * regarding copyright ownership. The ASF licenses this file *
22 * to you under the Apache License, Version 2.0 (the *
23 * "License"); you may not use this file except in compliance *
24 * with the License. You may obtain a copy of the License at *
25 * *
26 * http://www.apache.org/licenses/LICENSE-2.0 *
27 * *
28 * Unless required by applicable law or agreed to in writing, *
29 * software distributed under the License is distributed on an *
30 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
31 * KIND, either express or implied. See the License for the *
32 * specific language governing permissions and limitations *
33 * under the License. *
34 ****************************************************************/
35
36 /***
37 * This class provides an inputStream for a Mail object.
38 * If the Mail is larger than 4KB it uses Piped streams and a worker threads
39 * Otherwise it simply create a temporary byte buffer and does not create
40 * the worker thread.
41 *
42 * Note: Javamail (or the Activation Framework) already uses a worker threads when
43 * asked for an inputstream.
44 */
45 final class MessageInputStream extends InputStream {
46
47 /***
48 * The size of the current message
49 */
50 private long size = -1;
51 /***
52 * The wrapped stream (Piped or Binary)
53 */
54 private InputStream wrapped;
55 /***
56 * If an excaption happens in the worker threads it's stored here
57 */
58 private Exception caughtException;
59 /***
60 * Stream repository used for dbfiles (null otherwise)
61 */
62 private StreamRepository streamRep;
63
64 /***
65 * Main constructor. If srep is not null than we are using dbfiles and we stream
66 * the body to file and only the header to db.
67 */
68 public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
69 super();
70 caughtException = null;
71 streamRep = srep;
72 size = mc.getMessageSize();
73
74
75 if (streamRep == null && size > sizeLimit) {
76 PipedOutputStream headerOut = new PipedOutputStream();
77 new Thread() {
78 private Mail mail;
79
80 private PipedOutputStream out;
81
82 public void run() {
83 try {
84 writeStream(mail,out);
85 } catch (IOException e) {
86 caughtException = e;
87 } catch (MessagingException e) {
88 caughtException = e;
89 }
90 }
91
92 public Thread setParam(Mail mc, PipedOutputStream headerOut) {
93 this.mail = mc;
94 this.out = headerOut;
95 return this;
96 }
97 }.setParam(mc,(PipedOutputStream) headerOut).start();
98 wrapped = new PipedInputStream(headerOut);
99 } else {
100 ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
101 writeStream(mc,headerOut);
102 wrapped = new ByteArrayInputStream(headerOut.toByteArray());
103 size = headerOut.size();
104 }
105 }
106
107 /***
108 * Returns the size of the full message
109 */
110 public long getSize() {
111 return size;
112 }
113
114 /***
115 * write the full mail to the stream
116 * This can be used by this object or by the worker threads.
117 */
118 private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
119 OutputStream bodyOut = null;
120 try {
121 if (streamRep == null) {
122
123
124 bodyOut = out;
125 } else {
126
127 bodyOut = streamRep.put(mail.getName());
128 }
129
130
131 MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
132 out.flush();
133 bodyOut.flush();
134
135 } finally {
136 closeOutputStreams(out, bodyOut);
137 }
138 }
139
140 private void throwException() throws IOException {
141 try {
142 if (wrapped == null) {
143 throw new IOException("wrapped stream does not exists anymore");
144 } else if (caughtException instanceof IOException) {
145 throw (IOException) caughtException;
146 } else {
147 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
148 /***
149 * @see java.lang.Throwable#getCause()
150 */
151 public Throwable getCause() {
152 return caughtException;
153 }
154 };
155 }
156 } finally {
157 caughtException = null;
158 wrapped = null;
159 }
160 }
161
162
163 /***
164 * Closes output streams used to update message
165 *
166 * @param headerStream the stream containing header information - potentially the same
167 * as the body stream
168 * @param bodyStream the stream containing body information
169 * @throws IOException
170 */
171 private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
172 try {
173
174
175 if ((headerStream != null) && (headerStream != bodyStream)) {
176 headerStream.close();
177 }
178 } finally {
179 if (bodyStream != null) {
180 bodyStream.close();
181 }
182 }
183 }
184
185
186
187 /***
188 * @see java.io.InputStream#available()
189 */
190 public int available() throws IOException {
191 if (caughtException != null || wrapped == null) {
192 throwException();
193 }
194 return wrapped.available();
195 }
196
197 /***
198 * @see java.io.Closeable#close()
199 */
200 public void close() throws IOException {
201 if (caughtException != null || wrapped == null) {
202 throwException();
203 }
204 wrapped.close();
205 wrapped = null;
206 }
207
208 /***
209 * @see java.io.InputStream#mark(int)
210 */
211 public synchronized void mark(int arg0) {
212 wrapped.mark(arg0);
213 }
214
215 /***
216 * @see java.io.InputStream#markSupported()
217 */
218 public boolean markSupported() {
219 return wrapped.markSupported();
220 }
221
222 /***
223 * @see java.io.InputStream#read(byte[], int, int)
224 */
225 public int read(byte[] arg0, int arg1, int arg2) throws IOException {
226 if (caughtException != null || wrapped == null) {
227 throwException();
228 }
229 return wrapped.read(arg0, arg1, arg2);
230 }
231
232 /***
233 * @see java.io.InputStream#read(byte[])
234 */
235 public int read(byte[] arg0) throws IOException {
236 if (caughtException != null || wrapped == null) {
237 throwException();
238 }
239 return wrapped.read(arg0);
240 }
241
242 /***
243 * @see java.io.InputStream#reset()
244 */
245 public synchronized void reset() throws IOException {
246 if (caughtException != null || wrapped == null) {
247 throwException();
248 }
249 wrapped.reset();
250 }
251
252 /***
253 * @see java.io.InputStream#skip(long)
254 */
255 public long skip(long arg0) throws IOException {
256 if (caughtException != null || wrapped == null) {
257 throwException();
258 }
259 return wrapped.skip(arg0);
260 }
261
262 /***
263 * @see java.io.InputStream#read()
264 */
265 public int read() throws IOException {
266 if (caughtException != null || wrapped == null) {
267 throwException();
268 }
269 return wrapped.read();
270 }
271
272 }