1 /************************************************************************
2 * Copyright (c) 2000-2006 The Apache Software Foundation. *
3 * All rights reserved. *
4 * ------------------------------------------------------------------- *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you *
6 * may not use this file except in compliance with the License. You *
7 * may obtain a copy of the License at: *
8 * *
9 * http://www.apache.org/licenses/LICENSE-2.0 *
10 * *
11 * Unless required by applicable law or agreed to in writing, software *
12 * distributed under the License is distributed on an "AS IS" BASIS, *
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14 * implied. See the License for the specific language governing *
15 * permissions and limitations under the License. *
16 ***********************************************************************/
17
18 package org.apache.james.mailrepository;
19
20 import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
21 import org.apache.avalon.cornerstone.services.store.Store;
22 import org.apache.avalon.cornerstone.services.store.StreamRepository;
23 import org.apache.avalon.excalibur.datasource.DataSourceComponent;
24 import org.apache.avalon.framework.activity.Initializable;
25 import org.apache.avalon.framework.service.Serviceable;
26 import org.apache.avalon.framework.service.ServiceManager;
27 import org.apache.avalon.framework.service.ServiceException;
28 import org.apache.avalon.framework.configuration.Configurable;
29 import org.apache.avalon.framework.configuration.Configuration;
30 import org.apache.avalon.framework.configuration.ConfigurationException;
31 import org.apache.avalon.framework.configuration.DefaultConfiguration;
32 import org.apache.avalon.framework.context.Context;
33 import org.apache.avalon.framework.context.ContextException;
34 import org.apache.avalon.framework.context.Contextualizable;
35 import org.apache.avalon.framework.logger.AbstractLogEnabled;
36 import org.apache.james.context.AvalonContextUtilities;
37 import org.apache.james.core.MailImpl;
38 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
39 import org.apache.james.core.MimeMessageWrapper;
40 import org.apache.james.services.MailRepository;
41 import org.apache.james.util.JDBCUtil;
42 import org.apache.james.util.Lock;
43 import org.apache.james.util.SqlResources;
44 import org.apache.mailet.Mail;
45 import org.apache.mailet.MailAddress;
46
47 import javax.mail.MessagingException;
48 import javax.mail.internet.MimeMessage;
49
50 import java.io.ByteArrayInputStream;
51 import java.io.ByteArrayOutputStream;
52 import java.io.File;
53 import java.io.IOException;
54 import java.io.ObjectOutputStream;
55 import java.io.ObjectInputStream;
56 import java.sql.Blob;
57 import java.sql.Connection;
58 import java.sql.DatabaseMetaData;
59 import java.sql.PreparedStatement;
60 import java.sql.ResultSet;
61 import java.sql.SQLException;
62 import java.sql.Statement;
63 import java.util.ArrayList;
64 import java.util.Collection;
65 import java.util.HashMap;
66 import java.util.HashSet;
67 import java.util.Iterator;
68 import java.util.List;
69 import java.util.Map;
70 import java.util.Set;
71 import java.util.StringTokenizer;
72
/**
 * Implementation of a MailRepository on a database.
 *
 * <p>Requires a configuration element in the .conf.xml file of the form:
 * <br>&lt;repository destinationURL="db://&lt;datasource&gt;/&lt;table_name&gt;/&lt;repository_name&gt;"
 * <br>            type="MAIL"
 * <br>            model="SYNCHRONOUS"&gt;
 * <br>&lt;/repository&gt;
 * <p>destinationURL names the datasource to use, the table that holds the
 * messages, and the name of the repository within that table.
 * <br>Type can be SPOOL or MAIL
 * <br>Model is currently not used and may be dropped
 *
 * <p>Requires a logger called MailRepository.
 *
 * @version CVS $Revision: 397494 $ $Date: 2006-04-27 09:43:34 +0000 (gio, 27 apr 2006) $
 */
