View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.mailrepository;
21  
22  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
23  import org.apache.avalon.cornerstone.services.store.Store;
24  import org.apache.avalon.cornerstone.services.store.StreamRepository;
25  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
26  import org.apache.avalon.framework.activity.Initializable;
27  import org.apache.avalon.framework.service.Serviceable;
28  import org.apache.avalon.framework.service.ServiceManager;
29  import org.apache.avalon.framework.service.ServiceException;
30  import org.apache.avalon.framework.configuration.Configurable;
31  import org.apache.avalon.framework.configuration.Configuration;
32  import org.apache.avalon.framework.configuration.ConfigurationException;
33  import org.apache.avalon.framework.configuration.DefaultConfiguration;
34  import org.apache.avalon.framework.context.Context;
35  import org.apache.avalon.framework.context.ContextException;
36  import org.apache.avalon.framework.context.Contextualizable;
37  import org.apache.avalon.framework.logger.AbstractLogEnabled;
38  import org.apache.james.context.AvalonContextUtilities;
39  import org.apache.james.core.MailImpl;
40  import org.apache.james.core.MimeMessageCopyOnWriteProxy;
41  import org.apache.james.core.MimeMessageWrapper;
42  import org.apache.james.services.MailRepository;
43  import org.apache.james.util.JDBCUtil;
44  import org.apache.james.util.Lock;
45  import org.apache.james.util.SqlResources;
46  import org.apache.mailet.Mail;
47  import org.apache.mailet.MailAddress;
48  
49  import javax.mail.MessagingException;
50  import javax.mail.internet.MimeMessage;
51  
52  import java.io.ByteArrayInputStream;
53  import java.io.ByteArrayOutputStream;
54  import java.io.File;
55  import java.io.IOException;
56  import java.io.ObjectOutputStream;
57  import java.io.ObjectInputStream;
58  import java.sql.Blob;
59  import java.sql.Connection;
60  import java.sql.DatabaseMetaData;
61  import java.sql.PreparedStatement;
62  import java.sql.ResultSet;
63  import java.sql.SQLException;
64  import java.sql.Statement;
65  import java.util.ArrayList;
66  import java.util.Collection;
67  import java.util.HashMap;
68  import java.util.HashSet;
69  import java.util.Iterator;
70  import java.util.List;
71  import java.util.Map;
72  import java.util.Set;
73  import java.util.StringTokenizer;
74  
75  /***
76   * Implementation of a MailRepository on a database.
77   *
78   * <p>Requires a configuration element in the .conf.xml file of the form:
79   *  <br>&lt;repository destinationURL="db://&lt;datasource&gt;/&lt;table_name&gt;/&lt;repository_name&gt;"
80   *  <br>            type="MAIL"
81   *  <br>            model="SYNCHRONOUS"&gt;
82   *  <br>&lt;/repository&gt;
83   * <p>destinationURL names the datasource, the table and the repository to use, in that order.
84   * <br>Type can be SPOOL or MAIL
85   * <br>Model is currently not used and may be dropped
86   *
87   * <p>Requires a logger called MailRepository.
88   *
89   * @version CVS $Revision: 494012 $ $Date: 2007-01-08 10:23:58 +0000 (Mon, 08 Jan 2007) $
90   */
91  public class JDBCMailRepository
92      extends AbstractLogEnabled
93      implements MailRepository, Contextualizable, Serviceable, Configurable, Initializable {
94  
95      /***
96       * Whether 'deep debugging' is turned on.
97       */
98      private static final boolean DEEP_DEBUG = false;
99  
100     /***
101      * The Avalon componentManager used by the instance
102      */
103     private ServiceManager componentManager;
104 
105     /***
106      * The Avalon context used by the instance
107      */
108     protected Context context;
109 
110     /***
111      * A lock used to control access to repository elements, locking access
112      * based on the key 
113      */
114     private Lock lock;
115 
116     /***
117      * The table name parsed from the destination URL
118      */
119     protected String tableName;
120 
121     /***
122      * The repository name parsed from the destination URL
123      */
124     protected String repositoryName;
125 
126     /***
127      * The name of the SQL configuration file to be used to configure this repository.
128      */
129     private String sqlFileName;
130 
131     /***
132      * The stream repository used in dbfile mode
133      */
134     private StreamRepository sr = null;
135 
136     /***
137      * The selector used to obtain the JDBC datasource
138      */
139     protected DataSourceSelector datasources;
140 
141     /***
142      * The JDBC datasource that provides the JDBC connection
143      */
144     protected DataSourceComponent datasource;
145 
146     /***
147      * The name of the datasource used by this repository
148      */
149     protected String datasourceName;
150 
151     /***
152      * Contains all of the sql strings for this component.
153      */
154     protected SqlResources sqlQueries;
155 
156     /***
157      * The JDBCUtil helper class
158      */
159     protected JDBCUtil theJDBCUtil;
160     
161     /***
162      * "Support for Mail Attributes under JDBC repositories is ready" indicator.
163      */
164     protected boolean jdbcMailAttributesReady = false;
165 
166     /***
167      * The size threshold for in memory handling of storing operations
168      */
169     private int inMemorySizeLimit;
170 
171     /***
172      * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
173      */
174     public void contextualize(final Context context)
175             throws ContextException {
176         this.context = context;
177     }
178 
179     /***
180      * @see org.apache.avalon.framework.service.Servicable#service(ServiceManager)
181      */
182     public void service( final ServiceManager componentManager )
183         throws ServiceException {
184         StringBuffer logBuffer = null;
185         if (getLogger().isDebugEnabled()) {
186             logBuffer =
187                 new StringBuffer(64)
188                         .append(this.getClass().getName())
189                         .append(".compose()");
190             getLogger().debug(logBuffer.toString());
191         }
192         // Get the DataSourceSelector service
193         datasources = (DataSourceSelector)componentManager.lookup( DataSourceSelector.ROLE );
194         this.componentManager = componentManager;
195 
196     }
197 
198     /***
199      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
200      */
201     public void configure(Configuration conf) throws ConfigurationException {
202         if (getLogger().isDebugEnabled()) {
203             getLogger().debug(this.getClass().getName() + ".configure()");
204         }
205 
206         String destination = conf.getAttribute("destinationURL");
207         // normalize the destination, to simplify processing.
208         if ( ! destination.endsWith("/") ) {
209             destination += "/";
210         }
211         // Parse the DestinationURL for the name of the datasource,
212         // the table to use, and the (optional) repository Key.
213         // Split on "/", starting after "db://"
214         List urlParams = new ArrayList();
215         int start = 5;
216         if (destination.startsWith("dbfile")) {
217             //this is dbfile:// instead of db://
218             start += 4;
219         }
220         int end = destination.indexOf('/', start);
221         while ( end > -1 ) {
222             urlParams.add(destination.substring(start, end));
223             start = end + 1;
224             end = destination.indexOf('/', start);
225         }
226 
227         // Build SqlParameters and get datasource name from URL parameters
228         if (urlParams.size() == 0) {
229             StringBuffer exceptionBuffer =
230                 new StringBuffer(256)
231                         .append("Malformed destinationURL - Must be of the format '")
232                         .append("db://<data-source>[/<table>[/<repositoryName>]]'.  Was passed ")
233                         .append(conf.getAttribute("destinationURL"));
234             throw new ConfigurationException(exceptionBuffer.toString());
235         }
236         if (urlParams.size() >= 1) {
237             datasourceName = (String)urlParams.get(0);
238         }
239         if (urlParams.size() >= 2) {
240             tableName = (String)urlParams.get(1);
241         }
242         if (urlParams.size() >= 3) {
243             repositoryName = "";
244             for (int i = 2; i < urlParams.size(); i++) {
245                 if (i >= 3) {
246                     repositoryName += '/';
247                 }
248                 repositoryName += (String)urlParams.get(i);
249             }
250         }
251 
252         if (getLogger().isDebugEnabled()) {
253             StringBuffer logBuffer =
254                 new StringBuffer(128)
255                         .append("Parsed URL: table = '")
256                         .append(tableName)
257                         .append("', repositoryName = '")
258                         .append(repositoryName)
259                         .append("'");
260             getLogger().debug(logBuffer.toString());
261         }
262         
263         inMemorySizeLimit = conf.getChild("inMemorySizeLimit").getValueAsInteger(409600000); 
264 
265         String filestore = conf.getChild("filestore").getValue(null);
266         sqlFileName = conf.getChild("sqlFile").getValue();
267         if (!sqlFileName.startsWith("file://")) {
268             throw new ConfigurationException
269                 ("Malformed sqlFile - Must be of the format 'file://<filename>'.");
270         }
271         try {
272             if (filestore != null) {
273                 Store store = (Store)componentManager.lookup(Store.ROLE);
274                 //prepare Configurations for stream repositories
275                 DefaultConfiguration streamConfiguration
276                     = new DefaultConfiguration( "repository",
277                                                 "generated:JDBCMailRepository.configure()" );
278 
279                 streamConfiguration.setAttribute( "destinationURL", filestore );
280                 streamConfiguration.setAttribute( "type", "STREAM" );
281                 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
282                 sr = (StreamRepository) store.select(streamConfiguration);
283 
284                 if (getLogger().isDebugEnabled()) {
285                     getLogger().debug("Got filestore for JdbcMailRepository: " + filestore);
286                 }
287             }
288 
289             lock = new Lock();
290             if (getLogger().isDebugEnabled()) {
291                 StringBuffer logBuffer =
292                     new StringBuffer(128)
293                             .append(this.getClass().getName())
294                             .append(" created according to ")
295                             .append(destination);
296                 getLogger().debug(logBuffer.toString());
297             }
298         } catch (Exception e) {
299             final String message = "Failed to retrieve Store component:" + e.getMessage();
300             getLogger().error(message, e);
301             throw new ConfigurationException(message, e);
302         }
303     }
304 
305     /***
306      * Initialises the JDBC repository.
307      * 1) Tests the connection to the database.
308      * 2) Loads SQL strings from the SQL definition file,
309      *     choosing the appropriate SQL for this connection,
310      *     and performing paramter substitution,
311      * 3) Initialises the database with the required tables, if necessary.
312      *
313      * @throws Exception if an error occurs
314      */
315     public void initialize() throws Exception {
316         StringBuffer logBuffer = null;
317         if (getLogger().isDebugEnabled()) {
318             getLogger().debug(this.getClass().getName() + ".initialize()");
319         }
320 
321         theJDBCUtil =
322             new JDBCUtil() {
323                 protected void delegatedLog(String logString) {
324                     JDBCMailRepository.this.getLogger().warn("JDBCMailRepository: " + logString);
325                 }
326             };
327         // Get the data-source required.
328         datasource = (DataSourceComponent)datasources.select(datasourceName);
329 
330         // Test the connection to the database, by getting the DatabaseMetaData.
331         Connection conn = datasource.getConnection();
332         PreparedStatement createStatement = null;
333 
334         try {
335             // Initialise the sql strings.
336 
337             File sqlFile = null;
338             try {
339                 sqlFile = AvalonContextUtilities.getFile(context, sqlFileName);
340                 sqlFileName = null;
341             } catch (Exception e) {
342                 getLogger().fatalError(e.getMessage(), e);
343                 throw e;
344             }
345 
346             if (getLogger().isDebugEnabled()) {
347                 logBuffer =
348                     new StringBuffer(128)
349                             .append("Reading SQL resources from file: ")
350                             .append(sqlFile.getAbsolutePath())
351                             .append(", section ")
352                             .append(this.getClass().getName())
353                             .append(".");
354                 getLogger().debug(logBuffer.toString());
355             }
356 
357             // Build the statement parameters
358             Map sqlParameters = new HashMap();
359             if (tableName != null) {
360                 sqlParameters.put("table", tableName);
361             }
362             if (repositoryName != null) {
363                 sqlParameters.put("repository", repositoryName);
364             }
365 
366             sqlQueries = new SqlResources();
367             sqlQueries.init(sqlFile, this.getClass().getName(),
368                             conn, sqlParameters);
369 
370             // Check if the required table exists. If not, create it.
371             DatabaseMetaData dbMetaData = conn.getMetaData();
372             // Need to ask in the case that identifiers are stored, ask the DatabaseMetaInfo.
373             // Try UPPER, lower, and MixedCase, to see if the table is there.
374             if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) {
375                 // Users table doesn't exist - create it.
376                 createStatement =
377                     conn.prepareStatement(sqlQueries.getSqlString("createTable", true));
378                 createStatement.execute();
379 
380                 if (getLogger().isInfoEnabled()) {
381                     logBuffer =
382                         new StringBuffer(64)
383                                 .append("JdbcMailRepository: Created table '")
384                                 .append(tableName)
385                                 .append("'.");
386                     getLogger().info(logBuffer.toString());
387                 }
388             }
389             
390             checkJdbcAttributesSupport(dbMetaData);
391 
392         } finally {
393             theJDBCUtil.closeJDBCStatement(createStatement);
394             theJDBCUtil.closeJDBCConnection(conn);
395         }
396     }
397     
398     /*** Checks whether support for JDBC Mail atributes is activated for this repository
399      * and if everything is consistent.
400      * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL"
401      * statements in sqlResources and for a table column named "message_attributes".
402      *
403      * @param dbMetaData the database metadata to be used to look up the column
404      * @throws SQLException if a fatal situation is met
405      */
406     protected void checkJdbcAttributesSupport(DatabaseMetaData dbMetaData) throws SQLException {
407         String attributesColumnName = "message_attributes";
408         boolean hasUpdateMessageAttributesSQL = false;
409         boolean hasRetrieveMessageAttributesSQL = false;
410         
411         boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(dbMetaData, tableName, attributesColumnName);
412         
413         StringBuffer logBuffer = new StringBuffer(64)
414                                     .append("JdbcMailRepository '"
415                                             + repositoryName
416                                             + ", table '"
417                                             + tableName
418                                             + "': ");
419         
420         //Determine whether attributes are used and available for storing
421         //Do we have updateMessageAttributesSQL?
422         String updateMessageAttrSql =
423             sqlQueries.getSqlString("updateMessageAttributesSQL", false);
424         if (updateMessageAttrSql!=null) {
425             hasUpdateMessageAttributesSQL = true;
426         }
427         
428         //Determine whether attributes are used and retrieve them
429         //Do we have retrieveAttributesSQL?
430         String retrieveMessageAttrSql =
431             sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
432         if (retrieveMessageAttrSql!=null) {
433             hasRetrieveMessageAttributesSQL = true;
434         }
435         
436         if (hasUpdateMessageAttributesSQL && !hasRetrieveMessageAttributesSQL) {
437             logBuffer.append("JDBC Mail Attributes support was activated for update but not for retrieval"
438                              + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'"
439                              + "in table '"
440                              + tableName
441                              + "').");
442             getLogger().fatalError(logBuffer.toString());
443             throw new SQLException(logBuffer.toString());
444         }
445         if (!hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
446             logBuffer.append("JDBC Mail Attributes support was activated for retrieval but not for update"
447                              + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'"
448                              + "in table '"
449                              + tableName
450                              + "'.");
451             getLogger().fatalError(logBuffer.toString());
452             throw new SQLException(logBuffer.toString());
453         }
454         if (!hasMessageAttributesColumn
455             && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL)
456             ) {
457                 logBuffer.append("JDBC Mail Attributes support was activated but column '"
458                                  + attributesColumnName
459                                  + "' is missing in table '"
460                                  + tableName
461                                  + "'.");
462                 getLogger().fatalError(logBuffer.toString());
463                 throw new SQLException(logBuffer.toString());
464         }
465         if (hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
466             jdbcMailAttributesReady = true;
467             if (getLogger().isInfoEnabled()) {
468                 logBuffer.append("JDBC Mail Attributes support ready.");
469                 getLogger().info(logBuffer.toString());
470             }
471         } else {
472             jdbcMailAttributesReady = false;
473             logBuffer.append("JDBC Mail Attributes support not activated. "
474                              + "Missing both 'updateMessageAttributesSQL' "
475                              + "and 'retrieveMessageAttributesSQL' "
476                              + "statements for table '"
477                              + tableName
478                              + "' in sqlResources.xml. "
479                              + "Will not persist in the repository '"
480                              + repositoryName
481                              + "'.");
482             getLogger().warn(logBuffer.toString());
483         }
484     }
485 
486     /***
487      * Releases a lock on a message identified by a key
488      *
489      * @param key the key of the message to be unlocked
490      *
491      * @return true if successfully released the lock, false otherwise
492      */
493     public boolean unlock(String key) {
494         if (lock.unlock(key)) {
495             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
496                 StringBuffer debugBuffer =
497                     new StringBuffer(256)
498                             .append("Unlocked ")
499                             .append(key)
500                             .append(" for ")
501                             .append(Thread.currentThread().getName())
502                             .append(" @ ")
503                             .append(new java.util.Date(System.currentTimeMillis()));
504                 getLogger().debug(debugBuffer.toString());
505             }
506             return true;
507         } else {
508             return false;
509         }
510     }
511 
512     /***
513      * Obtains a lock on a message identified by a key
514      *
515      * @param key the key of the message to be locked
516      *
517      * @return true if successfully obtained the lock, false otherwise
518      */
519     public boolean lock(String key) {
520         if (lock.lock(key)) {
521             if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
522                 StringBuffer debugBuffer =
523                     new StringBuffer(256)
524                             .append("Locked ")
525                             .append(key)
526                             .append(" for ")
527                             .append(Thread.currentThread().getName())
528                             .append(" @ ")
529                             .append(new java.util.Date(System.currentTimeMillis()));
530                 getLogger().debug(debugBuffer.toString());
531             }
532             return true;
533         } else {
534             return false;
535         }
536     }
537 
538     /***
539      * Store this message to the database.  Optionally stores the message
540      * body to the filesystem and only writes the headers to the database.
541      */
542     public void store(Mail mc) throws MessagingException {
543         Connection conn = null;
544         boolean wasLocked = true;
545         String key = mc.getName();
546         try {
547             synchronized(this) {
548                   wasLocked = lock.isLocked(key);
549     
550                   if (!wasLocked) {
551                       //If it wasn't locked, we want a lock during the store
552                       lock(key);
553                   }
554             }
555             conn = datasource.getConnection();
556 
557             //Need to determine whether need to insert this record, or update it.
558 
559             //Begin a transaction
560             conn.setAutoCommit(false);
561 
562             PreparedStatement checkMessageExists = null;
563             ResultSet rsExists = null;
564             boolean exists = false;
565             try {
566                 checkMessageExists = 
567                     conn.prepareStatement(sqlQueries.getSqlString("checkMessageExistsSQL", true));
568                 checkMessageExists.setString(1, mc.getName());
569                 checkMessageExists.setString(2, repositoryName);
570                 rsExists = checkMessageExists.executeQuery();
571                 exists = rsExists.next() && rsExists.getInt(1) > 0;
572             } finally {
573                 theJDBCUtil.closeJDBCResultSet(rsExists);
574                 theJDBCUtil.closeJDBCStatement(checkMessageExists);
575             }
576 
577             if (exists) {
578                 //Update the existing record
579                 PreparedStatement updateMessage = null;
580 
581                 try {
582                     updateMessage =
583                         conn.prepareStatement(sqlQueries.getSqlString("updateMessageSQL", true));
584                     updateMessage.setString(1, mc.getState());
585                     updateMessage.setString(2, mc.getErrorMessage());
586                     if (mc.getSender() == null) {
587                         updateMessage.setNull(3, java.sql.Types.VARCHAR);
588                     } else {
589                         updateMessage.setString(3, mc.getSender().toString());
590                     }
591                     StringBuffer recipients = new StringBuffer();
592                     for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
593                         recipients.append(i.next().toString());
594                         if (i.hasNext()) {
595                             recipients.append("\r\n");
596                         }
597                     }
598                     updateMessage.setString(4, recipients.toString());
599                     updateMessage.setString(5, mc.getRemoteHost());
600                     updateMessage.setString(6, mc.getRemoteAddr());
601                     updateMessage.setTimestamp(7, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
602                     updateMessage.setString(8, mc.getName());
603                     updateMessage.setString(9, repositoryName);
604                     updateMessage.execute();
605                 } finally {
606                     Statement localUpdateMessage = updateMessage;
607                     // Clear reference to statement
608                     updateMessage = null;
609                     theJDBCUtil.closeJDBCStatement(localUpdateMessage);
610                 }
611 
612                 //Determine whether attributes are used and available for storing
613                 if (jdbcMailAttributesReady && mc.hasAttributes()) {
614                     String updateMessageAttrSql =
615                         sqlQueries.getSqlString("updateMessageAttributesSQL", false);
616                     PreparedStatement updateMessageAttr = null;
617                     try {
618                         updateMessageAttr =
619                             conn.prepareStatement(updateMessageAttrSql);
620                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
621                         ObjectOutputStream oos = new ObjectOutputStream(baos);
622                         try {
623                             if (mc instanceof MailImpl) {
624                             oos.writeObject(((MailImpl)mc).getAttributesRaw());
625                             } else {
626                                 HashMap temp = new HashMap();
627                                 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
628                                     String hashKey = (String) i.next();
629                                     temp.put(hashKey,mc.getAttribute(hashKey));
630                                 }
631                                 oos.writeObject(temp);
632                             }
633                             oos.flush();
634                             ByteArrayInputStream attrInputStream =
635                                 new ByteArrayInputStream(baos.toByteArray());
636                             updateMessageAttr.setBinaryStream(1, attrInputStream, baos.size());
637                         } finally {
638                             try {
639                                 if (oos != null) {
640                                     oos.close();
641                                 }
642                             } catch (IOException ioe) {
643                                 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
644                             }
645                         }
646                         updateMessageAttr.setString(2, mc.getName());
647                         updateMessageAttr.setString(3, repositoryName);
648                         updateMessageAttr.execute();
649                     } catch (SQLException sqle) {
650                         getLogger().info("JDBCMailRepository: Trying to update mail attributes failed.",sqle);
651                         
652                     } finally {
653                         theJDBCUtil.closeJDBCStatement(updateMessageAttr);
654                     }
655                 }
656 
657                 //Determine whether the message body has changed, and possibly avoid
658                 //  updating the database.
659                 MimeMessage messageBody = mc.getMessage();
660                 boolean saveBody = false;
661                 // if the message is a CopyOnWrite proxy we check the modified wrapped object.
662                 if (messageBody instanceof MimeMessageCopyOnWriteProxy) {
663                     MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody;
664                     messageBody = messageCow.getWrappedMessage();
665                 }
666                 if (messageBody instanceof MimeMessageWrapper) {
667                     MimeMessageWrapper message = (MimeMessageWrapper)messageBody;
668                     saveBody = message.isModified();
669                 } else {
670                     saveBody = true;
671                 }
672                 
673                 if (saveBody) {
674                     PreparedStatement updateMessageBody = 
675                         conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
676                     try {
677                         MessageInputStream is = new MessageInputStream(mc,sr,inMemorySizeLimit);
678                         updateMessageBody.setBinaryStream(1,is,(int) is.getSize());
679                         updateMessageBody.setString(2, mc.getName());
680                         updateMessageBody.setString(3, repositoryName);
681                         updateMessageBody.execute();
682                         
683                     } finally {
684                         theJDBCUtil.closeJDBCStatement(updateMessageBody);
685                     }
686                 }
687                 
688 
689             } else {
690                 //Insert the record into the database
691                 PreparedStatement insertMessage = null;
692                 try {
693                     String insertMessageSQL = sqlQueries.getSqlString("insertMessageSQL", true);
694                     int number_of_parameters = getNumberOfParameters (insertMessageSQL);
695                     insertMessage =
696                         conn.prepareStatement(insertMessageSQL);
697                     insertMessage.setString(1, mc.getName());
698                     insertMessage.setString(2, repositoryName);
699                     insertMessage.setString(3, mc.getState());
700                     insertMessage.setString(4, mc.getErrorMessage());
701                     if (mc.getSender() == null) {
702                         insertMessage.setNull(5, java.sql.Types.VARCHAR);
703                     } else {
704                         insertMessage.setString(5, mc.getSender().toString());
705                     }
706                     StringBuffer recipients = new StringBuffer();
707                     for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
708                         recipients.append(i.next().toString());
709                         if (i.hasNext()) {
710                             recipients.append("\r\n");
711                         }
712                     }
713                     insertMessage.setString(6, recipients.toString());
714                     insertMessage.setString(7, mc.getRemoteHost());
715                     insertMessage.setString(8, mc.getRemoteAddr());
716                     insertMessage.setTimestamp(9, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
717 
718                     MessageInputStream is = new MessageInputStream(mc, sr, inMemorySizeLimit);
719 
720                     insertMessage.setBinaryStream(10, is, (int) is.getSize());
721                     
722                     //Store attributes
723                     if (number_of_parameters > 10) {
724                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
725                         ObjectOutputStream oos = new ObjectOutputStream(baos);
726                         try {
727                             if (mc instanceof MailImpl) {
728                             oos.writeObject(((MailImpl)mc).getAttributesRaw());
729                             } else {
730                                 HashMap temp = new HashMap();
731                                 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
732                                     String hashKey = (String) i.next();
733                                     temp.put(hashKey,mc.getAttribute(hashKey));
734                                 }
735                                 oos.writeObject(temp);
736                             }
737                             oos.flush();
738                             ByteArrayInputStream attrInputStream =
739                                 new ByteArrayInputStream(baos.toByteArray());
740                             insertMessage.setBinaryStream(11, attrInputStream, baos.size());
741                         } finally {
742                             try {
743                                 if (oos != null) {
744                                     oos.close();
745                                 }
746                             } catch (IOException ioe) {
747                                 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
748                             }
749                         }                        
750                     }
751                     
752                     insertMessage.execute();
753                 } finally {
754                     theJDBCUtil.closeJDBCStatement(insertMessage);
755                 }
756             }
757 
758 
759             conn.commit();
760             conn.setAutoCommit(true);
761 
762         } catch (Exception e) {
763             getLogger().error("Exception caught while storing mail Container",e);
764             throw new MessagingException("Exception caught while storing mail Container: ",e);
765         } finally {
766             theJDBCUtil.closeJDBCConnection(conn);
767             if (!wasLocked) {
768                 // If it wasn't locked, we need to unlock now
769                 unlock(key);
770                 synchronized (this) {
771                     notify();
772                 }
773             }
774         }
775     }
776 
777     /***
778      * Retrieves a message given a key. At the moment, keys can be obtained
779      * from list()
780      *
781      * @param key the key of the message to retrieve
782      * @return the mail corresponding to this key, null if none exists
783      */
784     public Mail retrieve(String key) throws MessagingException {
785         if (DEEP_DEBUG) {
786             System.err.println("retrieving " + key);
787         }
788         Connection conn = null;
789         PreparedStatement retrieveMessage = null;
790         ResultSet rsMessage = null;
791         try {
792             conn = datasource.getConnection();
793             if (DEEP_DEBUG) {
794                 System.err.println("got a conn " + key);
795             }
796 
797             retrieveMessage =
798                 conn.prepareStatement(sqlQueries.getSqlString("retrieveMessageSQL", true));
799             retrieveMessage.setString(1, key);
800             retrieveMessage.setString(2, repositoryName);
801             rsMessage = retrieveMessage.executeQuery();
802             if (DEEP_DEBUG) {
803                 System.err.println("ran the query " + key);
804             }
805             if (!rsMessage.next()) {
806                 if (getLogger().isDebugEnabled()) {
807                     StringBuffer debugBuffer =
808                         new StringBuffer(64)
809                                 .append("Did not find a record ")
810                                 .append(key)
811                                 .append(" in ")
812                                 .append(repositoryName);
813                     getLogger().debug(debugBuffer.toString());
814                 }
815                 return null;
816             }
817             //Determine whether attributes are used and retrieve them
818             PreparedStatement retrieveMessageAttr = null;
819             HashMap attributes = null;
820             if (jdbcMailAttributesReady) {
821                 String retrieveMessageAttrSql =
822                     sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
823                 ResultSet rsMessageAttr = null;
824                 try {
825                     retrieveMessageAttr =
826                         conn.prepareStatement(retrieveMessageAttrSql);
827                     
828                     retrieveMessageAttr.setString(1, key);
829                     retrieveMessageAttr.setString(2, repositoryName);
830                     rsMessageAttr = retrieveMessageAttr.executeQuery();
831                     
832                     if (rsMessageAttr.next()) {
833                         try {
834                             byte[] serialized_attr = null;
835                             String getAttributesOption = sqlQueries.getDbOption("getAttributes");
836                             if (getAttributesOption != null && (getAttributesOption.equalsIgnoreCase("useBlob") || getAttributesOption.equalsIgnoreCase("useBinaryStream"))) {
837                                 Blob b = rsMessageAttr.getBlob(1);
838                                 serialized_attr = b.getBytes(1, (int)b.length());
839                             } else {
840                                 serialized_attr = rsMessageAttr.getBytes(1);
841                             }
842                             // this check is for better backwards compatibility
843                             if (serialized_attr != null) {
844                                 ByteArrayInputStream bais = new ByteArrayInputStream(serialized_attr);
845                                 ObjectInputStream ois = new ObjectInputStream(bais);
846                                 attributes = (HashMap)ois.readObject();
847                                 ois.close();
848                             }
849                         } catch (IOException ioe) {
850                             if (getLogger().isDebugEnabled()) {
851                                 StringBuffer debugBuffer =
852                                     new StringBuffer(64)
853                                     .append("Exception reading attributes ")
854                                     .append(key)
855                                     .append(" in ")
856                                     .append(repositoryName);
857                                 getLogger().debug(debugBuffer.toString(), ioe);
858                             }
859                         }
860                     } else {
861                         if (getLogger().isDebugEnabled()) {
862                             StringBuffer debugBuffer =
863                                 new StringBuffer(64)
864                                 .append("Did not find a record (attributes) ")
865                                 .append(key)
866                                 .append(" in ")
867                             .append(repositoryName);
868                             getLogger().debug(debugBuffer.toString());
869                         }
870                     }
871                 } catch (SQLException sqle) {
872                     StringBuffer errorBuffer =  new StringBuffer(256)
873                                                 .append("Error retrieving message")
874                                                 .append(sqle.getMessage())
875                                                 .append(sqle.getErrorCode())
876                                                 .append(sqle.getSQLState())
877                                                 .append(sqle.getNextException());
878                     getLogger().error(errorBuffer.toString());
879                 } finally {
880                     theJDBCUtil.closeJDBCResultSet(rsMessageAttr);
881                     theJDBCUtil.closeJDBCStatement(retrieveMessageAttr);
882                 }
883             }
884 
885             MailImpl mc = new MailImpl();
886             mc.setAttributesRaw (attributes);
887             mc.setName(key);
888             mc.setState(rsMessage.getString(1));
889             mc.setErrorMessage(rsMessage.getString(2));
890             String sender = rsMessage.getString(3);
891             if (sender == null) {
892                 mc.setSender(null);
893             } else {
894                 mc.setSender(new MailAddress(sender));
895             }
896             StringTokenizer st = new StringTokenizer(rsMessage.getString(4), "\r\n", false);
897             Set recipients = new HashSet();
898             while (st.hasMoreTokens()) {
899                 recipients.add(new MailAddress(st.nextToken()));
900             }
901             mc.setRecipients(recipients);
902             mc.setRemoteHost(rsMessage.getString(5));
903             mc.setRemoteAddr(rsMessage.getString(6));
904             mc.setLastUpdated(rsMessage.getTimestamp(7));
905 
906             MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key, sr);
907             MimeMessageCopyOnWriteProxy message = new MimeMessageCopyOnWriteProxy(source);
908             mc.setMessage(message);
909             return mc;
910         } catch (SQLException sqle) {
911             StringBuffer errorBuffer =  new StringBuffer(256)
912                                         .append("Error retrieving message")
913                                         .append(sqle.getMessage())
914                                         .append(sqle.getErrorCode())
915                                         .append(sqle.getSQLState())
916                                         .append(sqle.getNextException());
917             getLogger().error(errorBuffer.toString());
918             throw new MessagingException("Exception while retrieving mail: " + sqle.getMessage());
919         } catch (Exception me) {
920             throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
921         } finally {
922             theJDBCUtil.closeJDBCResultSet(rsMessage);
923             theJDBCUtil.closeJDBCStatement(retrieveMessage);
924             theJDBCUtil.closeJDBCConnection(conn);
925         }
926     }
927 
928     /***
929      * Removes a specified message
930      *
931      * @param mail the message to be removed from the repository
932      */
933     public void remove(Mail mail) throws MessagingException {
934         remove(mail.getName());
935     }
936 
937     /***
938      * Removes a Collection of mails from the repository
939      * @param mails The Collection of <code>MailImpl</code>'s to delete
940      * @throws MessagingException
941      * @since 2.2.0
942      */
943     public void remove(Collection mails) throws MessagingException {
944         Iterator delList = mails.iterator();
945         while (delList.hasNext()) {
946             remove((Mail)delList.next());
947         }
948     }
949 
950     /***
951      * Removes a message identified by a key.
952      *
953      * @param key the key of the message to be removed from the repository
954      */
955     public void remove(String key) throws MessagingException {
956         //System.err.println("removing " + key);
957         if (lock(key)) {
958             Connection conn = null;
959             PreparedStatement removeMessage = null;
960             try {
961                 conn = datasource.getConnection();
962                 removeMessage =
963                     conn.prepareStatement(sqlQueries.getSqlString("removeMessageSQL", true));
964                 removeMessage.setString(1, key);
965                 removeMessage.setString(2, repositoryName);
966                 removeMessage.execute();
967 
968                 if (sr != null) {
969                     sr.remove(key);
970                 }
971             } catch (Exception me) {
972                 throw new MessagingException("Exception while removing mail: " + me.getMessage());
973             } finally {
974                 theJDBCUtil.closeJDBCStatement(removeMessage);
975                 theJDBCUtil.closeJDBCConnection(conn);
976                 unlock(key);
977             }
978         }
979     }
980 
981     /***
982      * Gets a list of message keys stored in this repository.
983      *
984      * @return an Iterator of the message keys
985      */
986     public Iterator list() throws MessagingException {
987         //System.err.println("listing messages");
988         Connection conn = null;
989         PreparedStatement listMessages = null;
990         ResultSet rsListMessages = null;
991         try {
992             conn = datasource.getConnection();
993             listMessages =
994                 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
995             listMessages.setString(1, repositoryName);
996             rsListMessages = listMessages.executeQuery();
997 
998             List messageList = new ArrayList();
999             while (rsListMessages.next() && !Thread.currentThread().isInterrupted()) {
1000                 messageList.add(rsListMessages.getString(1));
1001             }
1002             return messageList.iterator();
1003         } catch (Exception me) {
1004             throw new MessagingException("Exception while listing mail: " + me.getMessage());
1005         } finally {
1006             theJDBCUtil.closeJDBCResultSet(rsListMessages);
1007             theJDBCUtil.closeJDBCStatement(listMessages);
1008             theJDBCUtil.closeJDBCConnection(conn);
1009         }
1010     }
1011 
1012     /***
1013      * Gets the SQL connection to be used by this JDBCMailRepository
1014      *
1015      * @return the connection
1016      * @throws SQLException if there is an issue with getting the connection
1017      */
1018     protected Connection getConnection() throws SQLException {
1019         return datasource.getConnection();
1020     }
1021 
1022     /***
1023      * @see java.lang.Object#equals(Object)
1024      */
1025     public boolean equals(Object obj) {
1026         if (!(obj instanceof JDBCMailRepository)) {
1027             return false;
1028         }
1029         // TODO: Figure out whether other instance variables should be part of
1030         // the equals equation
1031         JDBCMailRepository repository = (JDBCMailRepository)obj;
1032         return  ((repository.tableName == tableName) || ((repository.tableName != null) && repository.tableName.equals(tableName))) && 
1033                 ((repository.repositoryName == repositoryName) || ((repository.repositoryName != null) && repository.repositoryName.equals(repositoryName)));
1034     }
1035 
1036     /***
1037      * Provide a hash code that is consistent with equals for this class
1038      *
1039      * @return the hash code
1040      */
1041      public int hashCode() {
1042         int result = 17;
1043         if (tableName != null) {
1044             result = 37 * tableName.hashCode();
1045         }
1046         if (repositoryName != null) {
1047             result = 37 * repositoryName.hashCode();
1048         }
1049         return result;
1050      }
1051 
1052     /***
1053      * This method calculates number of parameters in a prepared statement SQL String.
1054      * It does so by counting the number of '?' in the string 
1055      * @param sqlstring to return parameter count for
1056      * @return number of parameters
1057      **/
1058     private int getNumberOfParameters (String sqlstring) {
1059         //it is alas a java 1.4 feature to be able to call
1060         //getParameterMetaData which could provide us with the parameterCount
1061         char[] chars = sqlstring.toCharArray();
1062         int count = 0;
1063         for (int i = 0; i < chars.length; i++) {
1064             count += chars[i]=='?' ? 1 : 0;
1065         }
1066         return count;
1067     }
1068 }