View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  
22  package org.apache.james.mailrepository;
23  
24  import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
25  import org.apache.avalon.cornerstone.services.store.StreamRepository;
26  import org.apache.avalon.excalibur.datasource.DataSourceComponent;
27  import org.apache.avalon.framework.service.ServiceManager;
28  import org.apache.avalon.framework.service.ServiceException;
29  import org.apache.avalon.framework.configuration.Configuration;
30  import org.apache.avalon.framework.configuration.ConfigurationException;
31  import org.apache.avalon.framework.configuration.DefaultConfiguration;
32  import org.apache.james.core.MailImpl;
33  import org.apache.james.core.MimeMessageCopyOnWriteProxy;
34  import org.apache.james.core.MimeMessageWrapper;
35  import org.apache.james.services.FileSystem;
36  import org.apache.james.util.sql.JDBCUtil;
37  import org.apache.james.util.sql.SqlResources;
38  import org.apache.mailet.Mail;
39  import org.apache.mailet.MailAddress;
40  
41  import javax.mail.MessagingException;
42  import javax.mail.internet.MimeMessage;
43  
44  import java.io.ByteArrayInputStream;
45  import java.io.ByteArrayOutputStream;
46  import java.io.IOException;
47  import java.io.InputStream;
48  import java.io.ObjectOutputStream;
49  import java.io.ObjectInputStream;
50  import java.sql.Blob;
51  import java.sql.Connection;
52  import java.sql.DatabaseMetaData;
53  import java.sql.PreparedStatement;
54  import java.sql.ResultSet;
55  import java.sql.SQLException;
56  import java.sql.Statement;
57  import java.util.ArrayList;
58  import java.util.HashMap;
59  import java.util.HashSet;
60  import java.util.Iterator;
61  import java.util.List;
62  import java.util.Map;
63  import java.util.Set;
64  import java.util.StringTokenizer;
65  
66  /**
67   * Implementation of a MailRepository on a database.
68   *
69   * <p>Requires a configuration element in the .conf.xml file of the form:
70   *  <br>&lt;repository destinationURL="db://&lt;datasource&gt;/&lt;table_name&gt;/&lt;repository_name&gt;"
71   *  <br>            type="MAIL"
72   *  <br>            model="SYNCHRONOUS"/&gt;
73   *  <br>&lt;/repository&gt;
74   * <p>destinationURL specifies..(Serge??)
75   * <br>Type can be SPOOL or MAIL
76   * <br>Model is currently not used and may be dropped
77   *
78   * <p>Requires a logger called MailRepository.
79   *
80   * @version CVS $Revision: 684816 $ $Date: 2008-08-11 18:03:27 +0100 (Mon, 11 Aug 2008) $
81   */
82  public class JDBCMailRepository
83      extends AbstractMailRepository {
84  
85      /**
86       * The table name parsed from the destination URL
87       */
88      protected String tableName;
89  
90      /**
91       * The repository name parsed from the destination URL
92       */
93      protected String repositoryName;
94  
95      /**
96       * The name of the SQL configuration file to be used to configure this repository.
97       */
98      private String sqlFileName;
99  
100     /**
101      * The stream repository used in dbfile mode
102      */
103     private StreamRepository sr = null;
104 
105     /**
106      * The selector used to obtain the JDBC datasource
107      */
108     protected DataSourceSelector datasources;
109 
110     /**
111      * The JDBC datasource that provides the JDBC connection
112      */
113     protected DataSourceComponent datasource;
114 
115     /**
116      * The name of the datasource used by this repository
117      */
118     protected String datasourceName;
119 
120     /**
121      * Contains all of the sql strings for this component.
122      */
123     protected SqlResources sqlQueries;
124 
125     /**
126      * The JDBCUtil helper class
127      */
128     protected JDBCUtil theJDBCUtil;
129     
130     /**
131      * "Support for Mail Attributes under JDBC repositories is ready" indicator.
132      */
133     protected boolean jdbcMailAttributesReady = false;
134 
135     /**
136      * The size threshold for in memory handling of storing operations
137      */
138     private int inMemorySizeLimit;
139 
140     private FileSystem fileSystem;
141 
142     public void setDatasources(DataSourceSelector datasources) {
143         this.datasources = datasources;
144     }
145 
146     /**
147      * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
148      */
149     public void service( final ServiceManager componentManager )
150         throws ServiceException {
151         super.service(componentManager);
152         StringBuffer logBuffer = null;
153         if (getLogger().isDebugEnabled()) {
154             logBuffer =
155                 new StringBuffer(64)
156                         .append(this.getClass().getName())
157                         .append(".compose()");
158             getLogger().debug(logBuffer.toString());
159         }
160         // Get the DataSourceSelector service
161         DataSourceSelector datasources = (DataSourceSelector)componentManager.lookup( DataSourceSelector.ROLE );
162         setDatasources(datasources);
163         setFileSystem((FileSystem) componentManager.lookup(FileSystem.ROLE));
164     }
165 
166     private void setFileSystem(FileSystem fileSystem) {
167         this.fileSystem = fileSystem;
168     }
169 
170     /**
171      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
172      */
173     public void configure(Configuration conf) throws ConfigurationException {
174         if (getLogger().isDebugEnabled()) {
175             getLogger().debug(this.getClass().getName() + ".configure()");
176         }
177 
178         String destination = conf.getAttribute("destinationURL");
179         // normalize the destination, to simplify processing.
180         if ( ! destination.endsWith("/") ) {
181             destination += "/";
182         }
183         // Parse the DestinationURL for the name of the datasource,
184         // the table to use, and the (optional) repository Key.
185         // Split on "/", starting after "db://"
186         List urlParams = new ArrayList();
187         int start = 5;
188         if (destination.startsWith("dbfile")) {
189             //this is dbfile:// instead of db://
190             start += 4;
191         }
192         int end = destination.indexOf('/', start);
193         while ( end > -1 ) {
194             urlParams.add(destination.substring(start, end));
195             start = end + 1;
196             end = destination.indexOf('/', start);
197         }
198 
199         // Build SqlParameters and get datasource name from URL parameters
200         if (urlParams.size() == 0) {
201             StringBuffer exceptionBuffer =
202                 new StringBuffer(256)
203                         .append("Malformed destinationURL - Must be of the format '")
204                         .append("db://<data-source>[/<table>[/<repositoryName>]]'.  Was passed ")
205                         .append(conf.getAttribute("destinationURL"));
206             throw new ConfigurationException(exceptionBuffer.toString());
207         }
208         if (urlParams.size() >= 1) {
209             datasourceName = (String)urlParams.get(0);
210         }
211         if (urlParams.size() >= 2) {
212             tableName = (String)urlParams.get(1);
213         }
214         if (urlParams.size() >= 3) {
215             repositoryName = "";
216             for (int i = 2; i < urlParams.size(); i++) {
217                 if (i >= 3) {
218                     repositoryName += '/';
219                 }
220                 repositoryName += (String)urlParams.get(i);
221             }
222         }
223 
224         if (getLogger().isDebugEnabled()) {
225             StringBuffer logBuffer =
226                 new StringBuffer(128)
227                         .append("Parsed URL: table = '")
228                         .append(tableName)
229                         .append("', repositoryName = '")
230                         .append(repositoryName)
231                         .append("'");
232             getLogger().debug(logBuffer.toString());
233         }
234         
235         inMemorySizeLimit = conf.getChild("inMemorySizeLimit").getValueAsInteger(409600000); 
236 
237         String filestore = conf.getChild("filestore").getValue(null);
238         sqlFileName = conf.getChild("sqlFile").getValue();
239         try {
240             if (filestore != null) {
241                 //prepare Configurations for stream repositories
242                 DefaultConfiguration streamConfiguration
243                     = new DefaultConfiguration( "repository",
244                                                 "generated:JDBCMailRepository.configure()" );
245 
246                 streamConfiguration.setAttribute( "destinationURL", filestore );
247                 streamConfiguration.setAttribute( "type", "STREAM" );
248                 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
249                 sr = (StreamRepository) store.select(streamConfiguration);
250 
251                 if (getLogger().isDebugEnabled()) {
252                     getLogger().debug("Got filestore for JdbcMailRepository: " + filestore);
253                 }
254             }
255 
256             if (getLogger().isDebugEnabled()) {
257                 StringBuffer logBuffer =
258                     new StringBuffer(128)
259                             .append(this.getClass().getName())
260                             .append(" created according to ")
261                             .append(destination);
262                 getLogger().debug(logBuffer.toString());
263             }
264         } catch (Exception e) {
265             final String message = "Failed to retrieve Store component:" + e.getMessage();
266             getLogger().error(message, e);
267             throw new ConfigurationException(message, e);
268         }
269     }
270 
271     /**
272      * Initialises the JDBC repository.
273      * 1) Tests the connection to the database.
274      * 2) Loads SQL strings from the SQL definition file,
275      *     choosing the appropriate SQL for this connection,
276      *     and performing paramter substitution,
277      * 3) Initialises the database with the required tables, if necessary.
278      *
279      * @throws Exception if an error occurs
280      */
281     public void initialize() throws Exception {
282         super.initialize();
283         StringBuffer logBuffer = null;
284         if (getLogger().isDebugEnabled()) {
285             getLogger().debug(this.getClass().getName() + ".initialize()");
286         }
287 
288         theJDBCUtil =
289             new JDBCUtil() {
290                 protected void delegatedLog(String logString) {
291                     JDBCMailRepository.this.getLogger().warn("JDBCMailRepository: " + logString);
292                 }
293             };
294         // Get the data-source required.
295         datasource = (DataSourceComponent)datasources.select(datasourceName);
296 
297         // Test the connection to the database, by getting the DatabaseMetaData.
298         Connection conn = datasource.getConnection();
299         PreparedStatement createStatement = null;
300 
301         try {
302             // Initialise the sql strings.
303 
304             InputStream sqlFile = null;
305             try {
306                 sqlFile = fileSystem.getResource(sqlFileName);
307             } catch (Exception e) {
308                 getLogger().fatalError(e.getMessage(), e);
309                 throw e;
310             }
311 
312             if (getLogger().isDebugEnabled()) {
313                 logBuffer =
314                     new StringBuffer(128)
315                             .append("Reading SQL resources from file: ")
316                             .append(sqlFileName)
317                             .append(", section ")
318                             .append(this.getClass().getName())
319                             .append(".");
320                 getLogger().debug(logBuffer.toString());
321             }
322 
323             // Build the statement parameters
324             Map sqlParameters = new HashMap();
325             if (tableName != null) {
326                 sqlParameters.put("table", tableName);
327             }
328             if (repositoryName != null) {
329                 sqlParameters.put("repository", repositoryName);
330             }
331 
332             sqlQueries = new SqlResources();
333             sqlQueries.init(sqlFile, this.getClass().getName(),
334                             conn, sqlParameters);
335 
336             // Check if the required table exists. If not, create it.
337             DatabaseMetaData dbMetaData = conn.getMetaData();
338             // Need to ask in the case that identifiers are stored, ask the DatabaseMetaInfo.
339             // Try UPPER, lower, and MixedCase, to see if the table is there.
340             if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) {
341                 // Users table doesn't exist - create it.
342                 createStatement =
343                     conn.prepareStatement(sqlQueries.getSqlString("createTable", true));
344                 createStatement.execute();
345 
346                 if (getLogger().isInfoEnabled()) {
347                     logBuffer =
348                         new StringBuffer(64)
349                                 .append("JdbcMailRepository: Created table '")
350                                 .append(tableName)
351                                 .append("'.");
352                     getLogger().info(logBuffer.toString());
353                 }
354             }
355             
356             checkJdbcAttributesSupport(dbMetaData);
357 
358         } finally {
359             theJDBCUtil.closeJDBCStatement(createStatement);
360             theJDBCUtil.closeJDBCConnection(conn);
361         }
362     }
363     
364     /** Checks whether support for JDBC Mail atributes is activated for this repository
365      * and if everything is consistent.
366      * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL"
367      * statements in sqlResources and for a table column named "message_attributes".
368      *
369      * @param dbMetaData the database metadata to be used to look up the column
370      * @throws SQLException if a fatal situation is met
371      */
372     protected void checkJdbcAttributesSupport(DatabaseMetaData dbMetaData) throws SQLException {
373         String attributesColumnName = "message_attributes";
374         boolean hasUpdateMessageAttributesSQL = false;
375         boolean hasRetrieveMessageAttributesSQL = false;
376         
377         boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(dbMetaData, tableName, attributesColumnName);
378         
379         StringBuffer logBuffer = new StringBuffer(64)
380                                     .append("JdbcMailRepository '"
381                                             + repositoryName
382                                             + ", table '"
383                                             + tableName
384                                             + "': ");
385         
386         //Determine whether attributes are used and available for storing
387         //Do we have updateMessageAttributesSQL?
388         String updateMessageAttrSql =
389             sqlQueries.getSqlString("updateMessageAttributesSQL", false);
390         if (updateMessageAttrSql!=null) {
391             hasUpdateMessageAttributesSQL = true;
392         }
393         
394         //Determine whether attributes are used and retrieve them
395         //Do we have retrieveAttributesSQL?
396         String retrieveMessageAttrSql =
397             sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
398         if (retrieveMessageAttrSql!=null) {
399             hasRetrieveMessageAttributesSQL = true;
400         }
401         
402         if (hasUpdateMessageAttributesSQL && !hasRetrieveMessageAttributesSQL) {
403             logBuffer.append("JDBC Mail Attributes support was activated for update but not for retrieval"
404                              + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'"
405                              + "in table '"
406                              + tableName
407                              + "').");
408             getLogger().fatalError(logBuffer.toString());
409             throw new SQLException(logBuffer.toString());
410         }
411         if (!hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
412             logBuffer.append("JDBC Mail Attributes support was activated for retrieval but not for update"
413                              + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'"
414                              + "in table '"
415                              + tableName
416                              + "'.");
417             getLogger().fatalError(logBuffer.toString());
418             throw new SQLException(logBuffer.toString());
419         }
420         if (!hasMessageAttributesColumn
421             && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL)
422             ) {
423                 logBuffer.append("JDBC Mail Attributes support was activated but column '"
424                                  + attributesColumnName
425                                  + "' is missing in table '"
426                                  + tableName
427                                  + "'.");
428                 getLogger().fatalError(logBuffer.toString());
429                 throw new SQLException(logBuffer.toString());
430         }
431         if (hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
432             jdbcMailAttributesReady = true;
433             if (getLogger().isInfoEnabled()) {
434                 logBuffer.append("JDBC Mail Attributes support ready.");
435                 getLogger().info(logBuffer.toString());
436             }
437         } else {
438             jdbcMailAttributesReady = false;
439             logBuffer.append("JDBC Mail Attributes support not activated. "
440                              + "Missing both 'updateMessageAttributesSQL' "
441                              + "and 'retrieveMessageAttributesSQL' "
442                              + "statements for table '"
443                              + tableName
444                              + "' in sqlResources.xml. "
445                              + "Will not persist in the repository '"
446                              + repositoryName
447                              + "'.");
448             getLogger().warn(logBuffer.toString());
449         }
450     }
451 
452 
453     /**
454      * @see org.apache.james.mailrepository.AbstractMailRepository#internalStore(Mail)
455      */
456     protected void internalStore(Mail mc) throws IOException, MessagingException {
457         Connection conn = null;
458         try {
459             conn = datasource.getConnection();
460             //Need to determine whether need to insert this record, or update it.
461     
462             //Begin a transaction
463             conn.setAutoCommit(false);
464     
465             PreparedStatement checkMessageExists = null;
466             ResultSet rsExists = null;
467             boolean exists = false;
468             try {
469                 checkMessageExists = 
470                     conn.prepareStatement(sqlQueries.getSqlString("checkMessageExistsSQL", true));
471                 checkMessageExists.setString(1, mc.getName());
472                 checkMessageExists.setString(2, repositoryName);
473                 rsExists = checkMessageExists.executeQuery();
474                 exists = rsExists.next() && rsExists.getInt(1) > 0;
475             } finally {
476                 theJDBCUtil.closeJDBCResultSet(rsExists);
477                 theJDBCUtil.closeJDBCStatement(checkMessageExists);
478             }
479     
480             if (exists) {
481                 //Update the existing record
482                 PreparedStatement updateMessage = null;
483     
484                 try {
485                     updateMessage =
486                         conn.prepareStatement(sqlQueries.getSqlString("updateMessageSQL", true));
487                     updateMessage.setString(1, mc.getState());
488                     updateMessage.setString(2, mc.getErrorMessage());
489                     if (mc.getSender() == null) {
490                         updateMessage.setNull(3, java.sql.Types.VARCHAR);
491                     } else {
492                         updateMessage.setString(3, mc.getSender().toString());
493                     }
494                     StringBuffer recipients = new StringBuffer();
495                     for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
496                         recipients.append(i.next().toString());
497                         if (i.hasNext()) {
498                             recipients.append("\r\n");
499                         }
500                     }
501                     updateMessage.setString(4, recipients.toString());
502                     updateMessage.setString(5, mc.getRemoteHost());
503                     updateMessage.setString(6, mc.getRemoteAddr());
504                     updateMessage.setTimestamp(7, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
505                     updateMessage.setString(8, mc.getName());
506                     updateMessage.setString(9, repositoryName);
507                     updateMessage.execute();
508                 } finally {
509                     Statement localUpdateMessage = updateMessage;
510                     // Clear reference to statement
511                     updateMessage = null;
512                     theJDBCUtil.closeJDBCStatement(localUpdateMessage);
513                 }
514     
515                 //Determine whether attributes are used and available for storing
516                 if (jdbcMailAttributesReady && mc.hasAttributes()) {
517                     String updateMessageAttrSql =
518                         sqlQueries.getSqlString("updateMessageAttributesSQL", false);
519                     PreparedStatement updateMessageAttr = null;
520                     try {
521                         updateMessageAttr =
522                             conn.prepareStatement(updateMessageAttrSql);
523                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
524                         ObjectOutputStream oos = new ObjectOutputStream(baos);
525                         try {
526                             if (mc instanceof MailImpl) {
527                             oos.writeObject(((MailImpl)mc).getAttributesRaw());
528                             } else {
529                                 HashMap temp = new HashMap();
530                                 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
531                                     String hashKey = (String) i.next();
532                                     temp.put(hashKey,mc.getAttribute(hashKey));
533                                 }
534                                 oos.writeObject(temp);
535                             }
536                             oos.flush();
537                             ByteArrayInputStream attrInputStream =
538                                 new ByteArrayInputStream(baos.toByteArray());
539                             updateMessageAttr.setBinaryStream(1, attrInputStream, baos.size());
540                         } finally {
541                             try {
542                                 if (oos != null) {
543                                     oos.close();
544                                 }
545                             } catch (IOException ioe) {
546                                 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
547                             }
548                         }
549                         updateMessageAttr.setString(2, mc.getName());
550                         updateMessageAttr.setString(3, repositoryName);
551                         updateMessageAttr.execute();
552                     } catch (SQLException sqle) {
553                         getLogger().info("JDBCMailRepository: Trying to update mail attributes failed.",sqle);
554                         
555                     } finally {
556                         theJDBCUtil.closeJDBCStatement(updateMessageAttr);
557                     }
558                 }
559     
560                 //Determine whether the message body has changed, and possibly avoid
561                 //  updating the database.
562                 MimeMessage messageBody = mc.getMessage();
563                 boolean saveBody = false;
564                 // if the message is a CopyOnWrite proxy we check the modified wrapped object.
565                 if (messageBody instanceof MimeMessageCopyOnWriteProxy) {
566                     MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody;
567                     messageBody = messageCow.getWrappedMessage();
568                 }
569                 if (messageBody instanceof MimeMessageWrapper) {
570                     MimeMessageWrapper message = (MimeMessageWrapper)messageBody;
571                     saveBody = message.isModified();
572                 } else {
573                     saveBody = true;
574                 }
575                 
576                 if (saveBody) {
577                     PreparedStatement updateMessageBody = 
578                         conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
579                     try {
580                         MessageInputStream is = new MessageInputStream(mc,sr,inMemorySizeLimit);
581                         updateMessageBody.setBinaryStream(1,is,(int) is.getSize());
582                         updateMessageBody.setString(2, mc.getName());
583                         updateMessageBody.setString(3, repositoryName);
584                         updateMessageBody.execute();
585                         
586                     } finally {
587                         theJDBCUtil.closeJDBCStatement(updateMessageBody);
588                     }
589                 }
590                 
591     
592             } else {
593                 //Insert the record into the database
594                 PreparedStatement insertMessage = null;
595                 try {
596                     String insertMessageSQL = sqlQueries.getSqlString("insertMessageSQL", true);
597                     int number_of_parameters = getNumberOfParameters (insertMessageSQL);
598                     insertMessage =
599                         conn.prepareStatement(insertMessageSQL);
600                     insertMessage.setString(1, mc.getName());
601                     insertMessage.setString(2, repositoryName);
602                     insertMessage.setString(3, mc.getState());
603                     insertMessage.setString(4, mc.getErrorMessage());
604                     if (mc.getSender() == null) {
605                         insertMessage.setNull(5, java.sql.Types.VARCHAR);
606                     } else {
607                         insertMessage.setString(5, mc.getSender().toString());
608                     }
609                     StringBuffer recipients = new StringBuffer();
610                     for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
611                         recipients.append(i.next().toString());
612                         if (i.hasNext()) {
613                             recipients.append("\r\n");
614                         }
615                     }
616                     insertMessage.setString(6, recipients.toString());
617                     insertMessage.setString(7, mc.getRemoteHost());
618                     insertMessage.setString(8, mc.getRemoteAddr());
619                     insertMessage.setTimestamp(9, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
620     
621                     MessageInputStream is = new MessageInputStream(mc, sr, inMemorySizeLimit);
622     
623                     insertMessage.setBinaryStream(10, is, (int) is.getSize());
624                     
625                     //Store attributes
626                     if (number_of_parameters > 10) {
627                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
628                         ObjectOutputStream oos = new ObjectOutputStream(baos);
629                         try {
630                             if (mc instanceof MailImpl) {
631                             oos.writeObject(((MailImpl)mc).getAttributesRaw());
632                             } else {
633                                 HashMap temp = new HashMap();
634                                 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
635                                     String hashKey = (String) i.next();
636                                     temp.put(hashKey,mc.getAttribute(hashKey));
637                                 }
638                                 oos.writeObject(temp);
639                             }
640                             oos.flush();
641                             ByteArrayInputStream attrInputStream =
642                                 new ByteArrayInputStream(baos.toByteArray());
643                             insertMessage.setBinaryStream(11, attrInputStream, baos.size());
644                         } finally {
645                             try {
646                                 if (oos != null) {
647                                     oos.close();
648                                 }
649                             } catch (IOException ioe) {
650                                 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
651                             }
652                         }                        
653                     }
654                     
655                     insertMessage.execute();
656                 } finally {
657                     theJDBCUtil.closeJDBCStatement(insertMessage);
658                 }
659             }
660     
661     
662             conn.commit();
663             conn.setAutoCommit(true);
664         } catch (SQLException e) {
665             getLogger().debug("Failed to store internal mail", e);
666             throw new IOException(e.getMessage());
667         } finally {
668             theJDBCUtil.closeJDBCConnection(conn);
669         }
670     }
671 
672 
673     /**
674      * @see org.apache.james.services.MailRepository#retrieve(String)
675      */
676     public Mail retrieve(String key) throws MessagingException {
677         if (DEEP_DEBUG) {
678             System.err.println("retrieving " + key);
679         }
680         Connection conn = null;
681         PreparedStatement retrieveMessage = null;
682         ResultSet rsMessage = null;
683         try {
684             conn = datasource.getConnection();
685             if (DEEP_DEBUG) {
686                 System.err.println("got a conn " + key);
687             }
688 
689             retrieveMessage =
690                 conn.prepareStatement(sqlQueries.getSqlString("retrieveMessageSQL", true));
691             retrieveMessage.setString(1, key);
692             retrieveMessage.setString(2, repositoryName);
693             rsMessage = retrieveMessage.executeQuery();
694             if (DEEP_DEBUG) {
695                 System.err.println("ran the query " + key);
696             }
697             if (!rsMessage.next()) {
698                 if (getLogger().isDebugEnabled()) {
699                     StringBuffer debugBuffer =
700                         new StringBuffer(64)
701                                 .append("Did not find a record ")
702                                 .append(key)
703                                 .append(" in ")
704                                 .append(repositoryName);
705                     getLogger().debug(debugBuffer.toString());
706                 }
707                 return null;
708             }
709             //Determine whether attributes are used and retrieve them
710             PreparedStatement retrieveMessageAttr = null;
711             HashMap attributes = null;
712             if (jdbcMailAttributesReady) {
713                 String retrieveMessageAttrSql =
714                     sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
715                 ResultSet rsMessageAttr = null;
716                 try {
717                     retrieveMessageAttr =
718                         conn.prepareStatement(retrieveMessageAttrSql);
719                     
720                     retrieveMessageAttr.setString(1, key);
721                     retrieveMessageAttr.setString(2, repositoryName);
722                     rsMessageAttr = retrieveMessageAttr.executeQuery();
723                     
724                     if (rsMessageAttr.next()) {
725                         try {
726                             byte[] serialized_attr = null;
727                             String getAttributesOption = sqlQueries.getDbOption("getAttributes");
728                             if (getAttributesOption != null && (getAttributesOption.equalsIgnoreCase("useBlob") || getAttributesOption.equalsIgnoreCase("useBinaryStream"))) {
729                                 Blob b = rsMessageAttr.getBlob(1);
730                                 serialized_attr = b.getBytes(1, (int)b.length());
731                             } else {
732                                 serialized_attr = rsMessageAttr.getBytes(1);
733                             }
734                             // this check is for better backwards compatibility
735                             if (serialized_attr != null) {
736                                 ByteArrayInputStream bais = new ByteArrayInputStream(serialized_attr);
737                                 ObjectInputStream ois = new ObjectInputStream(bais);
738                                 attributes = (HashMap)ois.readObject();
739                                 ois.close();
740                             }
741                         } catch (IOException ioe) {
742                             if (getLogger().isDebugEnabled()) {
743                                 StringBuffer debugBuffer =
744                                     new StringBuffer(64)
745                                     .append("Exception reading attributes ")
746                                     .append(key)
747                                     .append(" in ")
748                                     .append(repositoryName);
749                                 getLogger().debug(debugBuffer.toString(), ioe);
750                             }
751                         }
752                     } else {
753                         if (getLogger().isDebugEnabled()) {
754                             StringBuffer debugBuffer =
755                                 new StringBuffer(64)
756                                 .append("Did not find a record (attributes) ")
757                                 .append(key)
758                                 .append(" in ")
759                             .append(repositoryName);
760                             getLogger().debug(debugBuffer.toString());
761                         }
762                     }
763                 } catch (SQLException sqle) {
764                     StringBuffer errorBuffer =  new StringBuffer(256)
765                                                 .append("Error retrieving message")
766                                                 .append(sqle.getMessage())
767                                                 .append(sqle.getErrorCode())
768                                                 .append(sqle.getSQLState())
769                                                 .append(sqle.getNextException());
770                     getLogger().error(errorBuffer.toString());
771                 } finally {
772                     theJDBCUtil.closeJDBCResultSet(rsMessageAttr);
773                     theJDBCUtil.closeJDBCStatement(retrieveMessageAttr);
774                 }
775             }
776 
777             MailImpl mc = new MailImpl();
778             mc.setAttributesRaw (attributes);
779             mc.setName(key);
780             mc.setState(rsMessage.getString(1));
781             mc.setErrorMessage(rsMessage.getString(2));
782             String sender = rsMessage.getString(3);
783             if (sender == null) {
784                 mc.setSender(null);
785             } else {
786                 mc.setSender(new MailAddress(sender));
787             }
788             StringTokenizer st = new StringTokenizer(rsMessage.getString(4), "\r\n", false);
789             Set recipients = new HashSet();
790             while (st.hasMoreTokens()) {
791                 recipients.add(new MailAddress(st.nextToken()));
792             }
793             mc.setRecipients(recipients);
794             mc.setRemoteHost(rsMessage.getString(5));
795             mc.setRemoteAddr(rsMessage.getString(6));
796             mc.setLastUpdated(rsMessage.getTimestamp(7));
797 
798             MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key, sr);
799             MimeMessageCopyOnWriteProxy message = new MimeMessageCopyOnWriteProxy(source);
800             mc.setMessage(message);
801             return mc;
802         } catch (SQLException sqle) {
803             StringBuffer errorBuffer =  new StringBuffer(256)
804                                         .append("Error retrieving message")
805                                         .append(sqle.getMessage())
806                                         .append(sqle.getErrorCode())
807                                         .append(sqle.getSQLState())
808                                         .append(sqle.getNextException());
809             getLogger().error(errorBuffer.toString());
810             getLogger().debug("Failed to retrieve mail", sqle);
811             throw new MessagingException("Exception while retrieving mail: " + sqle.getMessage(), sqle);
812         } catch (Exception me) {
813             throw new MessagingException("Exception while retrieving mail: " + me.getMessage(), me);
814         } finally {
815             theJDBCUtil.closeJDBCResultSet(rsMessage);
816             theJDBCUtil.closeJDBCStatement(retrieveMessage);
817             theJDBCUtil.closeJDBCConnection(conn);
818         }
819     }
820 
821     /**
822      * @see org.apache.james.mailrepository.AbstractMailRepository#internalRemove(String)
823      */
824     protected void internalRemove(String key) throws MessagingException {
825         Connection conn = null;
826         PreparedStatement removeMessage = null;
827         try {
828             conn = datasource.getConnection();
829             removeMessage =
830                 conn.prepareStatement(sqlQueries.getSqlString("removeMessageSQL", true));
831             removeMessage.setString(1, key);
832             removeMessage.setString(2, repositoryName);
833             removeMessage.execute();
834 
835             if (sr != null) {
836                 sr.remove(key);
837             }
838         } catch (Exception me) {
839             throw new MessagingException("Exception while removing mail: " + me.getMessage(), me);
840         } finally {
841             theJDBCUtil.closeJDBCStatement(removeMessage);
842             theJDBCUtil.closeJDBCConnection(conn);
843         }
844     }
845 
846     /**
847      * @see org.apache.james.services.MailRepository#list()
848      */
849     public Iterator list() throws MessagingException {
850         //System.err.println("listing messages");
851         Connection conn = null;
852         PreparedStatement listMessages = null;
853         ResultSet rsListMessages = null;
854         try {
855             conn = datasource.getConnection();
856             listMessages =
857                 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
858             listMessages.setString(1, repositoryName);
859             rsListMessages = listMessages.executeQuery();
860 
861             List messageList = new ArrayList();
862             while (rsListMessages.next() && !Thread.currentThread().isInterrupted()) {
863                 messageList.add(rsListMessages.getString(1));
864             }
865             return messageList.iterator();
866         } catch (Exception me) {
867             throw new MessagingException("Exception while listing mail: " + me.getMessage(), me);
868         } finally {
869             theJDBCUtil.closeJDBCResultSet(rsListMessages);
870             theJDBCUtil.closeJDBCStatement(listMessages);
871             theJDBCUtil.closeJDBCConnection(conn);
872         }
873     }
874 
875     /**
876      * Gets the SQL connection to be used by this JDBCMailRepository
877      *
878      * @return the connection
879      * @throws SQLException if there is an issue with getting the connection
880      */
881     protected Connection getConnection() throws SQLException {
882         return datasource.getConnection();
883     }
884 
885     /**
886      * @see java.lang.Object#equals(Object)
887      */
888     public boolean equals(Object obj) {
889         if (!(obj instanceof JDBCMailRepository)) {
890             return false;
891         }
892         // TODO: Figure out whether other instance variables should be part of
893         // the equals equation
894         JDBCMailRepository repository = (JDBCMailRepository)obj;
895         return  ((repository.tableName == tableName) || ((repository.tableName != null) && repository.tableName.equals(tableName))) && 
896                 ((repository.repositoryName == repositoryName) || ((repository.repositoryName != null) && repository.repositoryName.equals(repositoryName)));
897     }
898 
899     /**
900      * Provide a hash code that is consistent with equals for this class
901      *
902      * @return the hash code
903      */
904      public int hashCode() {
905         int result = 17;
906         if (tableName != null) {
907             result = 37 * tableName.hashCode();
908         }
909         if (repositoryName != null) {
910             result = 37 * repositoryName.hashCode();
911         }
912         return result;
913      }
914 
915     /**
916      * This method calculates number of parameters in a prepared statement SQL String.
917      * It does so by counting the number of '?' in the string 
918      * @param sqlstring to return parameter count for
919      * @return number of parameters
920      **/
921     private int getNumberOfParameters (String sqlstring) {
922         //it is alas a java 1.4 feature to be able to call
923         //getParameterMetaData which could provide us with the parameterCount
924         char[] chars = sqlstring.toCharArray();
925         int count = 0;
926         for (int i = 0; i < chars.length; i++) {
927             count += chars[i]=='?' ? 1 : 0;
928         }
929         return count;
930     }
931 }