View Javadoc

1   package org.apache.james.mailrepository;
2   
3   import org.apache.avalon.cornerstone.services.store.StreamRepository;
4   import org.apache.james.core.MimeMessageUtil;
5   import org.apache.mailet.Mail;
6   
7   import javax.mail.MessagingException;
8   
9   import java.io.ByteArrayInputStream;
10  import java.io.ByteArrayOutputStream;
11  import java.io.IOException;
12  import java.io.InputStream;
13  import java.io.OutputStream;
14  import java.io.PipedInputStream;
15  import java.io.PipedOutputStream;
16  
17  /***
18   * This class provides an inputStream for a Mail object.
19   * If the Mail is larger than 4KB it uses Piped streams and a worker threads
20   * Otherwise it simply create a temporary byte buffer and does not create
21   * the worker thread.
22   * 
23   * Note: Javamail (or the Activation Framework) already uses a worker threads when
24   * asked for an inputstream.
25   */
26  final class MessageInputStream extends InputStream {
27      
28      /***
29       * The size of the current message
30       */
31      private long size = -1;
32      /***
33       * The wrapped stream (Piped or Binary)
34       */
35      private InputStream wrapped;
36      /***
37       * If an excaption happens in the worker threads it's stored here
38       */
39      private Exception caughtException;
40      /***
41       * Stream repository used for dbfiles (null otherwise)
42       */
43      private StreamRepository streamRep;
44      
45      /***
46       * Main constructor. If srep is not null than we are using dbfiles and we stream
47       * the body to file and only the header to db.
48       */
49      public MessageInputStream(Mail mc, StreamRepository srep, int sizeLimit) throws IOException, MessagingException {
50          super();
51          caughtException = null;
52          streamRep = srep;
53          size = mc.getMessageSize();
54          // we use the pipes only when streamRep is null and the message size is greater than 4096
55          // Otherwise we should calculate the header size and not the message size when streamRep is not null (JAMES-475)
56          if (streamRep == null && size > sizeLimit) {
57              PipedOutputStream headerOut = new PipedOutputStream();
58              new Thread() {
59                  private Mail mail;
60  
61                  private PipedOutputStream out;
62  
63                  public void run() {
64                      try {
65                          writeStream(mail,out);
66                      } catch (IOException e) {
67                          caughtException = e;
68                      } catch (MessagingException e) {
69                          caughtException = e;
70                      }
71                  }
72  
73                  public Thread setParam(Mail mc, PipedOutputStream headerOut) {
74                      this.mail = mc;
75                      this.out = headerOut;
76                      return this;
77                  }
78              }.setParam(mc,(PipedOutputStream) headerOut).start();
79              wrapped = new PipedInputStream(headerOut);
80          } else {
81              ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
82              writeStream(mc,headerOut);
83              wrapped = new ByteArrayInputStream(headerOut.toByteArray());
84              size = headerOut.size();
85          }
86      }
87      
88      /***
89       * Returns the size of the full message
90       */
91      public long getSize() {
92          return size;
93      }
94  
95      /***
96       * write the full mail to the stream
97       * This can be used by this object or by the worker threads.
98       */
99      private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
100         OutputStream bodyOut = null;
101         try {
102             if (streamRep == null) {
103                 //If there is no filestore, use the byte array to store headers
104                 //  and the body
105                 bodyOut = out;
106             } else {
107                 //Store the body in the stream repository
108                 bodyOut = streamRep.put(mail.getName());
109             }
110         
111             //Write the message to the headerOut and bodyOut.  bodyOut goes straight to the file
112             MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
113             out.flush();
114             bodyOut.flush();
115         
116         } finally {
117             closeOutputStreams(out, bodyOut);
118         }
119     }
120 
121     private void throwException() throws IOException {
122         try {
123             if (wrapped == null) {
124                 throw new IOException("wrapped stream does not exists anymore");
125             } else if (caughtException instanceof IOException) {
126                 throw (IOException) caughtException;
127             } else {
128                 throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
129                     /***
130                      * @see java.lang.Throwable#getCause()
131                      */
132                     public Throwable getCause() {
133                         return caughtException;
134                     }
135                 };
136             }
137         } finally {
138             caughtException = null;
139             wrapped = null;
140         }
141     }
142 
143 
144     /***
145      * Closes output streams used to update message
146      * 
147      * @param headerStream the stream containing header information - potentially the same
148      *               as the body stream
149      * @param bodyStream the stream containing body information
150      * @throws IOException 
151      */
152     private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
153         try {
154             // If the header stream is not the same as the body stream,
155             // close the header stream here.
156             if ((headerStream != null) && (headerStream != bodyStream)) {
157                 headerStream.close();
158             }
159         } finally {
160             if (bodyStream != null) {
161                 bodyStream.close();
162             }
163         }
164     }
165 
166     // wrapper methods
167 
168     /***
169      * @see java.io.InputStream#available()
170      */
171     public int available() throws IOException {
172         if (caughtException != null || wrapped == null) {
173             throwException();
174         }
175         return wrapped.available();
176     }
177 
178     /***
179      * @see java.io.Closeable#close()
180      */
181     public void close() throws IOException {
182         if (caughtException != null || wrapped == null) {
183             throwException();
184         }
185         wrapped.close();
186         wrapped = null;
187     }
188 
189     /***
190      * @see java.io.InputStream#mark(int)
191      */
192     public synchronized void mark(int arg0) {
193         wrapped.mark(arg0);
194     }
195 
196     /***
197      * @see java.io.InputStream#markSupported()
198      */
199     public boolean markSupported() {
200         return wrapped.markSupported();
201     }
202 
203     /***
204      * @see java.io.InputStream#read(byte[], int, int)
205      */
206     public int read(byte[] arg0, int arg1, int arg2) throws IOException {
207         if (caughtException != null || wrapped == null) {
208             throwException();
209         }
210         return wrapped.read(arg0, arg1, arg2);
211     }
212 
213     /***
214      * @see java.io.InputStream#read(byte[])
215      */
216     public int read(byte[] arg0) throws IOException {
217         if (caughtException != null || wrapped == null) {
218             throwException();
219         }
220         return wrapped.read(arg0);
221     }
222 
223     /***
224      * @see java.io.InputStream#reset()
225      */
226     public synchronized void reset() throws IOException {
227         if (caughtException != null || wrapped == null) {
228             throwException();
229         }
230         wrapped.reset();
231     }
232 
233     /***
234      * @see java.io.InputStream#skip(long)
235      */
236     public long skip(long arg0) throws IOException {
237         if (caughtException != null || wrapped == null) {
238             throwException();
239         }
240         return wrapped.skip(arg0);
241     }
242 
243     /***
244      * @see java.io.InputStream#read()
245      */
246     public int read() throws IOException {
247         if (caughtException != null || wrapped == null) {
248             throwException();
249         }
250         return wrapped.read();
251     }
252 
253 }