View Javadoc

1   /************************************************************************
2    * Copyright (c) 2000-2006 The Apache Software Foundation.             *
3    * All rights reserved.                                                *
4    * ------------------------------------------------------------------- *
5    * Licensed under the Apache License, Version 2.0 (the "License"); you *
6    * may not use this file except in compliance with the License. You    *
7    * may obtain a copy of the License at:                                *
8    *                                                                     *
9    *     http://www.apache.org/licenses/LICENSE-2.0                      *
10   *                                                                     *
11   * Unless required by applicable law or agreed to in writing, software *
12   * distributed under the License is distributed on an "AS IS" BASIS,   *
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
14   * implied.  See the License for the specific language governing       *
15   * permissions and limitations under the License.                      *
16   ***********************************************************************/
17  
18  package org.apache.james.mailrepository;
19  
20  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
21  import org.apache.avalon.cornerstone.services.store.Store;
22  import org.apache.avalon.cornerstone.services.store.StreamRepository;
23  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
24  import org.apache.avalon.framework.activity.Initializable;
25  import org.apache.avalon.framework.service.Serviceable;
26  import org.apache.avalon.framework.service.ServiceManager;
27  import org.apache.avalon.framework.service.ServiceException;
28  import org.apache.avalon.framework.configuration.Configurable;
29  import org.apache.avalon.framework.configuration.Configuration;
30  import org.apache.avalon.framework.configuration.ConfigurationException;
31  import org.apache.avalon.framework.configuration.DefaultConfiguration;
32  import org.apache.avalon.framework.context.Context;
33  import org.apache.avalon.framework.context.ContextException;
34  import org.apache.avalon.framework.context.Contextualizable;
35  import org.apache.avalon.framework.logger.AbstractLogEnabled;
36  import org.apache.james.context.AvalonContextUtilities;
37  import org.apache.james.core.MailImpl;
38  import org.apache.james.core.MimeMessageCopyOnWriteProxy;
39  import org.apache.james.core.MimeMessageWrapper;
40  import org.apache.james.services.MailRepository;
41  import org.apache.james.util.JDBCUtil;
42  import org.apache.james.util.Lock;
43  import org.apache.james.util.SqlResources;
44  import org.apache.mailet.Mail;
45  import org.apache.mailet.MailAddress;
46  
47  import javax.mail.MessagingException;
48  import javax.mail.internet.MimeMessage;
49  
50  import java.io.ByteArrayInputStream;
51  import java.io.ByteArrayOutputStream;
52  import java.io.File;
53  import java.io.IOException;
54  import java.io.ObjectOutputStream;
55  import java.io.ObjectInputStream;
56  import java.sql.Blob;
57  import java.sql.Connection;
58  import java.sql.DatabaseMetaData;
59  import java.sql.PreparedStatement;
60  import java.sql.ResultSet;
61  import java.sql.SQLException;
62  import java.sql.Statement;
63  import java.util.ArrayList;
64  import java.util.Collection;
65  import java.util.HashMap;
66  import java.util.HashSet;
67  import java.util.Iterator;
68  import java.util.List;
69  import java.util.Map;
70  import java.util.Set;
71  import java.util.StringTokenizer;
72  
73  /***
74   * Implementation of a MailRepository on a database.
75   *
76   * <p>Requires a configuration element in the .conf.xml file of the form:
77   *  <br>&lt;repository destinationURL="db://&lt;datasource&gt;/&lt;table_name&gt;/&lt;repository_name&gt;"
78   *  <br>            type="MAIL"
79   *  <br>            model="SYNCHRONOUS"&gt;
80   *  <br>&lt;/repository&gt;
81   * <p>destinationURL specifies the datasource, the table and (optionally) the repository name; a dbfile:// scheme is also accepted
82   * <br>Type can be SPOOL or MAIL
83   * <br>Model is currently not used and may be dropped
84   *
85   * <p>Requires a logger called MailRepository.
86   *
87   * @version CVS $Revision: 397494 $ $Date: 2006-04-27 09:43:34 +0000 (gio, 27 apr 2006) $
88   */
89  public class JDBCMailRepository
90      extends AbstractLogEnabled
91      implements MailRepository, Contextualizable, Serviceable, Configurable, Initializable {
92  
93      /***
94       * Whether 'deep debugging' is turned on.
95       */
96      private static final boolean DEEP_DEBUG = false;
97  
98      /***
99       * The Avalon componentManager used by the instance
100      */
101     private ServiceManager componentManager;
102 
103     /***
104      * The Avalon context used by the instance
105      */
106     protected Context context;
107 
108     /***
109      * A lock used to control access to repository elements, locking access
110      * based on the key 
111      */
112     private Lock lock;
113 
114     /***
115      * The table name parsed from the destination URL
116      */
117     protected String tableName;
118 
119     /***
120      * The repository name parsed from the destination URL
121      */
122     protected String repositoryName;
123 
124     /***
125      * The name of the SQL configuration file to be used to configure this repository.
126      */
127     private String sqlFileName;
128 
129     /***
130      * The stream repository used in dbfile mode
131      */
132     private StreamRepository sr = null;
133 
134     /***
135      * The selector used to obtain the JDBC datasource
136      */
137     protected DataSourceSelector datasources;
138 
139     /***
140      * The JDBC datasource that provides the JDBC connection
141      */
142     protected DataSourceComponent datasource;
143 
144     /***
145      * The name of the datasource used by this repository
146      */
147     protected String datasourceName;
148 
149     /***
150      * Contains all of the sql strings for this component.
151      */
152     protected SqlResources sqlQueries;
153 
154     /***
155      * The JDBCUtil helper class
156      */
157     protected JDBCUtil theJDBCUtil;
158     
159     /***
160      * "Support for Mail Attributes under JDBC repositories is ready" indicator.
161      */
162     protected boolean jdbcMailAttributesReady = false;
163 
164     /***
165      * The size threshold for in memory handling of storing operations
166      */
167     private int inMemorySizeLimit;
168 
169     /***
170      * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
171      */
172     public void contextualize(final Context context)
173             throws ContextException {
174         this.context = context;
175     }
176 
177     /***
178      * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
179      */
180     public void service( final ServiceManager componentManager )
181         throws ServiceException {
182         StringBuffer logBuffer = null;
183         if (getLogger().isDebugEnabled()) {
184             logBuffer =
185                 new StringBuffer(64)
186                         .append(this.getClass().getName())
187                         .append(".compose()");
188             getLogger().debug(logBuffer.toString());
189         }
190         // Get the DataSourceSelector service
191         datasources = (DataSourceSelector)componentManager.lookup( DataSourceSelector.ROLE );
192         this.componentManager = componentManager;
193 
194     }
195 
196     /***
197      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
198      */
199     public void configure(Configuration conf) throws ConfigurationException {
200         if (getLogger().isDebugEnabled()) {
201             getLogger().debug(this.getClass().getName() + ".configure()");
202         }
203 
204         String destination = conf.getAttribute("destinationURL");
205         // normalize the destination, to simplify processing.
206         if ( ! destination.endsWith("/") ) {
207             destination += "/";
208         }
209         // Parse the DestinationURL for the name of the datasource,
210         // the table to use, and the (optional) repository Key.
211         // Split on "/", starting after "db://"
212         List urlParams = new ArrayList();
213         int start = 5;
214         if (destination.startsWith("dbfile")) {
215             //this is dbfile:// instead of db://
216             start += 4;
217         }
218         int end = destination.indexOf('/', start);
219         while ( end > -1 ) {
220             urlParams.add(destination.substring(start, end));
221             start = end + 1;
222             end = destination.indexOf('/', start);
223         }
224 
225         // Build SqlParameters and get datasource name from URL parameters
226         if (urlParams.size() == 0) {
227             StringBuffer exceptionBuffer =
228                 new StringBuffer(256)
229                         .append("Malformed destinationURL - Must be of the format '")
230                         .append("db://<data-source>[/<table>[/<repositoryName>]]'.  Was passed ")
231                         .append(conf.getAttribute("destinationURL"));
232             throw new ConfigurationException(exceptionBuffer.toString());
233         }
234         if (urlParams.size() >= 1) {
235             datasourceName = (String)urlParams.get(0);
236         }
237         if (urlParams.size() >= 2) {
238             tableName = (String)urlParams.get(1);
239         }
240         if (urlParams.size() >= 3) {
241             repositoryName = "";
242             for (int i = 2; i < urlParams.size(); i++) {
243                 if (i >= 3) {
244                     repositoryName += '/';
245                 }
246                 repositoryName += (String)urlParams.get(i);
247             }
248         }
249 
250         if (getLogger().isDebugEnabled()) {
251             StringBuffer logBuffer =
252                 new StringBuffer(128)
253                         .append("Parsed URL: table = '")
254                         .append(tableName)
255                         .append("', repositoryName = '")
256                         .append(repositoryName)
257                         .append("'");
258             getLogger().debug(logBuffer.toString());
259         }
260         
261         inMemorySizeLimit = conf.getChild("inMemorySizeLimit").getValueAsInteger(409600000); 
262 
263         String filestore = conf.getChild("filestore").getValue(null);
264         sqlFileName = conf.getChild("sqlFile").getValue();
265         if (!sqlFileName.startsWith("file://")) {
266             throw new ConfigurationException
267                 ("Malformed sqlFile - Must be of the format 'file://<filename>'.");
268         }
269         try {
270             if (filestore != null) {
271                 Store store = (Store)componentManager.lookup(Store.ROLE);
272                 //prepare Configurations for stream repositories
273                 DefaultConfiguration streamConfiguration
274                     = new DefaultConfiguration( "repository",
275                                                 "generated:JDBCMailRepository.configure()" );
276 
277                 streamConfiguration.setAttribute( "destinationURL", filestore );
278                 streamConfiguration.setAttribute( "type", "STREAM" );
279                 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
280                 sr = (StreamRepository) store.select(streamConfiguration);
281 
282                 if (getLogger().isDebugEnabled()) {
283                     getLogger().debug("Got filestore for JdbcMailRepository: " + filestore);
284                 }
285             }
286 
287             lock = new Lock();
288             if (getLogger().isDebugEnabled()) {
289                 StringBuffer logBuffer =
290                     new StringBuffer(128)
291                             .append(this.getClass().getName())
292                             .append(" created according to ")
293                             .append(destination);
294                 getLogger().debug(logBuffer.toString());
295             }
296         } catch (Exception e) {
297             final String message = "Failed to retrieve Store component:" + e.getMessage();
298             getLogger().error(message, e);
299             throw new ConfigurationException(message, e);
300         }
301     }
302 
303     /***
304      * Initialises the JDBC repository.
305      * 1) Tests the connection to the database.
306      * 2) Loads SQL strings from the SQL definition file,
307      *     choosing the appropriate SQL for this connection,
308      *     and performing parameter substitution,
309      * 3) Initialises the database with the required tables, if necessary.
310      *
311      * @throws Exception if an error occurs
312      */
313     public void initialize() throws Exception {
314         StringBuffer logBuffer = null;
315         if (getLogger().isDebugEnabled()) {
316             getLogger().debug(this.getClass().getName() + ".initialize()");
317         }
318 
319         theJDBCUtil =
320             new JDBCUtil() {
321                 protected void delegatedLog(String logString) {
322                     JDBCMailRepository.this.getLogger().warn("JDBCMailRepository: " + logString);
323                 }
324             };
325         // Get the data-source required.
326         datasource = (DataSourceComponent)datasources.select(datasourceName);
327 
328         // Test the connection to the database, by getting the DatabaseMetaData.
329         Connection conn = datasource.getConnection();
330         PreparedStatement createStatement = null;
331 
332         try {
333             // Initialise the sql strings.
334 
335             File sqlFile = null;
336             try {
337                 sqlFile = AvalonContextUtilities.getFile(context, sqlFileName);
338                 sqlFileName = null;
339             } catch (Exception e) {
340                 getLogger().fatalError(e.getMessage(), e);
341                 throw e;
342             }
343 
344             if (getLogger().isDebugEnabled()) {
345                 logBuffer =
346                     new StringBuffer(128)
347                             .append("Reading SQL resources from file: ")
348                             .append(sqlFile.getAbsolutePath())
349                             .append(", section ")
350                             .append(this.getClass().getName())
351                             .append(".");
352                 getLogger().debug(logBuffer.toString());
353             }
354 
355             // Build the statement parameters
356             Map sqlParameters = new HashMap();
357             if (tableName != null) {
358                 sqlParameters.put("table", tableName);
359             }
360             if (repositoryName != null) {
361                 sqlParameters.put("repository", repositoryName);
362             }
363 
364             sqlQueries = new SqlResources();
365             sqlQueries.init(sqlFile, this.getClass().getName(),
366                             conn, sqlParameters);
367 
368             // Check if the required table exists. If not, create it.
369             DatabaseMetaData dbMetaData = conn.getMetaData();
370             // Need to ask in the case that identifiers are stored, ask the DatabaseMetaInfo.
371             // Try UPPER, lower, and MixedCase, to see if the table is there.
372             if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) {
373                 // Users table doesn't exist - create it.
374                 createStatement =
375                     conn.prepareStatement(sqlQueries.getSqlString("createTable", true));
376                 createStatement.execute();
377 
378                 if (getLogger().isInfoEnabled()) {
379                     logBuffer =
380                         new StringBuffer(64)
381                                 .append("JdbcMailRepository: Created table '")
382                                 .append(tableName)
383                                 .append("'.");
384                     getLogger().info(logBuffer.toString());
385                 }
386             }
387             
388             checkJdbcAttributesSupport(dbMetaData);
389 
390         } finally {
391             theJDBCUtil.closeJDBCStatement(createStatement);
392             theJDBCUtil.closeJDBCConnection(conn);
393         }
394     }
395     
396     /*** Checks whether support for JDBC Mail attributes is activated for this repository
397      * and if everything is consistent.
398      * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL"
399      * statements in sqlResources and for a table column named "message_attributes".
400      *
401      * @param dbMetaData the database metadata to be used to look up the column
402      * @throws SQLException if a fatal situation is met
403      */
404     protected void checkJdbcAttributesSupport(DatabaseMetaData dbMetaData) throws SQLException {
405         String attributesColumnName = "message_attributes";
406         boolean hasUpdateMessageAttributesSQL = false;
407         boolean hasRetrieveMessageAttributesSQL = false;
408         
409         boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(dbMetaData, tableName, attributesColumnName);
410         
411         StringBuffer logBuffer = new StringBuffer(64)
412                                     .append("JdbcMailRepository '"
413                                             + repositoryName
414                                             + ", table '"
415                                             + tableName
416                                             + "': ");
417         
418         //Determine whether attributes are used and available for storing
419         //Do we have updateMessageAttributesSQL?
420         String updateMessageAttrSql =
421             sqlQueries.getSqlString("updateMessageAttributesSQL", false);
422         if (updateMessageAttrSql!=null) {
423             hasUpdateMessageAttributesSQL = true;
424         }
425         
426         //Determine whether attributes are used and retrieve them
427         //Do we have retrieveAttributesSQL?
428         String retrieveMessageAttrSql =
429             sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
430         if (retrieveMessageAttrSql!=null) {
431             hasRetrieveMessageAttributesSQL = true;
432         }
433         
434         if (hasUpdateMessageAttributesSQL && !hasRetrieveMessageAttributesSQL) {
435             logBuffer.append("JDBC Mail Attributes support was activated for update but not for retrieval"
436                              + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'"
437                              + "in table '"
438                              + tableName
439                              + "').");
440             getLogger().fatalError(logBuffer.toString());
441             throw new SQLException(logBuffer.toString());
442         }
443         if (!hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
444             logBuffer.append("JDBC Mail Attributes support was activated for retrieval but not for update"
445                              + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'"
446                              + "in table '"
447                              + tableName
448                              + "'.");
449             getLogger().fatalError(logBuffer.toString());
450             throw new SQLException(logBuffer.toString());
451         }
452         if (!hasMessageAttributesColumn
453             && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL)
454             ) {
455                 logBuffer.append("JDBC Mail Attributes support was activated but column '"
456                                  + attributesColumnName
457                                  + "' is missing in table '"
458                                  + tableName
459                                  + "'.");
460                 getLogger().fatalError(logBuffer.toString());
461                 throw new SQLException(logBuffer.toString());
462         }
463         if (hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
464             jdbcMailAttributesReady = true;
465             if (getLogger().isInfoEnabled()) {
466                 logBuffer.append("JDBC Mail Attributes support ready.");
467                 getLogger().info(logBuffer.toString());
468             }
469         } else {
470             jdbcMailAttributesReady = false;
471             logBuffer.append("JDBC Mail Attributes support not activated. "
472                              + "Missing both 'updateMessageAttributesSQL' "
473                              + "and 'retrieveMessageAttributesSQL' "
474                              + "statements for table '"
475                              + tableName
476                              + "' in sqlResources.xml. "
477                              + "Will not persist in the repository '"
478                              + repositoryName
479                              + "'.");
480             getLogger().warn(logBuffer.toString());
481         }
482     }
483 
484     /***
485      * Releases a lock on a message identified by a key
486      *
487      * @param key the key of the message to be unlocked
488      *
489      * @return true if successfully released the lock, false otherwise
490      */
491     public boolean unlock(String key) {
492         if (lock.unlock(key)) {
493             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
494                 StringBuffer debugBuffer =
495                     new StringBuffer(256)
496                             .append("Unlocked ")
497                             .append(key)
498                             .append(" for ")
499                             .append(Thread.currentThread().getName())
500                             .append(" @ ")
501                             .append(new java.util.Date(System.currentTimeMillis()));
502                 getLogger().debug(debugBuffer.toString());
503             }
504             return true;
505         } else {
506             return false;
507         }
508     }
509 
510     /***
511      * Obtains a lock on a message identified by a key
512      *
513      * @param key the key of the message to be locked
514      *
515      * @return true if successfully obtained the lock, false otherwise
516      */
517     public boolean lock(String key) {
518         if (lock.lock(key)) {
519             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
520                 StringBuffer debugBuffer =
521                     new StringBuffer(256)
522                             .append("Locked ")
523                             .append(key)
524                             .append(" for ")
525                             .append(Thread.currentThread().getName())
526                             .append(" @ ")
527                             .append(new java.util.Date(System.currentTimeMillis()));
528                 getLogger().debug(debugBuffer.toString());
529             }
530             return true;
531         } else {
532             return false;
533         }
534     }
535 
536     /***
537      * Store this message to the database.  Optionally stores the message
538      * body to the filesystem and only writes the headers to the database.
539      */
540     public void store(Mail mc) throws MessagingException {
541         Connection conn = null;
542         boolean wasLocked = true;
543         String key = mc.getName();
544         try {
545             synchronized(this) {
546                   wasLocked = lock.isLocked(key);
547     
548                   if (!wasLocked) {
549                       //If it wasn't locked, we want a lock during the store
550                       lock(key);
551                   }
552             }
553             conn = datasource.getConnection();
554 
555             //Need to determine whether need to insert this record, or update it.
556 
557             //Begin a transaction
558             conn.setAutoCommit(false);
559 
560             PreparedStatement checkMessageExists = null;
561             ResultSet rsExists = null;
562             boolean exists = false;
563             try {
564                 checkMessageExists = 
565                     conn.prepareStatement(sqlQueries.getSqlString("checkMessageExistsSQL", true));
566                 checkMessageExists.setString(1, mc.getName());
567                 checkMessageExists.setString(2, repositoryName);
568                 rsExists = checkMessageExists.executeQuery();
569                 exists = rsExists.next() && rsExists.getInt(1) > 0;
570             } finally {
571                 theJDBCUtil.closeJDBCResultSet(rsExists);
572                 theJDBCUtil.closeJDBCStatement(checkMessageExists);
573             }
574 
575             if (exists) {
576                 //Update the existing record
577                 PreparedStatement updateMessage = null;
578 
579                 try {
580                     updateMessage =
581                         conn.prepareStatement(sqlQueries.getSqlString("updateMessageSQL", true));
582                     updateMessage.setString(1, mc.getState());
583                     updateMessage.setString(2, mc.getErrorMessage());
584                     if (mc.getSender() == null) {
585                         updateMessage.setNull(3, java.sql.Types.VARCHAR);
586                     } else {
587                         updateMessage.setString(3, mc.getSender().toString());
588                     }
589                     StringBuffer recipients = new StringBuffer();
590                     for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
591                         recipients.append(i.next().toString());
592                         if (i.hasNext()) {
593                             recipients.append("\r\n");
594                         }
595                     }
596                     updateMessage.setString(4, recipients.toString());
597                     updateMessage.setString(5, mc.getRemoteHost());
598                     updateMessage.setString(6, mc.getRemoteAddr());
599                     updateMessage.setTimestamp(7, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
600                     updateMessage.setString(8, mc.getName());
601                     updateMessage.setString(9, repositoryName);
602                     updateMessage.execute();
603                 } finally {
604                     Statement localUpdateMessage = updateMessage;
605                     // Clear reference to statement
606                     updateMessage = null;
607                     theJDBCUtil.closeJDBCStatement(localUpdateMessage);
608                 }
609 
610                 //Determine whether attributes are used and available for storing
611                 if (jdbcMailAttributesReady && mc.hasAttributes()) {
612                     String updateMessageAttrSql =
613                         sqlQueries.getSqlString("updateMessageAttributesSQL", false);
614                     PreparedStatement updateMessageAttr = null;
615                     try {
616                         updateMessageAttr =
617                             conn.prepareStatement(updateMessageAttrSql);
618                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
619                         ObjectOutputStream oos = new ObjectOutputStream(baos);
620                         try {
621                             if (mc instanceof MailImpl) {
622                             oos.writeObject(((MailImpl)mc).getAttributesRaw());
623                             } else {
624                                 HashMap temp = new HashMap();
625                                 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
626                                     String hashKey = (String) i.next();
627                                     temp.put(hashKey,mc.getAttribute(hashKey));
628                                 }
629                                 oos.writeObject(temp);
630                             }
631                             oos.flush();
632                             ByteArrayInputStream attrInputStream =
633                                 new ByteArrayInputStream(baos.toByteArray());
634                             updateMessageAttr.setBinaryStream(1, attrInputStream, baos.size());
635                         } finally {
636                             try {
637                                 if (oos != null) {
638                                     oos.close();
639                                 }
640                             } catch (IOException ioe) {
641                                 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
642                             }
643                         }
644                         updateMessageAttr.setString(2, mc.getName());
645                         updateMessageAttr.setString(3, repositoryName);
646                         updateMessageAttr.execute();
647                     } catch (SQLException sqle) {
648                         getLogger().info("JDBCMailRepository: Trying to update mail attributes failed.",sqle);
649                         
650                     } finally {
651                         theJDBCUtil.closeJDBCStatement(updateMessageAttr);
652                     }
653                 }
654 
655                 //Determine whether the message body has changed, and possibly avoid
656                 //  updating the database.
657                 MimeMessage messageBody = mc.getMessage();
658                 boolean saveBody = false;
659                 // if the message is a CopyOnWrite proxy we check the modified wrapped object.
660                 if (messageBody instanceof MimeMessageCopyOnWriteProxy) {
661                     MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody;
662                     messageBody = messageCow.getWrappedMessage();
663                 }
664                 if (messageBody instanceof MimeMessageWrapper) {
665                     MimeMessageWrapper message = (MimeMessageWrapper)messageBody;
666                     saveBody = message.isModified();
667                 } else {
668                     saveBody = true;
669                 }
670                 
671                 if (saveBody) {
672                     PreparedStatement updateMessageBody = 
673                         conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
674                     try {
675                         MessageInputStream is = new MessageInputStream(mc,sr,inMemorySizeLimit);
676                         updateMessageBody.setBinaryStream(1,is,(int) is.getSize());
677                         updateMessageBody.setString(2, mc.getName());
678                         updateMessageBody.setString(3, repositoryName);
679                         updateMessageBody.execute();
680                         
681                     } finally {
682                         theJDBCUtil.closeJDBCStatement(updateMessageBody);
683                     }
684                 }
685                 
686 
687             } else {
688                 //Insert the record into the database
689                 PreparedStatement insertMessage = null;
690                 try {
691                     String insertMessageSQL = sqlQueries.getSqlString("insertMessageSQL", true);
692                     int number_of_parameters = getNumberOfParameters (insertMessageSQL);
693                     insertMessage =
694                         conn.prepareStatement(insertMessageSQL);
695                     insertMessage.setString(1, mc.getName());
696                     insertMessage.setString(2, repositoryName);
697                     insertMessage.setString(3, mc.getState());
698                     insertMessage.setString(4, mc.getErrorMessage());
699                     if (mc.getSender() == null) {
700                         insertMessage.setNull(5, java.sql.Types.VARCHAR);
701                     } else {
702                         insertMessage.setString(5, mc.getSender().toString());
703                     }
704                     StringBuffer recipients = new StringBuffer();
705                     for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
706                         recipients.append(i.next().toString());
707                         if (i.hasNext()) {
708                             recipients.append("\r\n");
709                         }
710                     }
711                     insertMessage.setString(6, recipients.toString());
712                     insertMessage.setString(7, mc.getRemoteHost());
713                     insertMessage.setString(8, mc.getRemoteAddr());
714                     insertMessage.setTimestamp(9, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
715 
716                     MessageInputStream is = new MessageInputStream(mc, sr, inMemorySizeLimit);
717 
718                     insertMessage.setBinaryStream(10, is, (int) is.getSize());
719                     
720                     //Store attributes
721                     if (number_of_parameters > 10) {
722                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
723                         ObjectOutputStream oos = new ObjectOutputStream(baos);
724                         try {
725                             if (mc instanceof MailImpl) {
726                             oos.writeObject(((MailImpl)mc).getAttributesRaw());
727                             } else {
728                                 HashMap temp = new HashMap();
729                                 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
730                                     String hashKey = (String) i.next();
731                                     temp.put(hashKey,mc.getAttribute(hashKey));
732                                 }
733                                 oos.writeObject(temp);
734                             }
735                             oos.flush();
736                             ByteArrayInputStream attrInputStream =
737                                 new ByteArrayInputStream(baos.toByteArray());
738                             insertMessage.setBinaryStream(11, attrInputStream, baos.size());
739                         } finally {
740                             try {
741                                 if (oos != null) {
742                                     oos.close();
743                                 }
744                             } catch (IOException ioe) {
745                                 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
746                             }
747                         }                        
748                     }
749                     
750                     insertMessage.execute();
751                 } finally {
752                     theJDBCUtil.closeJDBCStatement(insertMessage);
753                 }
754             }
755 
756 
757             conn.commit();
758             conn.setAutoCommit(true);
759 
760         } catch (Exception e) {
761             getLogger().error("Exception caught while storing mail Container",e);
762             throw new MessagingException("Exception caught while storing mail Container: ",e);
763         } finally {
764             theJDBCUtil.closeJDBCConnection(conn);
765             if (!wasLocked) {
766                 // If it wasn't locked, we need to unlock now
767                 unlock(key);
768                 synchronized (this) {
769                     notify();
770                 }
771             }
772         }
773     }
774 
775     /***
776      * Retrieves a message given a key. At the moment, keys can be obtained
777      * from list()
778      *
779      * @param key the key of the message to retrieve
780      * @return the mail corresponding to this key, null if none exists
781      */
782     public Mail retrieve(String key) throws MessagingException {
783         if (DEEP_DEBUG) {
784             System.err.println("retrieving " + key);
785         }
786         Connection conn = null;
787         PreparedStatement retrieveMessage = null;
788         ResultSet rsMessage = null;
789         try {
790             conn = datasource.getConnection();
791             if (DEEP_DEBUG) {
792                 System.err.println("got a conn " + key);
793             }
794 
795             retrieveMessage =
796                 conn.prepareStatement(sqlQueries.getSqlString("retrieveMessageSQL", true));
797             retrieveMessage.setString(1, key);
798             retrieveMessage.setString(2, repositoryName);
799             rsMessage = retrieveMessage.executeQuery();
800             if (DEEP_DEBUG) {
801                 System.err.println("ran the query " + key);
802             }
803             if (!rsMessage.next()) {
804                 if (getLogger().isDebugEnabled()) {
805                     StringBuffer debugBuffer =
806                         new StringBuffer(64)
807                                 .append("Did not find a record ")
808                                 .append(key)
809                                 .append(" in ")
810                                 .append(repositoryName);
811                     getLogger().debug(debugBuffer.toString());
812                 }
813                 return null;
814             }
815             //Determine whether attributes are used and retrieve them
816             PreparedStatement retrieveMessageAttr = null;
817             HashMap attributes = null;
818             if (jdbcMailAttributesReady) {
819                 String retrieveMessageAttrSql =
820                     sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
821                 ResultSet rsMessageAttr = null;
822                 try {
823                     retrieveMessageAttr =
824                         conn.prepareStatement(retrieveMessageAttrSql);
825                     
826                     retrieveMessageAttr.setString(1, key);
827                     retrieveMessageAttr.setString(2, repositoryName);
828                     rsMessageAttr = retrieveMessageAttr.executeQuery();
829                     
830                     if (rsMessageAttr.next()) {
831                         try {
832                             byte[] serialized_attr = null;
833                             String getAttributesOption = sqlQueries.getDbOption("getAttributes");
834                             if (getAttributesOption != null && (getAttributesOption.equalsIgnoreCase("useBlob") || getAttributesOption.equalsIgnoreCase("useBinaryStream"))) {
835                                 Blob b = rsMessageAttr.getBlob(1);
836                                 serialized_attr = b.getBytes(1, (int)b.length());
837                             } else {
838                                 serialized_attr = rsMessageAttr.getBytes(1);
839                             }
840                             // this check is for better backwards compatibility
841                             if (serialized_attr != null) {
842                                 ByteArrayInputStream bais = new ByteArrayInputStream(serialized_attr);
843                                 ObjectInputStream ois = new ObjectInputStream(bais);
844                                 attributes = (HashMap)ois.readObject();
845                                 ois.close();
846                             }
847                         } catch (IOException ioe) {
848                             if (getLogger().isDebugEnabled()) {
849                                 StringBuffer debugBuffer =
850                                     new StringBuffer(64)
851                                     .append("Exception reading attributes ")
852                                     .append(key)
853                                     .append(" in ")
854                                     .append(repositoryName);
855                                 getLogger().debug(debugBuffer.toString(), ioe);
856                             }
857                         }
858                     } else {
859                         if (getLogger().isDebugEnabled()) {
860                             StringBuffer debugBuffer =
861                                 new StringBuffer(64)
862                                 .append("Did not find a record (attributes) ")
863                                 .append(key)
864                                 .append(" in ")
865                             .append(repositoryName);
866                             getLogger().debug(debugBuffer.toString());
867                         }
868                     }
869                 } catch (SQLException sqle) {
870                     StringBuffer errorBuffer =  new StringBuffer(256)
871                                                 .append("Error retrieving message")
872                                                 .append(sqle.getMessage())
873                                                 .append(sqle.getErrorCode())
874                                                 .append(sqle.getSQLState())
875                                                 .append(sqle.getNextException());
876                     getLogger().error(errorBuffer.toString());
877                 } finally {
878                     theJDBCUtil.closeJDBCResultSet(rsMessageAttr);
879                     theJDBCUtil.closeJDBCStatement(retrieveMessageAttr);
880                 }
881             }
882 
883             MailImpl mc = new MailImpl();
884             mc.setAttributesRaw (attributes);
885             mc.setName(key);
886             mc.setState(rsMessage.getString(1));
887             mc.setErrorMessage(rsMessage.getString(2));
888             String sender = rsMessage.getString(3);
889             if (sender == null) {
890                 mc.setSender(null);
891             } else {
892                 mc.setSender(new MailAddress(sender));
893             }
894             StringTokenizer st = new StringTokenizer(rsMessage.getString(4), "\r\n", false);
895             Set recipients = new HashSet();
896             while (st.hasMoreTokens()) {
897                 recipients.add(new MailAddress(st.nextToken()));
898             }
899             mc.setRecipients(recipients);
900             mc.setRemoteHost(rsMessage.getString(5));
901             mc.setRemoteAddr(rsMessage.getString(6));
902             mc.setLastUpdated(rsMessage.getTimestamp(7));
903 
904             MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key, sr);
905             MimeMessageCopyOnWriteProxy message = new MimeMessageCopyOnWriteProxy(source);
906             mc.setMessage(message);
907             return mc;
908         } catch (SQLException sqle) {
909             StringBuffer errorBuffer =  new StringBuffer(256)
910                                         .append("Error retrieving message")
911                                         .append(sqle.getMessage())
912                                         .append(sqle.getErrorCode())
913                                         .append(sqle.getSQLState())
914                                         .append(sqle.getNextException());
915             getLogger().error(errorBuffer.toString());
916             throw new MessagingException("Exception while retrieving mail: " + sqle.getMessage());
917         } catch (Exception me) {
918             throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
919         } finally {
920             theJDBCUtil.closeJDBCResultSet(rsMessage);
921             theJDBCUtil.closeJDBCStatement(retrieveMessage);
922             theJDBCUtil.closeJDBCConnection(conn);
923         }
924     }
925 
926     /***
927      * Removes a specified message
928      *
929      * @param mail the message to be removed from the repository
930      */
931     public void remove(Mail mail) throws MessagingException {
932         remove(mail.getName());
933     }
934 
935     /***
936      * Removes a Collection of mails from the repository
937      * @param mails The Collection of <code>MailImpl</code>'s to delete
938      * @throws MessagingException
939      * @since 2.2.0
940      */
941     public void remove(Collection mails) throws MessagingException {
942         Iterator delList = mails.iterator();
943         while (delList.hasNext()) {
944             remove((Mail)delList.next());
945         }
946     }
947 
948     /***
949      * Removes a message identified by a key.
950      *
951      * @param key the key of the message to be removed from the repository
952      */
953     public void remove(String key) throws MessagingException {
954         //System.err.println("removing " + key);
955         if (lock(key)) {
956             Connection conn = null;
957             PreparedStatement removeMessage = null;
958             try {
959                 conn = datasource.getConnection();
960                 removeMessage =
961                     conn.prepareStatement(sqlQueries.getSqlString("removeMessageSQL", true));
962                 removeMessage.setString(1, key);
963                 removeMessage.setString(2, repositoryName);
964                 removeMessage.execute();
965 
966                 if (sr != null) {
967                     sr.remove(key);
968                 }
969             } catch (Exception me) {
970                 throw new MessagingException("Exception while removing mail: " + me.getMessage());
971             } finally {
972                 theJDBCUtil.closeJDBCStatement(removeMessage);
973                 theJDBCUtil.closeJDBCConnection(conn);
974                 unlock(key);
975             }
976         }
977     }
978 
979     /***
980      * Gets a list of message keys stored in this repository.
981      *
982      * @return an Iterator of the message keys
983      */
984     public Iterator list() throws MessagingException {
985         //System.err.println("listing messages");
986         Connection conn = null;
987         PreparedStatement listMessages = null;
988         ResultSet rsListMessages = null;
989         try {
990             conn = datasource.getConnection();
991             listMessages =
992                 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
993             listMessages.setString(1, repositoryName);
994             rsListMessages = listMessages.executeQuery();
995 
996             List messageList = new ArrayList();
997             while (rsListMessages.next() && !Thread.currentThread().isInterrupted()) {
998                 messageList.add(rsListMessages.getString(1));
999             }
1000             return messageList.iterator();
1001         } catch (Exception me) {
1002             throw new MessagingException("Exception while listing mail: " + me.getMessage());
1003         } finally {
1004             theJDBCUtil.closeJDBCResultSet(rsListMessages);
1005             theJDBCUtil.closeJDBCStatement(listMessages);
1006             theJDBCUtil.closeJDBCConnection(conn);
1007         }
1008     }
1009 
1010     /***
1011      * Gets the SQL connection to be used by this JDBCMailRepository
1012      *
1013      * @return the connection
1014      * @throws SQLException if there is an issue with getting the connection
1015      */
1016     protected Connection getConnection() throws SQLException {
1017         return datasource.getConnection();
1018     }
1019 
1020     /***
1021      * @see java.lang.Object#equals(Object)
1022      */
1023     public boolean equals(Object obj) {
1024         if (!(obj instanceof JDBCMailRepository)) {
1025             return false;
1026         }
1027         // TODO: Figure out whether other instance variables should be part of
1028         // the equals equation
1029         JDBCMailRepository repository = (JDBCMailRepository)obj;
1030         return  ((repository.tableName == tableName) || ((repository.tableName != null) && repository.tableName.equals(tableName))) && 
1031                 ((repository.repositoryName == repositoryName) || ((repository.repositoryName != null) && repository.repositoryName.equals(repositoryName)));
1032     }
1033 
1034     /***
1035      * Provide a hash code that is consistent with equals for this class
1036      *
1037      * @return the hash code
1038      */
1039      public int hashCode() {
1040         int result = 17;
1041         if (tableName != null) {
1042             result = 37 * tableName.hashCode();
1043         }
1044         if (repositoryName != null) {
1045             result = 37 * repositoryName.hashCode();
1046         }
1047         return result;
1048      }
1049 
1050     /***
1051      * This method calculates number of parameters in a prepared statement SQL String.
1052      * It does so by counting the number of '?' in the string 
1053      * @param sqlstring to return parameter count for
1054      * @return number of parameters
1055      **/
1056     private int getNumberOfParameters (String sqlstring) {
1057         //it is alas a java 1.4 feature to be able to call
1058         //getParameterMetaData which could provide us with the parameterCount
1059         char[] chars = sqlstring.toCharArray();
1060         int count = 0;
1061         for (int i = 0; i < chars.length; i++) {
1062             count += chars[i]=='?' ? 1 : 0;
1063         }
1064         return count;
1065     }
1066 }