1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20 package org.apache.james.mailrepository;
21
22 import org.apache.james.services.SpoolRepository;
23 import org.apache.mailet.Mail;
24
25 import java.util.ConcurrentModificationException;
26 import java.util.Iterator;
27
28 /***
29 * Implementation of a MailRepository on a FileSystem.
30 *
31 * Requires a configuration element in the .conf.xml file of the form:
32 * <repository destinationURL="file://path-to-root-dir-for-repository"
33 * type="MAIL"
34 * model="SYNCHRONOUS"/>
35 * Requires a logger called MailRepository.
36 *
37 * @version 1.0.0, 24/04/1999
38 */
39 public class AvalonSpoolRepository
40 extends AvalonMailRepository
41 implements SpoolRepository {
42
43 /***
44 * <p>Returns an arbitrarily selected mail deposited in this Repository.
45 * Usage: SpoolManager calls accept() to see if there are any unprocessed
46 * mails in the spool repository.</p>
47 *
48 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
49 *
50 * @return the mail
51 */
52 public synchronized Mail accept() throws InterruptedException {
53 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
54 getLogger().debug("Method accept() called");
55 }
56 return accept(new SpoolRepository.AcceptFilter () {
57 public boolean accept (String _, String __, long ___, String ____) {
58 return true;
59 }
60
61 public long getWaitTime () {
62 return 0;
63 }
64 });
65 }
66
67 /***
68 * <p>Returns an arbitrarily selected mail deposited in this Repository that
69 * is either ready immediately for delivery, or is younger than it's last_updated plus
70 * the number of failed attempts times the delay time.
71 * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
72 * unprocessed mail is available.</p>
73 *
74 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
75 *
76 * @return the mail
77 */
78 public synchronized Mail accept(final long delay) throws InterruptedException
79 {
80 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
81 getLogger().debug("Method accept(delay) called");
82 }
83 return accept(new SpoolRepository.AcceptFilter () {
84 long youngest = 0;
85
86 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
87 if (state.equals(Mail.ERROR)) {
88
89 long timeToProcess = delay + lastUpdated;
90
91 if (System.currentTimeMillis() > timeToProcess) {
92
93 return true;
94 } else {
95
96 if (youngest == 0 || youngest > timeToProcess) {
97
98 youngest = timeToProcess;
99 }
100 return false;
101 }
102 } else {
103
104 return true;
105 }
106 }
107
108 public long getWaitTime () {
109 if (youngest == 0) {
110 return 0;
111 } else {
112 long duration = youngest - System.currentTimeMillis();
113 youngest = 0;
114 return duration <= 0 ? 1 : duration;
115 }
116 }
117 });
118 }
119
120
121 /***
122 * Returns an arbitrarily select mail deposited in this Repository for
123 * which the supplied filter's accept method returns true.
124 * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
125 * based on number of retries if the mail is ready for processing.
126 * If no message is ready the method will block until one is, the amount of time to block is
127 * determined by calling the filters getWaitTime method.
128 *
129 * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
130 *
131 * @return the mail
132 */
133 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
134 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
135 getLogger().debug("Method accept(Filter) called");
136 }
137 while (!Thread.currentThread().isInterrupted()) try {
138 for (Iterator it = list(); it.hasNext(); ) {
139 String s = it.next().toString();
140 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
141 StringBuffer logBuffer =
142 new StringBuffer(64)
143 .append("Found item ")
144 .append(s)
145 .append(" in spool.");
146 getLogger().debug(logBuffer.toString());
147 }
148 if (lock(s)) {
149 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
150 getLogger().debug("accept(Filter) has locked: " + s);
151 }
152 try {
153 Mail mail = retrieve(s);
154
155
156
157 if (mail == null || !filter.accept (mail.getName(),
158 mail.getState(),
159 mail.getLastUpdated().getTime(),
160 mail.getErrorMessage())) {
161 unlock(s);
162 continue;
163 }
164 return mail;
165 } catch (javax.mail.MessagingException e) {
166 unlock(s);
167 getLogger().error("Exception during retrieve -- skipping item " + s, e);
168 }
169 }
170 }
171
172
173 wait (filter.getWaitTime());
174 } catch (InterruptedException ex) {
175 throw ex;
176 } catch (ConcurrentModificationException cme) {
177
178 getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
179 }
180 throw new InterruptedException();
181 }
182
183 }