View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.mailrepository;
21  
22  import org.apache.avalon.cornerstone.services.store.StreamRepository;
23  import org.apache.james.core.MimeMessageUtil;
24  import org.apache.mailet.Mail;
25  
26  import javax.mail.MessagingException;
27  
28  import java.io.ByteArrayInputStream;
29  import java.io.ByteArrayOutputStream;
30  import java.io.IOException;
31  import java.io.InputStream;
32  import java.io.OutputStream;
33  import java.io.PipedInputStream;
34  import java.io.PipedOutputStream;
35  
36  /**
37   * This class provides an inputStream for a Mail object.
38   * If the Mail is larger than 4KB it uses Piped streams and a worker threads
39   * Otherwise it simply create a temporary byte buffer and does not create
40   * the worker thread.
41   * 
42   * Note: Javamail (or the Activation Framework) already uses a worker threads when
43   * asked for an inputstream.
44   */
45  final class MessageInputStream extends InputStream {
46      
47      /**
48       * The size of the current message
49       */
50      private long size = -1;
51      /**
52       * The wrapped stream (Piped or Binary)
53       */
54      private InputStream wrapped;
55      /**
56       * If an excaption happens in the worker threads it's stored here
57       */
58      private Exception caughtException;
59      /**
60       * Stream repository used for dbfiles (null otherwise)
61       */
62      private StreamRepository streamRep;
63      
64  
65      /**
66       * Main constructor. If srep is not null than we are using dbfiles and we stream
67       * the body to file and only the header to db.
68       * 
69       * @param mc the Mail 
70       * @param srep the StreamRepository the StreamRepository used for dbfiles.
71       * @param sizeLimit the sizeLimit which set the limit after which the streaming will be disabled
72       * @throws IOException get thrown if an IO error detected
73       * @throws MessagingException get thrown if an error detected while reading informations of the mail 
74       */
75      public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
76          super();
77          caughtException = null;
78          streamRep = srep;
79          size = mc.getMessageSize();
80          // we use the pipes only when streamRep is null and the message size is greater than 4096
81          // Otherwise we should calculate the header size and not the message size when streamRep is not null (JAMES-475)
82          if (streamRep == null && size > sizeLimit) {
83              PipedOutputStream headerOut = new PipedOutputStream();
84              new Thread() {
85                  private Mail mail;
86  
87                  private PipedOutputStream out;
88  
89                  public void run() {
90                      try {
91                          writeStream(mail,out);
92                      } catch (IOException e) {
93                          caughtException = e;
94                      } catch (MessagingException e) {
95                          caughtException = e;
96                      }
97                  }
98  
99                  public Thread setParam(Mail mc, PipedOutputStream headerOut) {
100                     this.mail = mc;
101                     this.out = headerOut;
102                     return this;
103                 }
104             }.setParam(mc,(PipedOutputStream) headerOut).start();
105             wrapped = new PipedInputStream(headerOut);
106         } else {
107             ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
108             writeStream(mc,headerOut);
109             wrapped = new ByteArrayInputStream(headerOut.toByteArray());
110             size = headerOut.size();
111         }
112     }
113     
114 
115     /**
116      * Returns the size of the full message
117      * 
118      * @return size the full message size
119      */
120     public long getSize() {
121         return size;
122     }
123 
124 
125     /**
126      * Write the full mail to the stream
127      * This can be used by this object or by the worker threads.
128      * 
129      * @param mail the Mail used as source
130      * @param out the OutputStream writting the mail to
131      * @throws IOException get thrown if an IO error detected
132      * @throws MessagingException get thrown if an error detected while reading informations of the mail 
133      */
134     private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
135         OutputStream bodyOut = null;
136         try {
137             if (streamRep == null) {
138                 //If there is no filestore, use the byte array to store headers
139                 //  and the body
140                 bodyOut = out;
141             } else {
142                 //Store the body in the stream repository
143                 bodyOut = streamRep.put(mail.getName());
144             }
145         
146             //Write the message to the headerOut and bodyOut.  bodyOut goes straight to the file
147             MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
148             out.flush();
149             bodyOut.flush();
150         
151         } finally {
152             closeOutputStreams(out, bodyOut);
153         }
154     }
155 
156     private void throwException() throws IOException {
157         try {
158             if (wrapped == null) {
159                 throw new IOException("wrapped stream does not exists anymore");
160             } else if (caughtException instanceof IOException) {
161                 throw (IOException) caughtException;
162             } else {
163                 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
164                     /**
165                      * @see java.lang.Throwable#getCause()
166                      */
167                     public Throwable getCause() {
168                         return caughtException;
169                     }
170                 };
171             }
172         } finally {
173             caughtException = null;
174             wrapped = null;
175         }
176     }
177 
178 
179     /**
180      * Closes output streams used to update message
181      * 
182      * @param headerStream the stream containing header information - potentially the same
183      *               as the body stream
184      * @param bodyStream the stream containing body information
185      * @throws IOException 
186      */
187     private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
188         try {
189             // If the header stream is not the same as the body stream,
190             // close the header stream here.
191             if ((headerStream != null) && (headerStream != bodyStream)) {
192                 headerStream.close();
193             }
194         } finally {
195             if (bodyStream != null) {
196                 bodyStream.close();
197             }
198         }
199     }
200 
201     // wrapper methods
202 
203     /**
204      * @see java.io.InputStream#available()
205      */
206     public int available() throws IOException {
207         if (caughtException != null || wrapped == null) {
208             throwException();
209         }
210         return wrapped.available();
211     }
212 
213     /**
214      * @see java.io.InputStream#close()
215      */
216     public void close() throws IOException {
217         if (caughtException != null || wrapped == null) {
218             throwException();
219         }
220         wrapped.close();
221         wrapped = null;
222     }
223 
224     /**
225      * @see java.io.InputStream#mark(int)
226      */
227     public synchronized void mark(int readLimit) {
228         wrapped.mark(readLimit);
229     }
230 
231     /**
232      * @see java.io.InputStream#markSupported()
233      */
234     public boolean markSupported() {
235         return wrapped.markSupported();
236     }
237 
238     /**
239      * @see java.io.InputStream#read(byte[], int, int)
240      */
241     public int read(byte[] b, int off, int len) throws IOException {
242         if (caughtException != null || wrapped == null) {
243             throwException();
244         }
245         return wrapped.read(b, off, len);
246     }
247 
248     /**
249      * @see java.io.InputStream#read(byte[])
250      */
251     public int read(byte[] b) throws IOException {
252         if (caughtException != null || wrapped == null) {
253             throwException();
254         }
255         return wrapped.read(b);
256     }
257 
258     /**
259      * @see java.io.InputStream#reset()
260      */
261     public synchronized void reset() throws IOException {
262         if (caughtException != null || wrapped == null) {
263             throwException();
264         }
265         wrapped.reset();
266     }
267 
268     /**
269      * @see java.io.InputStream#skip(long)
270      */
271     public long skip(long n) throws IOException {
272         if (caughtException != null || wrapped == null) {
273             throwException();
274         }
275         return wrapped.skip(n);
276     }
277 
278     /**
279      * @see java.io.InputStream#read()
280      */
281     public int read() throws IOException {
282         if (caughtException != null || wrapped == null) {
283             throwException();
284         }
285         return wrapped.read();
286     }
287 
288 }