89 public class JDBCMailRepository
90 extends AbstractLogEnabled
91 implements MailRepository, Contextualizable, Serviceable, Configurable, Initializable {
92
/**
 * Whether 'deep debugging' is turned on. Compile-time switch guarding the
 * extra lock/unlock and retrieve tracing in this class.
 */
private static final boolean DEEP_DEBUG = false;

/**
 * The Avalon service manager handed to {@link #service(ServiceManager)};
 * also used by configure() to look up the Store.
 */
private ServiceManager componentManager;

/**
 * The Avalon context used by the instance; initialize() uses it to resolve
 * the SQL definition file.
 */
protected Context context;

/**
 * A lock used to control access to repository elements, locking access
 * based on the key
 */
private Lock lock;

/**
 * The table name parsed from the destination URL (second path segment);
 * stays null when the URL has no table segment.
 */
protected String tableName;

/**
 * The repository name parsed from the destination URL (third and any
 * following path segments, joined with '/'); stays null when absent.
 */
protected String repositoryName;

/**
 * The name of the SQL configuration file to be used to configure this repository.
 * Cleared by initialize() once the file has been resolved.
 */
private String sqlFileName;

/**
 * The stream repository used in dbfile mode; only set when a
 * "filestore" element is configured, otherwise null.
 */
private StreamRepository sr = null;

/**
 * The selector used to obtain the JDBC datasource
 */
protected DataSourceSelector datasources;

/**
 * The JDBC datasource that provides the JDBC connection
 */
protected DataSourceComponent datasource;

/**
 * The name of the datasource used by this repository (first path segment
 * of the destination URL)
 */
protected String datasourceName;

/**
 * Contains all of the sql strings for this component.
 */
protected SqlResources sqlQueries;

/**
 * The JDBCUtil helper class
 */
protected JDBCUtil theJDBCUtil;

/**
 * "Support for Mail Attributes under JDBC repositories is ready" indicator.
 * Set by checkJdbcAttributesSupport() during initialization.
 */
protected boolean jdbcMailAttributesReady = false;

/**
 * The size threshold for in memory handling of storing operations;
 * passed to MessageInputStream when a message body is written.
 */
private int inMemorySizeLimit;
168
169 /***
170 * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
171 */
172 public void contextualize(final Context context)
173 throws ContextException {
174 this.context = context;
175 }
176
177 /***
178 * @see org.apache.avalon.framework.service.Servicable#service(ServiceManager)
179 */
180 public void service( final ServiceManager componentManager )
181 throws ServiceException {
182 StringBuffer logBuffer = null;
183 if (getLogger().isDebugEnabled()) {
184 logBuffer =
185 new StringBuffer(64)
186 .append(this.getClass().getName())
187 .append(".compose()");
188 getLogger().debug(logBuffer.toString());
189 }
190
191 datasources = (DataSourceSelector)componentManager.lookup( DataSourceSelector.ROLE );
192 this.componentManager = componentManager;
193
194 }
195
196 /***
197 * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
198 */
199 public void configure(Configuration conf) throws ConfigurationException {
200 if (getLogger().isDebugEnabled()) {
201 getLogger().debug(this.getClass().getName() + ".configure()");
202 }
203
204 String destination = conf.getAttribute("destinationURL");
205
206 if ( ! destination.endsWith("/") ) {
207 destination += "/";
208 }
209
210
211
212 List urlParams = new ArrayList();
213 int start = 5;
214 if (destination.startsWith("dbfile")) {
215
216 start += 4;
217 }
218 int end = destination.indexOf('/', start);
219 while ( end > -1 ) {
220 urlParams.add(destination.substring(start, end));
221 start = end + 1;
222 end = destination.indexOf('/', start);
223 }
224
225
226 if (urlParams.size() == 0) {
227 StringBuffer exceptionBuffer =
228 new StringBuffer(256)
229 .append("Malformed destinationURL - Must be of the format '")
230 .append("db://<data-source>[/<table>[/<repositoryName>]]'. Was passed ")
231 .append(conf.getAttribute("destinationURL"));
232 throw new ConfigurationException(exceptionBuffer.toString());
233 }
234 if (urlParams.size() >= 1) {
235 datasourceName = (String)urlParams.get(0);
236 }
237 if (urlParams.size() >= 2) {
238 tableName = (String)urlParams.get(1);
239 }
240 if (urlParams.size() >= 3) {
241 repositoryName = "";
242 for (int i = 2; i < urlParams.size(); i++) {
243 if (i >= 3) {
244 repositoryName += '/';
245 }
246 repositoryName += (String)urlParams.get(i);
247 }
248 }
249
250 if (getLogger().isDebugEnabled()) {
251 StringBuffer logBuffer =
252 new StringBuffer(128)
253 .append("Parsed URL: table = '")
254 .append(tableName)
255 .append("', repositoryName = '")
256 .append(repositoryName)
257 .append("'");
258 getLogger().debug(logBuffer.toString());
259 }
260
261 inMemorySizeLimit = conf.getChild("inMemorySizeLimit").getValueAsInteger(409600000);
262
263 String filestore = conf.getChild("filestore").getValue(null);
264 sqlFileName = conf.getChild("sqlFile").getValue();
265 if (!sqlFileName.startsWith("file://")) {
266 throw new ConfigurationException
267 ("Malformed sqlFile - Must be of the format 'file://<filename>'.");
268 }
269 try {
270 if (filestore != null) {
271 Store store = (Store)componentManager.lookup(Store.ROLE);
272
273 DefaultConfiguration streamConfiguration
274 = new DefaultConfiguration( "repository",
275 "generated:JDBCMailRepository.configure()" );
276
277 streamConfiguration.setAttribute( "destinationURL", filestore );
278 streamConfiguration.setAttribute( "type", "STREAM" );
279 streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
280 sr = (StreamRepository) store.select(streamConfiguration);
281
282 if (getLogger().isDebugEnabled()) {
283 getLogger().debug("Got filestore for JdbcMailRepository: " + filestore);
284 }
285 }
286
287 lock = new Lock();
288 if (getLogger().isDebugEnabled()) {
289 StringBuffer logBuffer =
290 new StringBuffer(128)
291 .append(this.getClass().getName())
292 .append(" created according to ")
293 .append(destination);
294 getLogger().debug(logBuffer.toString());
295 }
296 } catch (Exception e) {
297 final String message = "Failed to retrieve Store component:" + e.getMessage();
298 getLogger().error(message, e);
299 throw new ConfigurationException(message, e);
300 }
301 }
302
303 /***
304 * Initialises the JDBC repository.
305 * 1) Tests the connection to the database.
306 * 2) Loads SQL strings from the SQL definition file,
307 * choosing the appropriate SQL for this connection,
308 * and performing paramter substitution,
309 * 3) Initialises the database with the required tables, if necessary.
310 *
311 * @throws Exception if an error occurs
312 */
313 public void initialize() throws Exception {
314 StringBuffer logBuffer = null;
315 if (getLogger().isDebugEnabled()) {
316 getLogger().debug(this.getClass().getName() + ".initialize()");
317 }
318
319 theJDBCUtil =
320 new JDBCUtil() {
321 protected void delegatedLog(String logString) {
322 JDBCMailRepository.this.getLogger().warn("JDBCMailRepository: " + logString);
323 }
324 };
325
326 datasource = (DataSourceComponent)datasources.select(datasourceName);
327
328
329 Connection conn = datasource.getConnection();
330 PreparedStatement createStatement = null;
331
332 try {
333
334
335 File sqlFile = null;
336 try {
337 sqlFile = AvalonContextUtilities.getFile(context, sqlFileName);
338 sqlFileName = null;
339 } catch (Exception e) {
340 getLogger().fatalError(e.getMessage(), e);
341 throw e;
342 }
343
344 if (getLogger().isDebugEnabled()) {
345 logBuffer =
346 new StringBuffer(128)
347 .append("Reading SQL resources from file: ")
348 .append(sqlFile.getAbsolutePath())
349 .append(", section ")
350 .append(this.getClass().getName())
351 .append(".");
352 getLogger().debug(logBuffer.toString());
353 }
354
355
356 Map sqlParameters = new HashMap();
357 if (tableName != null) {
358 sqlParameters.put("table", tableName);
359 }
360 if (repositoryName != null) {
361 sqlParameters.put("repository", repositoryName);
362 }
363
364 sqlQueries = new SqlResources();
365 sqlQueries.init(sqlFile, this.getClass().getName(),
366 conn, sqlParameters);
367
368
369 DatabaseMetaData dbMetaData = conn.getMetaData();
370
371
372 if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) {
373
374 createStatement =
375 conn.prepareStatement(sqlQueries.getSqlString("createTable", true));
376 createStatement.execute();
377
378 if (getLogger().isInfoEnabled()) {
379 logBuffer =
380 new StringBuffer(64)
381 .append("JdbcMailRepository: Created table '")
382 .append(tableName)
383 .append("'.");
384 getLogger().info(logBuffer.toString());
385 }
386 }
387
388 checkJdbcAttributesSupport(dbMetaData);
389
390 } finally {
391 theJDBCUtil.closeJDBCStatement(createStatement);
392 theJDBCUtil.closeJDBCConnection(conn);
393 }
394 }
395
396 /*** Checks whether support for JDBC Mail atributes is activated for this repository
397 * and if everything is consistent.
398 * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL"
399 * statements in sqlResources and for a table column named "message_attributes".
400 *
401 * @param dbMetaData the database metadata to be used to look up the column
402 * @throws SQLException if a fatal situation is met
403 */
404 protected void checkJdbcAttributesSupport(DatabaseMetaData dbMetaData) throws SQLException {
405 String attributesColumnName = "message_attributes";
406 boolean hasUpdateMessageAttributesSQL = false;
407 boolean hasRetrieveMessageAttributesSQL = false;
408
409 boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(dbMetaData, tableName, attributesColumnName);
410
411 StringBuffer logBuffer = new StringBuffer(64)
412 .append("JdbcMailRepository '"
413 + repositoryName
414 + ", table '"
415 + tableName
416 + "': ");
417
418
419
420 String updateMessageAttrSql =
421 sqlQueries.getSqlString("updateMessageAttributesSQL", false);
422 if (updateMessageAttrSql!=null) {
423 hasUpdateMessageAttributesSQL = true;
424 }
425
426
427
428 String retrieveMessageAttrSql =
429 sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
430 if (retrieveMessageAttrSql!=null) {
431 hasRetrieveMessageAttributesSQL = true;
432 }
433
434 if (hasUpdateMessageAttributesSQL && !hasRetrieveMessageAttributesSQL) {
435 logBuffer.append("JDBC Mail Attributes support was activated for update but not for retrieval"
436 + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'"
437 + "in table '"
438 + tableName
439 + "').");
440 getLogger().fatalError(logBuffer.toString());
441 throw new SQLException(logBuffer.toString());
442 }
443 if (!hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
444 logBuffer.append("JDBC Mail Attributes support was activated for retrieval but not for update"
445 + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'"
446 + "in table '"
447 + tableName
448 + "'.");
449 getLogger().fatalError(logBuffer.toString());
450 throw new SQLException(logBuffer.toString());
451 }
452 if (!hasMessageAttributesColumn
453 && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL)
454 ) {
455 logBuffer.append("JDBC Mail Attributes support was activated but column '"
456 + attributesColumnName
457 + "' is missing in table '"
458 + tableName
459 + "'.");
460 getLogger().fatalError(logBuffer.toString());
461 throw new SQLException(logBuffer.toString());
462 }
463 if (hasUpdateMessageAttributesSQL && hasRetrieveMessageAttributesSQL) {
464 jdbcMailAttributesReady = true;
465 if (getLogger().isInfoEnabled()) {
466 logBuffer.append("JDBC Mail Attributes support ready.");
467 getLogger().info(logBuffer.toString());
468 }
469 } else {
470 jdbcMailAttributesReady = false;
471 logBuffer.append("JDBC Mail Attributes support not activated. "
472 + "Missing both 'updateMessageAttributesSQL' "
473 + "and 'retrieveMessageAttributesSQL' "
474 + "statements for table '"
475 + tableName
476 + "' in sqlResources.xml. "
477 + "Will not persist in the repository '"
478 + repositoryName
479 + "'.");
480 getLogger().warn(logBuffer.toString());
481 }
482 }
483
484 /***
485 * Releases a lock on a message identified by a key
486 *
487 * @param key the key of the message to be unlocked
488 *
489 * @return true if successfully released the lock, false otherwise
490 */
491 public boolean unlock(String key) {
492 if (lock.unlock(key)) {
493 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
494 StringBuffer debugBuffer =
495 new StringBuffer(256)
496 .append("Unlocked ")
497 .append(key)
498 .append(" for ")
499 .append(Thread.currentThread().getName())
500 .append(" @ ")
501 .append(new java.util.Date(System.currentTimeMillis()));
502 getLogger().debug(debugBuffer.toString());
503 }
504 return true;
505 } else {
506 return false;
507 }
508 }
509
510 /***
511 * Obtains a lock on a message identified by a key
512 *
513 * @param key the key of the message to be locked
514 *
515 * @return true if successfully obtained the lock, false otherwise
516 */
517 public boolean lock(String key) {
518 if (lock.lock(key)) {
519 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
520 StringBuffer debugBuffer =
521 new StringBuffer(256)
522 .append("Locked ")
523 .append(key)
524 .append(" for ")
525 .append(Thread.currentThread().getName())
526 .append(" @ ")
527 .append(new java.util.Date(System.currentTimeMillis()));
528 getLogger().debug(debugBuffer.toString());
529 }
530 return true;
531 } else {
532 return false;
533 }
534 }
535
536 /***
537 * Store this message to the database. Optionally stores the message
538 * body to the filesystem and only writes the headers to the database.
539 */
540 public void store(Mail mc) throws MessagingException {
541 Connection conn = null;
542 boolean wasLocked = true;
543 String key = mc.getName();
544 try {
545 synchronized(this) {
546 wasLocked = lock.isLocked(key);
547
548 if (!wasLocked) {
549
550 lock(key);
551 }
552 }
553 conn = datasource.getConnection();
554
555
556
557
558 conn.setAutoCommit(false);
559
560 PreparedStatement checkMessageExists = null;
561 ResultSet rsExists = null;
562 boolean exists = false;
563 try {
564 checkMessageExists =
565 conn.prepareStatement(sqlQueries.getSqlString("checkMessageExistsSQL", true));
566 checkMessageExists.setString(1, mc.getName());
567 checkMessageExists.setString(2, repositoryName);
568 rsExists = checkMessageExists.executeQuery();
569 exists = rsExists.next() && rsExists.getInt(1) > 0;
570 } finally {
571 theJDBCUtil.closeJDBCResultSet(rsExists);
572 theJDBCUtil.closeJDBCStatement(checkMessageExists);
573 }
574
575 if (exists) {
576
577 PreparedStatement updateMessage = null;
578
579 try {
580 updateMessage =
581 conn.prepareStatement(sqlQueries.getSqlString("updateMessageSQL", true));
582 updateMessage.setString(1, mc.getState());
583 updateMessage.setString(2, mc.getErrorMessage());
584 if (mc.getSender() == null) {
585 updateMessage.setNull(3, java.sql.Types.VARCHAR);
586 } else {
587 updateMessage.setString(3, mc.getSender().toString());
588 }
589 StringBuffer recipients = new StringBuffer();
590 for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
591 recipients.append(i.next().toString());
592 if (i.hasNext()) {
593 recipients.append("\r\n");
594 }
595 }
596 updateMessage.setString(4, recipients.toString());
597 updateMessage.setString(5, mc.getRemoteHost());
598 updateMessage.setString(6, mc.getRemoteAddr());
599 updateMessage.setTimestamp(7, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
600 updateMessage.setString(8, mc.getName());
601 updateMessage.setString(9, repositoryName);
602 updateMessage.execute();
603 } finally {
604 Statement localUpdateMessage = updateMessage;
605
606 updateMessage = null;
607 theJDBCUtil.closeJDBCStatement(localUpdateMessage);
608 }
609
610
611 if (jdbcMailAttributesReady && mc.hasAttributes()) {
612 String updateMessageAttrSql =
613 sqlQueries.getSqlString("updateMessageAttributesSQL", false);
614 PreparedStatement updateMessageAttr = null;
615 try {
616 updateMessageAttr =
617 conn.prepareStatement(updateMessageAttrSql);
618 ByteArrayOutputStream baos = new ByteArrayOutputStream();
619 ObjectOutputStream oos = new ObjectOutputStream(baos);
620 try {
621 if (mc instanceof MailImpl) {
622 oos.writeObject(((MailImpl)mc).getAttributesRaw());
623 } else {
624 HashMap temp = new HashMap();
625 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
626 String hashKey = (String) i.next();
627 temp.put(hashKey,mc.getAttribute(hashKey));
628 }
629 oos.writeObject(temp);
630 }
631 oos.flush();
632 ByteArrayInputStream attrInputStream =
633 new ByteArrayInputStream(baos.toByteArray());
634 updateMessageAttr.setBinaryStream(1, attrInputStream, baos.size());
635 } finally {
636 try {
637 if (oos != null) {
638 oos.close();
639 }
640 } catch (IOException ioe) {
641 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
642 }
643 }
644 updateMessageAttr.setString(2, mc.getName());
645 updateMessageAttr.setString(3, repositoryName);
646 updateMessageAttr.execute();
647 } catch (SQLException sqle) {
648 getLogger().info("JDBCMailRepository: Trying to update mail attributes failed.",sqle);
649
650 } finally {
651 theJDBCUtil.closeJDBCStatement(updateMessageAttr);
652 }
653 }
654
655
656
657 MimeMessage messageBody = mc.getMessage();
658 boolean saveBody = false;
659
660 if (messageBody instanceof MimeMessageCopyOnWriteProxy) {
661 MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody;
662 messageBody = messageCow.getWrappedMessage();
663 }
664 if (messageBody instanceof MimeMessageWrapper) {
665 MimeMessageWrapper message = (MimeMessageWrapper)messageBody;
666 saveBody = message.isModified();
667 } else {
668 saveBody = true;
669 }
670
671 if (saveBody) {
672 PreparedStatement updateMessageBody =
673 conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
674 try {
675 MessageInputStream is = new MessageInputStream(mc,sr,inMemorySizeLimit);
676 updateMessageBody.setBinaryStream(1,is,(int) is.getSize());
677 updateMessageBody.setString(2, mc.getName());
678 updateMessageBody.setString(3, repositoryName);
679 updateMessageBody.execute();
680
681 } finally {
682 theJDBCUtil.closeJDBCStatement(updateMessageBody);
683 }
684 }
685
686
687 } else {
688
689 PreparedStatement insertMessage = null;
690 try {
691 String insertMessageSQL = sqlQueries.getSqlString("insertMessageSQL", true);
692 int number_of_parameters = getNumberOfParameters (insertMessageSQL);
693 insertMessage =
694 conn.prepareStatement(insertMessageSQL);
695 insertMessage.setString(1, mc.getName());
696 insertMessage.setString(2, repositoryName);
697 insertMessage.setString(3, mc.getState());
698 insertMessage.setString(4, mc.getErrorMessage());
699 if (mc.getSender() == null) {
700 insertMessage.setNull(5, java.sql.Types.VARCHAR);
701 } else {
702 insertMessage.setString(5, mc.getSender().toString());
703 }
704 StringBuffer recipients = new StringBuffer();
705 for (Iterator i = mc.getRecipients().iterator(); i.hasNext(); ) {
706 recipients.append(i.next().toString());
707 if (i.hasNext()) {
708 recipients.append("\r\n");
709 }
710 }
711 insertMessage.setString(6, recipients.toString());
712 insertMessage.setString(7, mc.getRemoteHost());
713 insertMessage.setString(8, mc.getRemoteAddr());
714 insertMessage.setTimestamp(9, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
715
716 MessageInputStream is = new MessageInputStream(mc, sr, inMemorySizeLimit);
717
718 insertMessage.setBinaryStream(10, is, (int) is.getSize());
719
720
721 if (number_of_parameters > 10) {
722 ByteArrayOutputStream baos = new ByteArrayOutputStream();
723 ObjectOutputStream oos = new ObjectOutputStream(baos);
724 try {
725 if (mc instanceof MailImpl) {
726 oos.writeObject(((MailImpl)mc).getAttributesRaw());
727 } else {
728 HashMap temp = new HashMap();
729 for (Iterator i = mc.getAttributeNames(); i.hasNext(); ) {
730 String hashKey = (String) i.next();
731 temp.put(hashKey,mc.getAttribute(hashKey));
732 }
733 oos.writeObject(temp);
734 }
735 oos.flush();
736 ByteArrayInputStream attrInputStream =
737 new ByteArrayInputStream(baos.toByteArray());
738 insertMessage.setBinaryStream(11, attrInputStream, baos.size());
739 } finally {
740 try {
741 if (oos != null) {
742 oos.close();
743 }
744 } catch (IOException ioe) {
745 getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.",ioe);
746 }
747 }
748 }
749
750 insertMessage.execute();
751 } finally {
752 theJDBCUtil.closeJDBCStatement(insertMessage);
753 }
754 }
755
756
757 conn.commit();
758 conn.setAutoCommit(true);
759
760 } catch (Exception e) {
761 getLogger().error("Exception caught while storing mail Container",e);
762 throw new MessagingException("Exception caught while storing mail Container: ",e);
763 } finally {
764 theJDBCUtil.closeJDBCConnection(conn);
765 if (!wasLocked) {
766
767 unlock(key);
768 synchronized (this) {
769 notify();
770 }
771 }
772 }
773 }
774
775 /***
776 * Retrieves a message given a key. At the moment, keys can be obtained
777 * from list()
778 *
779 * @param key the key of the message to retrieve
780 * @return the mail corresponding to this key, null if none exists
781 */
782 public Mail retrieve(String key) throws MessagingException {
783 if (DEEP_DEBUG) {
784 System.err.println("retrieving " + key);
785 }
786 Connection conn = null;
787 PreparedStatement retrieveMessage = null;
788 ResultSet rsMessage = null;
789 try {
790 conn = datasource.getConnection();
791 if (DEEP_DEBUG) {
792 System.err.println("got a conn " + key);
793 }
794
795 retrieveMessage =
796 conn.prepareStatement(sqlQueries.getSqlString("retrieveMessageSQL", true));
797 retrieveMessage.setString(1, key);
798 retrieveMessage.setString(2, repositoryName);
799 rsMessage = retrieveMessage.executeQuery();
800 if (DEEP_DEBUG) {
801 System.err.println("ran the query " + key);
802 }
803 if (!rsMessage.next()) {
804 if (getLogger().isDebugEnabled()) {
805 StringBuffer debugBuffer =
806 new StringBuffer(64)
807 .append("Did not find a record ")
808 .append(key)
809 .append(" in ")
810 .append(repositoryName);
811 getLogger().debug(debugBuffer.toString());
812 }
813 return null;
814 }
815
816 PreparedStatement retrieveMessageAttr = null;
817 HashMap attributes = null;
818 if (jdbcMailAttributesReady) {
819 String retrieveMessageAttrSql =
820 sqlQueries.getSqlString("retrieveMessageAttributesSQL", false);
821 ResultSet rsMessageAttr = null;
822 try {
823 retrieveMessageAttr =
824 conn.prepareStatement(retrieveMessageAttrSql);
825
826 retrieveMessageAttr.setString(1, key);
827 retrieveMessageAttr.setString(2, repositoryName);
828 rsMessageAttr = retrieveMessageAttr.executeQuery();
829
830 if (rsMessageAttr.next()) {
831 try {
832 byte[] serialized_attr = null;
833 String getAttributesOption = sqlQueries.getDbOption("getAttributes");
834 if (getAttributesOption != null && (getAttributesOption.equalsIgnoreCase("useBlob") || getAttributesOption.equalsIgnoreCase("useBinaryStream"))) {
835 Blob b = rsMessageAttr.getBlob(1);
836 serialized_attr = b.getBytes(1, (int)b.length());
837 } else {
838 serialized_attr = rsMessageAttr.getBytes(1);
839 }
840
841 if (serialized_attr != null) {
842 ByteArrayInputStream bais = new ByteArrayInputStream(serialized_attr);
843 ObjectInputStream ois = new ObjectInputStream(bais);
844 attributes = (HashMap)ois.readObject();
845 ois.close();
846 }
847 } catch (IOException ioe) {
848 if (getLogger().isDebugEnabled()) {
849 StringBuffer debugBuffer =
850 new StringBuffer(64)
851 .append("Exception reading attributes ")
852 .append(key)
853 .append(" in ")
854 .append(repositoryName);
855 getLogger().debug(debugBuffer.toString(), ioe);
856 }
857 }
858 } else {
859 if (getLogger().isDebugEnabled()) {
860 StringBuffer debugBuffer =
861 new StringBuffer(64)
862 .append("Did not find a record (attributes) ")
863 .append(key)
864 .append(" in ")
865 .append(repositoryName);
866 getLogger().debug(debugBuffer.toString());
867 }
868 }
869 } catch (SQLException sqle) {
870 StringBuffer errorBuffer = new StringBuffer(256)
871 .append("Error retrieving message")
872 .append(sqle.getMessage())
873 .append(sqle.getErrorCode())
874 .append(sqle.getSQLState())
875 .append(sqle.getNextException());
876 getLogger().error(errorBuffer.toString());
877 } finally {
878 theJDBCUtil.closeJDBCResultSet(rsMessageAttr);
879 theJDBCUtil.closeJDBCStatement(retrieveMessageAttr);
880 }
881 }
882
883 MailImpl mc = new MailImpl();
884 mc.setAttributesRaw (attributes);
885 mc.setName(key);
886 mc.setState(rsMessage.getString(1));
887 mc.setErrorMessage(rsMessage.getString(2));
888 String sender = rsMessage.getString(3);
889 if (sender == null) {
890 mc.setSender(null);
891 } else {
892 mc.setSender(new MailAddress(sender));
893 }
894 StringTokenizer st = new StringTokenizer(rsMessage.getString(4), "\r\n", false);
895 Set recipients = new HashSet();
896 while (st.hasMoreTokens()) {
897 recipients.add(new MailAddress(st.nextToken()));
898 }
899 mc.setRecipients(recipients);
900 mc.setRemoteHost(rsMessage.getString(5));
901 mc.setRemoteAddr(rsMessage.getString(6));
902 mc.setLastUpdated(rsMessage.getTimestamp(7));
903
904 MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key, sr);
905 MimeMessageCopyOnWriteProxy message = new MimeMessageCopyOnWriteProxy(source);
906 mc.setMessage(message);
907 return mc;
908 } catch (SQLException sqle) {
909 StringBuffer errorBuffer = new StringBuffer(256)
910 .append("Error retrieving message")
911 .append(sqle.getMessage())
912 .append(sqle.getErrorCode())
913 .append(sqle.getSQLState())
914 .append(sqle.getNextException());
915 getLogger().error(errorBuffer.toString());
916 throw new MessagingException("Exception while retrieving mail: " + sqle.getMessage());
917 } catch (Exception me) {
918 throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
919 } finally {
920 theJDBCUtil.closeJDBCResultSet(rsMessage);
921 theJDBCUtil.closeJDBCStatement(retrieveMessage);
922 theJDBCUtil.closeJDBCConnection(conn);
923 }
924 }
925
926 /***
927 * Removes a specified message
928 *
929 * @param mail the message to be removed from the repository
930 */
931 public void remove(Mail mail) throws MessagingException {
932 remove(mail.getName());
933 }
934
935 /***
936 * Removes a Collection of mails from the repository
937 * @param mails The Collection of <code>MailImpl</code>'s to delete
938 * @throws MessagingException
939 * @since 2.2.0
940 */
941 public void remove(Collection mails) throws MessagingException {
942 Iterator delList = mails.iterator();
943 while (delList.hasNext()) {
944 remove((Mail)delList.next());
945 }
946 }
947
948 /***
949 * Removes a message identified by a key.
950 *
951 * @param key the key of the message to be removed from the repository
952 */
953 public void remove(String key) throws MessagingException {
954
955 if (lock(key)) {
956 Connection conn = null;
957 PreparedStatement removeMessage = null;
958 try {
959 conn = datasource.getConnection();
960 removeMessage =
961 conn.prepareStatement(sqlQueries.getSqlString("removeMessageSQL", true));
962 removeMessage.setString(1, key);
963 removeMessage.setString(2, repositoryName);
964 removeMessage.execute();
965
966 if (sr != null) {
967 sr.remove(key);
968 }
969 } catch (Exception me) {
970 throw new MessagingException("Exception while removing mail: " + me.getMessage());
971 } finally {
972 theJDBCUtil.closeJDBCStatement(removeMessage);
973 theJDBCUtil.closeJDBCConnection(conn);
974 unlock(key);
975 }
976 }
977 }
978
979 /***
980 * Gets a list of message keys stored in this repository.
981 *
982 * @return an Iterator of the message keys
983 */
984 public Iterator list() throws MessagingException {
985
986 Connection conn = null;
987 PreparedStatement listMessages = null;
988 ResultSet rsListMessages = null;
989 try {
990 conn = datasource.getConnection();
991 listMessages =
992 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
993 listMessages.setString(1, repositoryName);
994 rsListMessages = listMessages.executeQuery();
995
996 List messageList = new ArrayList();
997 while (rsListMessages.next() && !Thread.currentThread().isInterrupted()) {
998 messageList.add(rsListMessages.getString(1));
999 }
1000 return messageList.iterator();
1001 } catch (Exception me) {
1002 throw new MessagingException("Exception while listing mail: " + me.getMessage());
1003 } finally {
1004 theJDBCUtil.closeJDBCResultSet(rsListMessages);
1005 theJDBCUtil.closeJDBCStatement(listMessages);
1006 theJDBCUtil.closeJDBCConnection(conn);
1007 }
1008 }
1009
1010 /***
1011 * Gets the SQL connection to be used by this JDBCMailRepository
1012 *
1013 * @return the connection
1014 * @throws SQLException if there is an issue with getting the connection
1015 */
1016 protected Connection getConnection() throws SQLException {
1017 return datasource.getConnection();
1018 }
1019
1020 /***
1021 * @see java.lang.Object#equals(Object)
1022 */
1023 public boolean equals(Object obj) {
1024 if (!(obj instanceof JDBCMailRepository)) {
1025 return false;
1026 }
1027
1028
1029 JDBCMailRepository repository = (JDBCMailRepository)obj;
1030 return ((repository.tableName == tableName) || ((repository.tableName != null) && repository.tableName.equals(tableName))) &&
1031 ((repository.repositoryName == repositoryName) || ((repository.repositoryName != null) && repository.repositoryName.equals(repositoryName)));
1032 }
1033
1034 /***
1035 * Provide a hash code that is consistent with equals for this class
1036 *
1037 * @return the hash code
1038 */
1039 public int hashCode() {
1040 int result = 17;
1041 if (tableName != null) {
1042 result = 37 * tableName.hashCode();
1043 }
1044 if (repositoryName != null) {
1045 result = 37 * repositoryName.hashCode();
1046 }
1047 return result;
1048 }
1049
1050 /***
1051 * This method calculates number of parameters in a prepared statement SQL String.
1052 * It does so by counting the number of '?' in the string
1053 * @param sqlstring to return parameter count for
1054 * @return number of parameters
1055 **/
1056 private int getNumberOfParameters (String sqlstring) {
1057
1058
1059 char[] chars = sqlstring.toCharArray();
1060 int count = 0;
1061 for (int i = 0; i < chars.length; i++) {
1062 count += chars[i]=='?' ? 1 : 0;
1063 }
1064 return count;
1065 }
1066 }