1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.mailrepository;
21
22 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
23 import org.apache.avalon.cornerstone.services.store.Store;
24 import org.apache.avalon.cornerstone.services.store.StreamRepository;
25 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
26 import org.apache.avalon.framework.activity.Initializable;
27 import org.apache.avalon.framework.service.Serviceable;
28 import org.apache.avalon.framework.service.ServiceManager;
29 import org.apache.avalon.framework.service.ServiceException;
30 import org.apache.avalon.framework.configuration.Configurable;
31 import org.apache.avalon.framework.configuration.Configuration;
32 import org.apache.avalon.framework.configuration.ConfigurationException;
33 import org.apache.avalon.framework.configuration.DefaultConfiguration;
34 import org.apache.avalon.framework.context.Context;
35 import org.apache.avalon.framework.context.ContextException;
36 import org.apache.avalon.framework.context.Contextualizable;
37 import org.apache.avalon.framework.logger.AbstractLogEnabled;
38 import org.apache.james.context.AvalonContextUtilities;
39 import org.apache.james.core.MailImpl;
40 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
41 import org.apache.james.core.MimeMessageWrapper;
42 import org.apache.james.services.MailRepository;
43 import org.apache.james.util.JDBCUtil;
44 import org.apache.james.util.Lock;
45 import org.apache.james.util.SqlResources;
46 import org.apache.mailet.Mail;
47 import org.apache.mailet.MailAddress;
48
49 import javax.mail.MessagingException;
50 import javax.mail.internet.MimeMessage;
51
52 import java.io.ByteArrayInputStream;
53 import java.io.ByteArrayOutputStream;
54 import java.io.File;
55 import java.io.IOException;
56 import java.io.ObjectOutputStream;
57 import java.io.ObjectInputStream;
58 import java.sql.Blob;
59 import java.sql.Connection;
60 import java.sql.DatabaseMetaData;
61 import java.sql.PreparedStatement;
62 import java.sql.ResultSet;
63 import java.sql.SQLException;
64 import java.sql.Statement;
65 import java.util.ArrayList;
66 import java.util.Collection;
67 import java.util.HashMap;
68 import java.util.HashSet;
69 import java.util.Iterator;
70 import java.util.List;
71 import java.util.Map;
72 import java.util.Set;
73 import java.util.StringTokenizer;
74
75 /***
76 * Implementation of a MailRepository on a database.
77 *
78 * <p>Requires a configuration element in the .conf.xml file of the form:
79 * <br><repository destinationURL="db://<datasource>/<table_name>/<repository_name>"
80 * <br> type="MAIL"
81 * <br> model="SYNCHRONOUS"/>
82 * <br></repository>
83 * <p>destinationURL specifies..(Serge??)
84 * <br>Type can be SPOOL or MAIL
85 * <br>Model is currently not used and may be dropped
86 *
87 * <p>Requires a logger called MailRepository.
88 *
89 * @version CVS $Revision: 494012 $ $Date: 2007-01-08 10:23:58 +0000 (Mon, 08 Jan 2007) $
90 */
91 public class JDBCMailRepository
92 extends AbstractLogEnabled
93 implements MailRepository, Contextualizable, Serviceable, Configurable, Initializable {
94
95 /***
96 * Whether 'deep debugging' is turned on.
97 */
98 private static final boolean DEEP_DEBUG = false;
99
100 /***
101 * The Avalon componentManager used by the instance
102 */
103 private ServiceManager componentManager;
104
105 /***
106 * The Avalon context used by the instance
107 */
108 protected Context context;
109
110 /***
111 * A lock used to control access to repository elements, locking access
112 * based on the key
113 */
114 private Lock lock;
115
116 /***
117 * The table name parsed from the destination URL
118 */
119 protected String tableName;
120
121 /***
122 * The repository name parsed from the destination URL
123 */
124 protected String repositoryName;
125
126 /***
127 * The name of the SQL configuration file to be used to configure this repository.
128 */
129 private String sqlFileName;
130
131 /***
132 * The stream repository used in dbfile mode
133 */
134 private StreamRepository sr = null;
135
136 /***
137 * The selector used to obtain the JDBC datasource
138 */
139 protected DataSourceSelector datasources;
140
141 /***
142 * The JDBC datasource that provides the JDBC connection
143 */
144 protected DataSourceComponent datasource;
145
146 /***
147 * The name of the datasource used by this repository
148 */
149 protected String datasourceName;
150
151 /***
152 * Contains all of the sql strings for this component.
153 */
154 protected SqlResources sqlQueries;
155
156 /***
157 * The JDBCUtil helper class
158 */
159 protected JDBCUtil theJDBCUtil;
160
161 /***
162 * "Support for Mail Attributes under JDBC repositories is ready" indicator.
163 */
164 protected boolean jdbcMailAttributesReady = false;
165
166 /***
167 * The size threshold for in memory handling of storing operations
168 */
169 private int inMemorySizeLimit;
170
171 /***
172 * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
173 */
174 public void contextualize(final Context context)
175 throws ContextException {
176 this.context = context;
177 }
178
179 /***
180 * @see org.apache.avalon.framework.service.Servicable#service(ServiceManager)
181 */
182 public void service( final ServiceManager componentManager )
183 throws ServiceException {
184 StringBuffer logBuffer = null;
185 if (getLogger().isDebugEnabled()) {
186 logBuffer =
187 new StringBuffer(64)
188 .append(this.getClass().getName())
189 .append(".compose()");
190 getLogger().debug(logBuffer.toString());
191 }
192
193 datasources = (DataSourceSelector)componentManager.lookup( DataSourceSelector.ROLE );
194 this.componentManager = componentManager;
195
196 }
197
198 /***
199 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
200 */
201 public void configure(Configuration conf) throws ConfigurationException {
202 if (getLogger().isDebugEnabled()) {
203 getLogger().debug(this.getClass().getName() + ".configure()");
204 }
205
206 String destination = conf.getAttribute("destinationURL");
207
208 if ( ! destination.endsWith("/") ) {
209 destination += "/";
210 }
211
212
213
214 List urlParams = new ArrayList();
215 int start = 5;
216 if (destination.startsWith("dbfile")) {
217
218 start += 4;
219 }
220 int end = destination.indexOf('/', start);
221 while ( end > -1 ) {
222 urlParams.add(destination.substring(start, end));
223 start = end + 1;
224 end = destination.indexOf('/', start);
225 }
226
227
228 if (urlParams.size() == 0) {
229 StringBuffer exceptionBuffer =
230 new StringBuffer(256)
231 .append("Malformed destinationURL - Must be of the format '")
232 .append("db://<data-source>[/<table>[/<repositoryName>]]'. Was passed ")
233 .append(conf.getAttribute("destinationURL"));
234 throw new ConfigurationException(exceptionBuffer.toString());
235 }
236 if (urlParams.size() >= 1) {
237 datasourceName = (String)urlParams.get(0);
238 }
239 if (urlParams.size() >= 2) {
240 tableName = (String)urlParams.get(1);
241 }
242 if (urlParams.size() >= 3) {
243 repositoryName = "";
244 for (int i = 2; i < urlParams.size(); i++) {
245 if (i >= 3) {
246 repositoryName += '/';
247 }
248 repositoryName += (String)urlParams.get(i);
249 }
250 }
251
252 if (getLogger().isDebugEnabled()) {
253 StringBuffer logBuffer =
254 new StringBuffer(128)
255 .append("Parsed URL: table = '")
256 .append(tableName)
257 .append("', repositoryName = '")
258 .append(repositoryName)
259 .append("'");
260 getLogger().debug(logBuffer.toString());
261 }
262
263 inMemorySizeLimit = conf.getChild("inMemorySizeLimit").getValueAsInteger(409600000);
264
265 String filestore = conf.getChild("filestore").getValue(null);
266 sqlFileName = conf.getChild("sqlFile").getValue();
267 if (!sqlFileName.startsWith("file://")) {
268 throw new ConfigurationException
269 ("Malformed sqlFile - Must be of the format 'file://<filename>'.");
270 }
271 try {
272 if (filestore != null) {
273 Store store = (Store)componentManager.lookup(Store.ROLE);
274
275 DefaultConfiguration streamConfiguration
276 = new DefaultConfiguration( "repository",
277 "generated:JDBCMailRepository.configure()" );
278
279 streamConfiguration.setAttribute( "destinationURL", filestore );
280 streamConfiguration.setAttribute( "type", "STREAM" );
281 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
282 sr = (StreamRepository) store.select(streamConfiguration);
283
284 if (getLogger().isDebugEnabled()) {
285 getLogger().debug("Got filestore for JdbcMailRepository: " + filestore);
286 }
287 }
288
289 lock = new Lock();
290 if (getLogger().isDebugEnabled()) {
291 StringBuffer logBuffer =
292 new StringBuffer(128)
293 .append(this.getClass().getName())
294 .append(" created according to ")
295 .append(destination);
296 getLogger().debug(logBuffer.toString());
297 }
298 } catch (Exception e) {
299 final String message = "Failed to retrieve Store component:" + e.getMessage();
300 getLogger().error(message, e);
301 throw new ConfigurationException(message, e);
302 }
303 }
304
305 /***
306 * Initialises the JDBC repository.
307 * 1) Tests the connection to the database.
308 * 2) Loads SQL strings from the SQL definition file,
309 * choosing the appropriate SQL for this connection,
310 * and performing paramter substitution,
311 * 3) Initialises the database with the required tables, if necessary.
312 *
313 * @throws Exception if an error occurs
314 */
315 public void initialize() throws Exception {
316 StringBuffer logBuffer = null;
317 if (getLogger().isDebugEnabled()) {
318 getLogger().debug(this.getClass().getName() + ".initialize()");
319 }
320
321 theJDBCUtil =
322 new JDBCUtil() {
323 protected void delegatedLog(String logString) {
324 JDBCMailRepository.this.getLogger().warn("JDBCMailRepository: " + logString);
325 }
326 };
327
328 datasource = (DataSourceComponent)datasources.select(datasourceName);
329
330
331 Connection conn = datasource.getConnection();
332 PreparedStatement createStatement = null;
333
334 try {
335
336
337 File sqlFile = null;
338 try {
339 sqlFile = AvalonContextUtilities.getFile(context, sqlFileName);
340 sqlFileName = null;
341 } catch (Exception e) {
342 getLogger().fatalError(e.getMessage(), e);
343 throw e;
344 }
345
346 if (getLogger().isDebugEnabled()) {
347 logBuffer =
348 new StringBuffer(128)
349 .append("Reading SQL resources from file: ")
350 .append(sqlFile.getAbsolutePath())
351 .append(", section ")
352 .append(this.getClass().getName())
353 .append(".");
354 getLogger().debug(logBuffer.toString());
355 }
356
357
358 Map sqlParameters = new HashMap();
359 if (tableName != null) {
360 sqlParameters.put("table", tableName);
361 }
362 if (repositoryName != null) {
363 sqlParameters.put("repository", repositoryName);
364 }
365
366 sqlQueries = new SqlResources();
367 sqlQueries.init(sqlFile, this.getClass().getName(),
368 conn, sqlParameters);
369
370
371 DatabaseMetaData dbMetaData = conn.getMetaData();
372
373
374 if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) {
375
376 createStatement =
377 conn.prepareStatement(sqlQueries.getSqlString("createTable", true));
378 createStatement.execute();
379
380 if (getLogger().isInfoEnabled()) {
381 logBuffer =
382 new StringBuffer(64)
383 .append("JdbcMailRepository: Created table '")
384 .append(tableName)
385 .append("'.");
386 getLogger().info(logBuffer.toString());
387 }
388 }
389
390 checkJdbcAttributesSupport(dbMetaData);
391
392 } finally {
393 theJDBCUtil.closeJDBCStatement(createStatement);
394 theJDBCUtil.closeJDBCConnection(conn);
395 }
396 }
397
398 /*** Checks whether support for JDBC Mail atributes is activated for this repository
399 * and if everything is consistent.
400 * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL"
401 * statements in sqlResources and for a table column named "message_attributes".
402 *
403 * @param dbMetaData the database metadata to be used to look up the column
404 * @throws SQLException if a fatal situation is met
405 */
406 protected void checkJdbcAttributesSupport(DatabaseMetaData dbMetaData) throws SQLException {
407 String attributesColumnName = "message_attributes";
408 boolean hasUpdateMessageAttributesSQL = false;
409 boolean hasRetrieveMessageAttributesSQL = false;
410
411 boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(dbMetaData, tableName, attributesColumnName);
412
413 StringBuffer logBuffer = new StringBuffer(64)
414 .append("JdbcMailRepository '"
415 + repositoryName
416 + ", table '"
417 + tableName
418 + "': ");
419
420
421
422 String updateMessageAttrSql =
423 sqlQueries.getSqlString("updateMessageAttributesSQL", false);
424 if (updateMessageAttrSql!=null) {
425 hasUpdateMessageAttributesSQL = true;
426 }
427
428
429
430 String retrieveMessageAttrSql =
431 sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
432 if (retrieveMessageAttrSql!=null) {
433 hasRetrieveMessageAttributesSQL = true;
434 }
435
436 if (hasUpdateMessageAttributesSQL && !hasRetrieveMessageAttributesSQL) {
437 logBuffer.append("JDBC Mail Attributes support was activated for update but not for retrieval"
438 + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'"
439 + "in table '"
440 + tableName
441 + "').");
442 getLogger().fatalError(logBuffer.toString());
443 throw new SQLException(logBuffer.toString());
444 }
445 if (!hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
446 logBuffer.append("JDBC Mail Attributes support was activated for retrieval but not for update"
447 + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'"
448 + "in table '"
449 + tableName
450 + "'.");
451 getLogger().fatalError(logBuffer.toString());
452 throw new SQLException(logBuffer.toString());
453 }
454 if (!hasMessageAttributesColumn
455 && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL)
456 ) {
457 logBuffer.append("JDBC Mail Attributes support was activated but column '"
458 + attributesColumnName
459 + "' is missing in table '"
460 + tableName
461 + "'.");
462 getLogger().fatalError(logBuffer.toString());
463 throw new SQLException(logBuffer.toString());
464 }
465 if (hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
466 jdbcMailAttributesReady = true;
467 if (getLogger().isInfoEnabled()) {
468 logBuffer.append("JDBC Mail Attributes support ready.");
469 getLogger().info(logBuffer.toString());
470 }
471 } else {
472 jdbcMailAttributesReady = false;
473 logBuffer.append("JDBC Mail Attributes support not activated. "
474 + "Missing both 'updateMessageAttributesSQL' "
475 + "and 'retrieveMessageAttributesSQL' "
476 + "statements for table '"
477 + tableName
478 + "' in sqlResources.xml. "
479 + "Will not persist in the repository '"
480 + repositoryName
481 + "'.");
482 getLogger().warn(logBuffer.toString());
483 }
484 }
485
486 /***
487 * Releases a lock on a message identified by a key
488 *
489 * @param key the key of the message to be unlocked
490 *
491 * @return true if successfully released the lock, false otherwise
492 */
493 public boolean unlock(String key) {
494 if (lock.unlock(key)) {
495 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
496 StringBuffer debugBuffer =
497 new StringBuffer(256)
498 .append("Unlocked ")
499 .append(key)
500 .append(" for ")
501 .append(Thread.currentThread().getName())
502 .append(" @ ")
503 .append(new java.util.Date(System.currentTimeMillis()));
504 getLogger().debug(debugBuffer.toString());
505 }
506 return true;
507 } else {
508 return false;
509 }
510 }
511
512 /***
513 * Obtains a lock on a message identified by a key
514 *
515 * @param key the key of the message to be locked
516 *
517 * @return true if successfully obtained the lock, false otherwise
518 */
519 public boolean lock(String key) {
520 if (lock.lock(key)) {
521 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
522 StringBuffer debugBuffer =
523 new StringBuffer(256)
524 .append("Locked ")
525 .append(key)
526 .append(" for ")
527 .append(Thread.currentThread().getName())
528 .append(" @ ")
529 .append(new java.util.Date(System.currentTimeMillis()));
530 getLogger().debug(debugBuffer.toString());
531 }
532 return true;
533 } else {
534 return false;
535 }
536 }
537
538 /***
539 * Store this message to the database. Optionally stores the message
540 * body to the filesystem and only writes the headers to the database.
541 */
542 public void store(Mail mc) throws MessagingException {
543 Connection conn = null;
544 boolean wasLocked = true;
545 String key = mc.getName();
546 try {
547 synchronized(this) {
548 wasLocked = lock.isLocked(key);
549
550 if (!wasLocked) {
551
552 lock(key);
553 }
554 }
555 conn = datasource.getConnection();
556
557
558
559
560 conn.setAutoCommit(false);
561
562 PreparedStatement checkMessageExists = null;
563 ResultSet rsExists = null;
564 boolean exists = false;
565 try {
566 checkMessageExists =
567 conn.prepareStatement(sqlQueries.getSqlString("checkMessageExistsSQL", true));
568 checkMessageExists.setString(1, mc.getName());
569 checkMessageExists.setString(2, repositoryName);
570 rsExists = checkMessageExists.executeQuery();
571 exists = rsExists.next() && rsExists.getInt(1) > 0;
572 } finally {
573 theJDBCUtil.closeJDBCResultSet(rsExists);
574 theJDBCUtil.closeJDBCStatement(checkMessageExists);
575 }
576
577 if (exists) {
578
579 PreparedStatement updateMessage = null;
580
581 try {
582 updateMessage =
583 conn.prepareStatement(sqlQueries.getSqlString("updateMessageSQL", true));
584 updateMessage.setString(1, mc.getState());
585 updateMessage.setString(2, mc.getErrorMessage());
586 if (mc.getSender() == null) {
587 updateMessage.setNull(3, java.sql.Types.VARCHAR);
588 } else {
589 updateMessage.setString(3, mc.getSender().toString());
590 }
591 StringBuffer recipients = new StringBuffer();
592 for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
593 recipients.append(i.next().toString());
594 if (i.hasNext()) {
595 recipients.append("\r\n");
596 }
597 }
598 updateMessage.setString(4, recipients.toString());
599 updateMessage.setString(5, mc.getRemoteHost());
600 updateMessage.setString(6, mc.getRemoteAddr());
601 updateMessage.setTimestamp(7, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
602 updateMessage.setString(8, mc.getName());
603 updateMessage.setString(9, repositoryName);
604 updateMessage.execute();
605 } finally {
606 Statement localUpdateMessage = updateMessage;
607
608 updateMessage = null;
609 theJDBCUtil.closeJDBCStatement(localUpdateMessage);
610 }
611
612
613 if (jdbcMailAttributesReady && mc.hasAttributes()) {
614 String updateMessageAttrSql =
615 sqlQueries.getSqlString("updateMessageAttributesSQL", false);
616 PreparedStatement updateMessageAttr = null;
617 try {
618 updateMessageAttr =
619 conn.prepareStatement(updateMessageAttrSql);
620 ByteArrayOutputStream baos = new ByteArrayOutputStream();
621 ObjectOutputStream oos = new ObjectOutputStream(baos);
622 try {
623 if (mc instanceof MailImpl) {
624 oos.writeObject(((MailImpl)mc).getAttributesRaw());
625 } else {
626 HashMap temp = new HashMap();
627 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
628 String hashKey = (String) i.next();
629 temp.put(hashKey,mc.getAttribute(hashKey));
630 }
631 oos.writeObject(temp);
632 }
633 oos.flush();
634 ByteArrayInputStream attrInputStream =
635 new ByteArrayInputStream(baos.toByteArray());
636 updateMessageAttr.setBinaryStream(1, attrInputStream, baos.size());
637 } finally {
638 try {
639 if (oos != null) {
640 oos.close();
641 }
642 } catch (IOException ioe) {
643 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
644 }
645 }
646 updateMessageAttr.setString(2, mc.getName());
647 updateMessageAttr.setString(3, repositoryName);
648 updateMessageAttr.execute();
649 } catch (SQLException sqle) {
650 getLogger().info("JDBCMailRepository: Trying to update mail attributes failed.",sqle);
651
652 } finally {
653 theJDBCUtil.closeJDBCStatement(updateMessageAttr);
654 }
655 }
656
657
658
659 MimeMessage messageBody = mc.getMessage();
660 boolean saveBody = false;
661
662 if (messageBody instanceof MimeMessageCopyOnWriteProxy) {
663 MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody;
664 messageBody = messageCow.getWrappedMessage();
665 }
666 if (messageBody instanceof MimeMessageWrapper) {
667 MimeMessageWrapper message = (MimeMessageWrapper)messageBody;
668 saveBody = message.isModified();
669 } else {
670 saveBody = true;
671 }
672
673 if (saveBody) {
674 PreparedStatement updateMessageBody =
675 conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
676 try {
677 MessageInputStream is = new MessageInputStream(mc,sr,inMemorySizeLimit);
678 updateMessageBody.setBinaryStream(1,is,(int) is.getSize());
679 updateMessageBody.setString(2, mc.getName());
680 updateMessageBody.setString(3, repositoryName);
681 updateMessageBody.execute();
682
683 } finally {
684 theJDBCUtil.closeJDBCStatement(updateMessageBody);
685 }
686 }
687
688
689 } else {
690
691 PreparedStatement insertMessage = null;
692 try {
693 String insertMessageSQL = sqlQueries.getSqlString("insertMessageSQL", true);
694 int number_of_parameters = getNumberOfParameters (insertMessageSQL);
695 insertMessage =
696 conn.prepareStatement(insertMessageSQL);
697 insertMessage.setString(1, mc.getName());
698 insertMessage.setString(2, repositoryName);
699 insertMessage.setString(3, mc.getState());
700 insertMessage.setString(4, mc.getErrorMessage());
701 if (mc.getSender() == null) {
702 insertMessage.setNull(5, java.sql.Types.VARCHAR);
703 } else {
704 insertMessage.setString(5, mc.getSender().toString());
705 }
706 StringBuffer recipients = new StringBuffer();
707 for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
708 recipients.append(i.next().toString());
709 if (i.hasNext()) {
710 recipients.append("\r\n");
711 }
712 }
713 insertMessage.setString(6, recipients.toString());
714 insertMessage.setString(7, mc.getRemoteHost());
715 insertMessage.setString(8, mc.getRemoteAddr());
716 insertMessage.setTimestamp(9, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
717
718 MessageInputStream is = new MessageInputStream(mc, sr, inMemorySizeLimit);
719
720 insertMessage.setBinaryStream(10, is, (int) is.getSize());
721
722
723 if (number_of_parameters > 10) {
724 ByteArrayOutputStream baos = new ByteArrayOutputStream();
725 ObjectOutputStream oos = new ObjectOutputStream(baos);
726 try {
727 if (mc instanceof MailImpl) {
728 oos.writeObject(((MailImpl)mc).getAttributesRaw());
729 } else {
730 HashMap temp = new HashMap();
731 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
732 String hashKey = (String) i.next();
733 temp.put(hashKey,mc.getAttribute(hashKey));
734 }
735 oos.writeObject(temp);
736 }
737 oos.flush();
738 ByteArrayInputStream attrInputStream =
739 new ByteArrayInputStream(baos.toByteArray());
740 insertMessage.setBinaryStream(11, attrInputStream, baos.size());
741 } finally {
742 try {
743 if (oos != null) {
744 oos.close();
745 }
746 } catch (IOException ioe) {
747 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
748 }
749 }
750 }
751
752 insertMessage.execute();
753 } finally {
754 theJDBCUtil.closeJDBCStatement(insertMessage);
755 }
756 }
757
758
759 conn.commit();
760 conn.setAutoCommit(true);
761
762 } catch (Exception e) {
763 getLogger().error("Exception caught while storing mail Container",e);
764 throw new MessagingException("Exception caught while storing mail Container: ",e);
765 } finally {
766 theJDBCUtil.closeJDBCConnection(conn);
767 if (!wasLocked) {
768
769 unlock(key);
770 synchronized (this) {
771 notify();
772 }
773 }
774 }
775 }
776
777 /***
778 * Retrieves a message given a key. At the moment, keys can be obtained
779 * from list()
780 *
781 * @param key the key of the message to retrieve
782 * @return the mail corresponding to this key, null if none exists
783 */
784 public Mail retrieve(String key) throws MessagingException {
785 if (DEEP_DEBUG) {
786 System.err.println("retrieving " + key);
787 }
788 Connection conn = null;
789 PreparedStatement retrieveMessage = null;
790 ResultSet rsMessage = null;
791 try {
792 conn = datasource.getConnection();
793 if (DEEP_DEBUG) {
794 System.err.println("got a conn " + key);
795 }
796
797 retrieveMessage =
798 conn.prepareStatement(sqlQueries.getSqlString("retrieveMessageSQL", true));
799 retrieveMessage.setString(1, key);
800 retrieveMessage.setString(2, repositoryName);
801 rsMessage = retrieveMessage.executeQuery();
802 if (DEEP_DEBUG) {
803 System.err.println("ran the query " + key);
804 }
805 if (!rsMessage.next()) {
806 if (getLogger().isDebugEnabled()) {
807 StringBuffer debugBuffer =
808 new StringBuffer(64)
809 .append("Did not find a record ")
810 .append(key)
811 .append(" in ")
812 .append(repositoryName);
813 getLogger().debug(debugBuffer.toString());
814 }
815 return null;
816 }
817
818 PreparedStatement retrieveMessageAttr = null;
819 HashMap attributes = null;
820 if (jdbcMailAttributesReady) {
821 String retrieveMessageAttrSql =
822 sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
823 ResultSet rsMessageAttr = null;
824 try {
825 retrieveMessageAttr =
826 conn.prepareStatement(retrieveMessageAttrSql);
827
828 retrieveMessageAttr.setString(1, key);
829 retrieveMessageAttr.setString(2, repositoryName);
830 rsMessageAttr = retrieveMessageAttr.executeQuery();
831
832 if (rsMessageAttr.next()) {
833 try {
834 byte[] serialized_attr = null;
835 String getAttributesOption = sqlQueries.getDbOption("getAttributes");
836 if (getAttributesOption != null && (getAttributesOption.equalsIgnoreCase("useBlob") || getAttributesOption.equalsIgnoreCase("useBinaryStream"))) {
837 Blob b = rsMessageAttr.getBlob(1);
838 serialized_attr = b.getBytes(1, (int)b.length());
839 } else {
840 serialized_attr = rsMessageAttr.getBytes(1);
841 }
842
843 if (serialized_attr != null) {
844 ByteArrayInputStream bais = new ByteArrayInputStream(serialized_attr);
845 ObjectInputStream ois = new ObjectInputStream(bais);
846 attributes = (HashMap)ois.readObject();
847 ois.close();
848 }
849 } catch (IOException ioe) {
850 if (getLogger().isDebugEnabled()) {
851 StringBuffer debugBuffer =
852 new StringBuffer(64)
853 .append("Exception reading attributes ")
854 .append(key)
855 .append(" in ")
856 .append(repositoryName);
857 getLogger().debug(debugBuffer.toString(), ioe);
858 }
859 }
860 } else {
861 if (getLogger().isDebugEnabled()) {
862 StringBuffer debugBuffer =
863 new StringBuffer(64)
864 .append("Did not find a record (attributes) ")
865 .append(key)
866 .append(" in ")
867 .append(repositoryName);
868 getLogger().debug(debugBuffer.toString());
869 }
870 }
871 } catch (SQLException sqle) {
872 StringBuffer errorBuffer = new StringBuffer(256)
873 .append("Error retrieving message")
874 .append(sqle.getMessage())
875 .append(sqle.getErrorCode())
876 .append(sqle.getSQLState())
877 .append(sqle.getNextException());
878 getLogger().error(errorBuffer.toString());
879 } finally {
880 theJDBCUtil.closeJDBCResultSet(rsMessageAttr);
881 theJDBCUtil.closeJDBCStatement(retrieveMessageAttr);
882 }
883 }
884
885 MailImpl mc = new MailImpl();
886 mc.setAttributesRaw (attributes);
887 mc.setName(key);
888 mc.setState(rsMessage.getString(1));
889 mc.setErrorMessage(rsMessage.getString(2));
890 String sender = rsMessage.getString(3);
891 if (sender == null) {
892 mc.setSender(null);
893 } else {
894 mc.setSender(new MailAddress(sender));
895 }
896 StringTokenizer st = new StringTokenizer(rsMessage.getString(4), "\r\n", false);
897 Set recipients = new HashSet();
898 while (st.hasMoreTokens()) {
899 recipients.add(new MailAddress(st.nextToken()));
900 }
901 mc.setRecipients(recipients);
902 mc.setRemoteHost(rsMessage.getString(5));
903 mc.setRemoteAddr(rsMessage.getString(6));
904 mc.setLastUpdated(rsMessage.getTimestamp(7));
905
906 MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key, sr);
907 MimeMessageCopyOnWriteProxy message = new MimeMessageCopyOnWriteProxy(source);
908 mc.setMessage(message);
909 return mc;
910 } catch (SQLException sqle) {
911 StringBuffer errorBuffer = new StringBuffer(256)
912 .append("Error retrieving message")
913 .append(sqle.getMessage())
914 .append(sqle.getErrorCode())
915 .append(sqle.getSQLState())
916 .append(sqle.getNextException());
917 getLogger().error(errorBuffer.toString());
918 throw new MessagingException("Exception while retrieving mail: " + sqle.getMessage());
919 } catch (Exception me) {
920 throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
921 } finally {
922 theJDBCUtil.closeJDBCResultSet(rsMessage);
923 theJDBCUtil.closeJDBCStatement(retrieveMessage);
924 theJDBCUtil.closeJDBCConnection(conn);
925 }
926 }
927
928 /***
929 * Removes a specified message
930 *
931 * @param mail the message to be removed from the repository
932 */
933 public void remove(Mail mail) throws MessagingException {
934 remove(mail.getName());
935 }
936
937 /***
938 * Removes a Collection of mails from the repository
939 * @param mails The Collection of <code>MailImpl</code>'s to delete
940 * @throws MessagingException
941 * @since 2.2.0
942 */
943 public void remove(Collection mails) throws MessagingException {
944 Iterator delList = mails.iterator();
945 while (delList.hasNext()) {
946 remove((Mail)delList.next());
947 }
948 }
949
950 /***
951 * Removes a message identified by a key.
952 *
953 * @param key the key of the message to be removed from the repository
954 */
955 public void remove(String key) throws MessagingException {
956
957 if (lock(key)) {
958 Connection conn = null;
959 PreparedStatement removeMessage = null;
960 try {
961 conn = datasource.getConnection();
962 removeMessage =
963 conn.prepareStatement(sqlQueries.getSqlString("removeMessageSQL", true));
964 removeMessage.setString(1, key);
965 removeMessage.setString(2, repositoryName);
966 removeMessage.execute();
967
968 if (sr != null) {
969 sr.remove(key);
970 }
971 } catch (Exception me) {
972 throw new MessagingException("Exception while removing mail: " + me.getMessage());
973 } finally {
974 theJDBCUtil.closeJDBCStatement(removeMessage);
975 theJDBCUtil.closeJDBCConnection(conn);
976 unlock(key);
977 }
978 }
979 }
980
981 /***
982 * Gets a list of message keys stored in this repository.
983 *
984 * @return an Iterator of the message keys
985 */
986 public Iterator list() throws MessagingException {
987
988 Connection conn = null;
989 PreparedStatement listMessages = null;
990 ResultSet rsListMessages = null;
991 try {
992 conn = datasource.getConnection();
993 listMessages =
994 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
995 listMessages.setString(1, repositoryName);
996 rsListMessages = listMessages.executeQuery();
997
998 List messageList = new ArrayList();
999 while (rsListMessages.next() && !Thread.currentThread().isInterrupted()) {
1000 messageList.add(rsListMessages.getString(1));
1001 }
1002 return messageList.iterator();
1003 } catch (Exception me) {
1004 throw new MessagingException("Exception while listing mail: " + me.getMessage());
1005 } finally {
1006 theJDBCUtil.closeJDBCResultSet(rsListMessages);
1007 theJDBCUtil.closeJDBCStatement(listMessages);
1008 theJDBCUtil.closeJDBCConnection(conn);
1009 }
1010 }
1011
1012 /***
1013 * Gets the SQL connection to be used by this JDBCMailRepository
1014 *
1015 * @return the connection
1016 * @throws SQLException if there is an issue with getting the connection
1017 */
1018 protected Connection getConnection() throws SQLException {
1019 return datasource.getConnection();
1020 }
1021
1022 /***
1023 * @see java.lang.Object#equals(Object)
1024 */
1025 public boolean equals(Object obj) {
1026 if (!(obj instanceof JDBCMailRepository)) {
1027 return false;
1028 }
1029
1030
1031 JDBCMailRepository repository = (JDBCMailRepository)obj;
1032 return ((repository.tableName == tableName) || ((repository.tableName != null) && repository.tableName.equals(tableName))) &&
1033 ((repository.repositoryName == repositoryName) || ((repository.repositoryName != null) && repository.repositoryName.equals(repositoryName)));
1034 }
1035
1036 /***
1037 * Provide a hash code that is consistent with equals for this class
1038 *
1039 * @return the hash code
1040 */
1041 public int hashCode() {
1042 int result = 17;
1043 if (tableName != null) {
1044 result = 37 * tableName.hashCode();
1045 }
1046 if (repositoryName != null) {
1047 result = 37 * repositoryName.hashCode();
1048 }
1049 return result;
1050 }
1051
1052 /***
1053 * This method calculates number of parameters in a prepared statement SQL String.
1054 * It does so by counting the number of '?' in the string
1055 * @param sqlstring to return parameter count for
1056 * @return number of parameters
1057 **/
1058 private int getNumberOfParameters (String sqlstring) {
1059
1060
1061 char[] chars = sqlstring.toCharArray();
1062 int count = 0;
1063 for (int i = 0; i < chars.length; i++) {
1064 count += chars[i]=='?' ? 1 : 0;
1065 }
1066 return count;
1067 }
1068 }