1 /****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20
21
22 package org.apache.james.mailrepository;
23
24 import org.apache.james.services.SpoolRepository;
25 import org.apache.mailet.Mail;
26
27 import java.util.ConcurrentModificationException;
28 import java.util.Iterator;
29
/**
 * Implementation of a SpoolRepository on a file system.
 *
 * <p>Requires a configuration element in the .conf.xml file of the form:</p>
 * <pre>
 *  &lt;repository destinationURL="file://path-to-root-dir-for-repository"
 *              type="MAIL"
 *              model="SYNCHRONOUS"/&gt;
 * </pre>
 * <p>Requires a logger called MailRepository.</p>
 *
 * @version 1.0.0, 24/04/1999
 */
41 public class AvalonSpoolRepository
42 extends AvalonMailRepository
43 implements SpoolRepository {
44
45 /**
46 * <p>Returns an arbitrarily selected mail deposited in this Repository.
47 * Usage: SpoolManager calls accept() to see if there are any unprocessed
48 * mails in the spool repository.</p>
49 *
50 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
51 *
52 * @return the mail
53 */
54 public synchronized Mail accept() throws InterruptedException {
55 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
56 getLogger().debug("Method accept() called");
57 }
58 return accept(new SpoolRepository.AcceptFilter () {
59 public boolean accept (String _, String __, long ___, String ____) {
60 return true;
61 }
62
63 public long getWaitTime () {
64 return 0;
65 }
66 });
67 }
68
69 /**
70 * <p>Returns an arbitrarily selected mail deposited in this Repository that
71 * is either ready immediately for delivery, or is younger than it's last_updated plus
72 * the number of failed attempts times the delay time.
73 * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
74 * unprocessed mail is available.</p>
75 *
76 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
77 *
78 * @return the mail
79 */
80 public synchronized Mail accept(final long delay) throws InterruptedException
81 {
82 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
83 getLogger().debug("Method accept(delay) called");
84 }
85 return accept(new SpoolRepository.AcceptFilter () {
86 long youngest = 0;
87
88 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
89 if (state.equals(Mail.ERROR)) {
90 //Test the time...
91 long timeToProcess = delay + lastUpdated;
92
93 if (System.currentTimeMillis() > timeToProcess) {
94 //We're ready to process this again
95 return true;
96 } else {
97 //We're not ready to process this.
98 if (youngest == 0 || youngest > timeToProcess) {
99 //Mark this as the next most likely possible mail to process
100 youngest = timeToProcess;
101 }
102 return false;
103 }
104 } else {
105 //This mail is good to go... return the key
106 return true;
107 }
108 }
109
110 public long getWaitTime () {
111 if (youngest == 0) {
112 return 0;
113 } else {
114 long duration = youngest - System.currentTimeMillis();
115 youngest = 0; //get ready for next round
116 return duration <= 0 ? 1 : duration;
117 }
118 }
119 });
120 }
121
122
123 /**
124 * Returns an arbitrarily select mail deposited in this Repository for
125 * which the supplied filter's accept method returns true.
126 * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
127 * based on number of retries if the mail is ready for processing.
128 * If no message is ready the method will block until one is, the amount of time to block is
129 * determined by calling the filters getWaitTime method.
130 *
131 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
132 *
133 * @return the mail
134 */
135 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
136 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
137 getLogger().debug("Method accept(Filter) called");
138 }
139 while (!Thread.currentThread().isInterrupted()) try {
140 for (Iterator it = list(); it.hasNext(); ) {
141 String s = it.next().toString();
142 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
143 StringBuffer logBuffer =
144 new StringBuffer(64)
145 .append("Found item ")
146 .append(s)
147 .append(" in spool.");
148 getLogger().debug(logBuffer.toString());
149 }
150 if (lock(s)) {
151 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
152 getLogger().debug("accept(Filter) has locked: " + s);
153 }
154 try {
155 Mail mail = retrieve(s);
156 // Retrieve can return null if the mail is no longer on the spool
157 // (i.e. another thread has gotten to it first).
158 // In this case we simply continue to the next key
159 if (mail == null || !filter.accept (mail.getName(),
160 mail.getState(),
161 mail.getLastUpdated().getTime(),
162 mail.getErrorMessage())) {
163 unlock(s);
164 continue;
165 }
166 return mail;
167 } catch (javax.mail.MessagingException e) {
168 unlock(s);
169 getLogger().error("Exception during retrieve -- skipping item " + s, e);
170 }
171 }
172 }
173
174 //We did not find any... let's wait for a certain amount of time
175 wait (filter.getWaitTime());
176 } catch (InterruptedException ex) {
177 throw ex;
178 } catch (ConcurrentModificationException cme) {
179 // Should never get here now that list methods clones keyset for iterator
180 getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
181 }
182 throw new InterruptedException();
183 }
184
185 }