1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.apache.james.mailrepository;
23
24 import org.apache.james.services.SpoolRepository;
25 import org.apache.mailet.Mail;
26
27 import java.util.ConcurrentModificationException;
28 import java.util.Iterator;
29
30
31
32
33
34
35
36
37
38
39
40
41 public class AvalonSpoolRepository
42 extends AvalonMailRepository
43 implements SpoolRepository {
44
45
46
47
48
49
50
51
52
53
54 public synchronized Mail accept() throws InterruptedException {
55 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
56 getLogger().debug("Method accept() called");
57 }
58 return accept(new SpoolRepository.AcceptFilter () {
59 public boolean accept (String _, String __, long ___, String ____) {
60 return true;
61 }
62
63 public long getWaitTime () {
64 return 0;
65 }
66 });
67 }
68
69
70
71
72
73
74
75
76
77
78
79
80 public synchronized Mail accept(final long delay) throws InterruptedException
81 {
82 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
83 getLogger().debug("Method accept(delay) called");
84 }
85 return accept(new SpoolRepository.AcceptFilter () {
86 long youngest = 0;
87
88 public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
89 if (state.equals(Mail.ERROR)) {
90
91 long timeToProcess = delay + lastUpdated;
92
93 if (System.currentTimeMillis() > timeToProcess) {
94
95 return true;
96 } else {
97
98 if (youngest == 0 || youngest > timeToProcess) {
99
100 youngest = timeToProcess;
101 }
102 return false;
103 }
104 } else {
105
106 return true;
107 }
108 }
109
110 public long getWaitTime () {
111 if (youngest == 0) {
112 return 0;
113 } else {
114 long duration = youngest - System.currentTimeMillis();
115 youngest = 0;
116 return duration <= 0 ? 1 : duration;
117 }
118 }
119 });
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
136 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
137 getLogger().debug("Method accept(Filter) called");
138 }
139 while (!Thread.currentThread().isInterrupted()) try {
140 for (Iterator it = list(); it.hasNext(); ) {
141 String s = it.next().toString();
142 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
143 StringBuffer logBuffer =
144 new StringBuffer(64)
145 .append("Found item ")
146 .append(s)
147 .append(" in spool.");
148 getLogger().debug(logBuffer.toString());
149 }
150 if (lock(s)) {
151 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
152 getLogger().debug("accept(Filter) has locked: " + s);
153 }
154 try {
155 Mail mail = retrieve(s);
156
157
158
159 if (mail == null || !filter.accept (mail.getName(),
160 mail.getState(),
161 mail.getLastUpdated().getTime(),
162 mail.getErrorMessage())) {
163 unlock(s);
164 continue;
165 }
166 return mail;
167 } catch (javax.mail.MessagingException e) {
168 unlock(s);
169 getLogger().error("Exception during retrieve -- skipping item " + s, e);
170 }
171 }
172 }
173
174
175 wait (filter.getWaitTime());
176 } catch (InterruptedException ex) {
177 throw ex;
178 } catch (ConcurrentModificationException cme) {
179
180 getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
181 }
182 throw new InterruptedException();
183 }
184
185 }