View Javadoc

1   package org.apache.james.mailrepository;
2   
3   import org.apache.avalon.cornerstone.services.store.StreamRepository;
4   import org.apache.james.core.MimeMessageUtil;
5   import org.apache.mailet.Mail;
6   
7   import javax.mail.MessagingException;
8   
9   import java.io.ByteArrayInputStream;
10  import java.io.ByteArrayOutputStream;
11  import java.io.IOException;
12  import java.io.InputStream;
13  import java.io.OutputStream;
14  import java.io.PipedInputStream;
15  import java.io.PipedOutputStream;
16  
17  /*****************************************************************
18   * Licensed to the Apache Software Foundation (ASF) under one   *
19   * or more contributor license agreements.  See the NOTICE file *
20   * distributed with this work for additional information        *
21   * regarding copyright ownership.  The ASF licenses this file   *
22   * to you under the Apache License, Version 2.0 (the            *
23   * "License"); you may not use this file except in compliance   *
24   * with the License.  You may obtain a copy of the License at   *
25   *                                                              *
26   *   http://www.apache.org/licenses/LICENSE-2.0                 *
27   *                                                              *
28   * Unless required by applicable law or agreed to in writing,   *
29   * software distributed under the License is distributed on an  *
30   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
31   * KIND, either express or implied.  See the License for the    *
32   * specific language governing permissions and limitations      *
33   * under the License.                                           *
34   ****************************************************************/
35  
36  /***
37   * This class provides an inputStream for a Mail object.
38   * If the Mail is larger than 4KB it uses Piped streams and a worker threads
39   * Otherwise it simply create a temporary byte buffer and does not create
40   * the worker thread.
41   * 
42   * Note: Javamail (or the Activation Framework) already uses a worker threads when
43   * asked for an inputstream.
44   */
45  final class MessageInputStream extends InputStream {
46      
47      /***
48       * The size of the current message
49       */
50      private long size = -1;
51      /***
52       * The wrapped stream (Piped or Binary)
53       */
54      private InputStream wrapped;
55      /***
56       * If an excaption happens in the worker threads it's stored here
57       */
58      private Exception caughtException;
59      /***
60       * Stream repository used for dbfiles (null otherwise)
61       */
62      private StreamRepository streamRep;
63      
64      /***
65       * Main constructor. If srep is not null than we are using dbfiles and we stream
66       * the body to file and only the header to db.
67       */
68      public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
69          super();
70          caughtException = null;
71          streamRep = srep;
72          size = mc.getMessageSize();
73          // we use the pipes only when streamRep is null and the message size is greater than 4096
74          // Otherwise we should calculate the header size and not the message size when streamRep is not null (JAMES-475)
75          if (streamRep == null && size > sizeLimit) {
76              PipedOutputStream headerOut = new PipedOutputStream();
77              new Thread() {
78                  private Mail mail;
79  
80                  private PipedOutputStream out;
81  
82                  public void run() {
83                      try {
84                          writeStream(mail,out);
85                      } catch (IOException e) {
86                          caughtException = e;
87                      } catch (MessagingException e) {
88                          caughtException = e;
89                      }
90                  }
91  
92                  public Thread setParam(Mail mc, PipedOutputStream headerOut) {
93                      this.mail = mc;
94                      this.out = headerOut;
95                      return this;
96                  }
97              }.setParam(mc,(PipedOutputStream) headerOut).start();
98              wrapped = new PipedInputStream(headerOut);
99          } else {
100             ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
101             writeStream(mc,headerOut);
102             wrapped = new ByteArrayInputStream(headerOut.toByteArray());
103             size = headerOut.size();
104         }
105     }
106     
107     /***
108      * Returns the size of the full message
109      */
110     public long getSize() {
111         return size;
112     }
113 
114     /***
115      * write the full mail to the stream
116      * This can be used by this object or by the worker threads.
117      */
118     private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
119         OutputStream bodyOut = null;
120         try {
121             if (streamRep == null) {
122                 //If there is no filestore, use the byte array to store headers
123                 //  and the body
124                 bodyOut = out;
125             } else {
126                 //Store the body in the stream repository
127                 bodyOut = streamRep.put(mail.getName());
128             }
129         
130             //Write the message to the headerOut and bodyOut.  bodyOut goes straight to the file
131             MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
132             out.flush();
133             bodyOut.flush();
134         
135         } finally {
136             closeOutputStreams(out, bodyOut);
137         }
138     }
139 
140     private void throwException() throws IOException {
141         try {
142             if (wrapped == null) {
143                 throw new IOException("wrapped stream does not exists anymore");
144             } else if (caughtException instanceof IOException) {
145                 throw (IOException) caughtException;
146             } else {
147                 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
148                     /***
149                      * @see java.lang.Throwable#getCause()
150                      */
151                     public Throwable getCause() {
152                         return caughtException;
153                     }
154                 };
155             }
156         } finally {
157             caughtException = null;
158             wrapped = null;
159         }
160     }
161 
162 
163     /***
164      * Closes output streams used to update message
165      * 
166      * @param headerStream the stream containing header information - potentially the same
167      *               as the body stream
168      * @param bodyStream the stream containing body information
169      * @throws IOException 
170      */
171     private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
172         try {
173             // If the header stream is not the same as the body stream,
174             // close the header stream here.
175             if ((headerStream != null) && (headerStream != bodyStream)) {
176                 headerStream.close();
177             }
178         } finally {
179             if (bodyStream != null) {
180                 bodyStream.close();
181             }
182         }
183     }
184 
185     // wrapper methods
186 
187     /***
188      * @see java.io.InputStream#available()
189      */
190     public int available() throws IOException {
191         if (caughtException != null || wrapped == null) {
192             throwException();
193         }
194         return wrapped.available();
195     }
196 
197     /***
198      * @see java.io.Closeable#close()
199      */
200     public void close() throws IOException {
201         if (caughtException != null || wrapped == null) {
202             throwException();
203         }
204         wrapped.close();
205         wrapped = null;
206     }
207 
208     /***
209      * @see java.io.InputStream#mark(int)
210      */
211     public synchronized void mark(int arg0) {
212         wrapped.mark(arg0);
213     }
214 
215     /***
216      * @see java.io.InputStream#markSupported()
217      */
218     public boolean markSupported() {
219         return wrapped.markSupported();
220     }
221 
222     /***
223      * @see java.io.InputStream#read(byte[], int, int)
224      */
225     public int read(byte[] arg0, int arg1, int arg2) throws IOException {
226         if (caughtException != null || wrapped == null) {
227             throwException();
228         }
229         return wrapped.read(arg0, arg1, arg2);
230     }
231 
232     /***
233      * @see java.io.InputStream#read(byte[])
234      */
235     public int read(byte[] arg0) throws IOException {
236         if (caughtException != null || wrapped == null) {
237             throwException();
238         }
239         return wrapped.read(arg0);
240     }
241 
242     /***
243      * @see java.io.InputStream#reset()
244      */
245     public synchronized void reset() throws IOException {
246         if (caughtException != null || wrapped == null) {
247             throwException();
248         }
249         wrapped.reset();
250     }
251 
252     /***
253      * @see java.io.InputStream#skip(long)
254      */
255     public long skip(long arg0) throws IOException {
256         if (caughtException != null || wrapped == null) {
257             throwException();
258         }
259         return wrapped.skip(arg0);
260     }
261 
262     /***
263      * @see java.io.InputStream#read()
264      */
265     public int read() throws IOException {
266         if (caughtException != null || wrapped == null) {
267             throwException();
268         }
269         return wrapped.read();
270     }
271 
272 }