1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.transport.mailets;
21
22 import org.apache.avalon.cornerstone.services.store.Store;
23 import org.apache.avalon.framework.configuration.DefaultConfiguration;
24 import org.apache.avalon.framework.container.ContainerUtil;
25 import org.apache.avalon.framework.service.ServiceException;
26 import org.apache.avalon.framework.service.ServiceManager;
27 import org.apache.james.Constants;
28 import org.apache.james.services.SpoolRepository;
29 import org.apache.mailet.base.GenericMailet;
30 import org.apache.mailet.Mail;
31 import org.apache.mailet.MailetContext;
32 import org.apache.oro.text.regex.MalformedPatternException;
33 import org.apache.oro.text.regex.MatchResult;
34 import org.apache.oro.text.regex.Pattern;
35 import org.apache.oro.text.regex.Perl5Compiler;
36 import org.apache.oro.text.regex.Perl5Matcher;
37
38 import javax.mail.MessagingException;
39
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Date;
43 import java.util.HashMap;
44 import java.util.Iterator;
45 import java.util.Locale;
46 import java.util.StringTokenizer;
47 import java.util.Vector;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public class Retry extends GenericMailet implements Runnable {
84
85 private static final String RETRY_COUNT = "RETRY_COUNT";
86
87
88 public static final String ORIGINAL_ERROR = "originalError";
89
90
91 private static final long DEFAULT_DELAY_TIME = 21600000;
92
93
94 private static final String PATTERN_STRING = "\\s*([0-9]*\\s*[\\*])?\\s*([0-9]+)\\s*([a-z,A-Z]*)\\s*";
95
96
97 private static Pattern PATTERN = null;
98
99
100
101
102 private static final HashMap MULTIPLIERS = new HashMap(10);
103
104
105
106
107
108 static {
109 try {
110 Perl5Compiler compiler = new Perl5Compiler();
111 PATTERN = compiler.compile(PATTERN_STRING,
112 Perl5Compiler.READ_ONLY_MASK);
113 } catch (MalformedPatternException mpe) {
114
115 System.err.println("Malformed pattern: " + PATTERN_STRING);
116 mpe.printStackTrace(System.err);
117 }
118
119
120 MULTIPLIERS.put("msec", new Integer(1));
121 MULTIPLIERS.put("msecs", new Integer(1));
122 MULTIPLIERS.put("sec", new Integer(1000));
123 MULTIPLIERS.put("secs", new Integer(1000));
124 MULTIPLIERS.put("minute", new Integer(1000 * 60));
125 MULTIPLIERS.put("minutes", new Integer(1000 * 60));
126 MULTIPLIERS.put("hour", new Integer(1000 * 60 * 60));
127 MULTIPLIERS.put("hours", new Integer(1000 * 60 * 60));
128 MULTIPLIERS.put("day", new Integer(1000 * 60 * 60 * 24));
129 MULTIPLIERS.put("days", new Integer(1000 * 60 * 60 * 24));
130 }
131
132
133
134
135
136
137 private class MultipleDelayFilter implements SpoolRepository.AcceptFilter {
138
139
140
141
142 long youngest = 0;
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public boolean accept(String key, String state, long lastUpdated,
161 String errorMessage) {
162 int retries = Integer.parseInt(errorMessage);
163
164 long delay = getNextDelay(retries);
165 long timeToProcess = delay + lastUpdated;
166
167 if (System.currentTimeMillis() > timeToProcess) {
168
169 return true;
170 } else {
171
172 if (youngest == 0 || youngest > timeToProcess) {
173
174
175 youngest = timeToProcess;
176 }
177 return false;
178 }
179 }
180
181
182
183
184
185
186 public long getWaitTime() {
187 if (youngest == 0) {
188 return 0;
189 } else {
190 long duration = youngest - System.currentTimeMillis();
191 youngest = 0;
192 return duration <= 0 ? 1 : duration;
193 }
194 }
195 }
196
197
198 private boolean isDebug = false;
199
200
201 private SpoolRepository workRepository;
202
203
204 private long[] delayTimes;
205
206
207 private int maxRetries = 5;
208
209
210 private int workersThreadCount = 1;
211
212
213 private Collection workersThreads = new Vector();
214
215
216 private String retryProcessor = Mail.DEFAULT;
217
218
219
220
221 private String errorProcessor = Mail.ERROR;
222
223
224 private volatile boolean destroyed = false;
225
226
227
228 private Perl5Matcher delayTimeMatcher;
229
230
231 private MultipleDelayFilter delayFilter = new MultipleDelayFilter();
232
233
234 private String workRepositoryPath = null;
235
236
237
238
239
240
241
242
243 public void init() throws MessagingException {
244
245 isDebug = (getInitParameter("debug") == null) ? false : new Boolean(getInitParameter("debug")).booleanValue();
246
247
248 ArrayList delayTimesList = new ArrayList();
249 try {
250 if (getInitParameter("delayTime") != null) {
251 delayTimeMatcher = new Perl5Matcher();
252 String delayTimesParm = getInitParameter("delayTime");
253
254
255 StringTokenizer st = new StringTokenizer (delayTimesParm,",");
256 while (st.hasMoreTokens()) {
257 String delayTime = st.nextToken();
258 delayTimesList.add (new Delay(delayTime));
259 }
260 } else {
261
262 delayTimesList.add(new Delay());
263 }
264 } catch (Exception e) {
265 log("Invalid delayTime setting: " + getInitParameter("delayTime"));
266 }
267
268 try {
269
270 if (getInitParameter("maxRetries") != null) {
271 maxRetries = Integer.parseInt(getInitParameter("maxRetries"));
272 }
273
274
275 int totalAttempts = calcTotalAttempts(delayTimesList);
276
277
278 if (totalAttempts > maxRetries) {
279 log("Total number of delayTime attempts exceeds maxRetries specified. "
280 + " Increasing maxRetries from "
281 + maxRetries
282 + " to "
283 + totalAttempts);
284 maxRetries = totalAttempts;
285 } else {
286 int extra = maxRetries - totalAttempts;
287 if (extra != 0) {
288 log("maxRetries is larger than total number of attempts specified. "
289 + "Increasing last delayTime with "
290 + extra
291 + " attempts ");
292
293
294 if (delayTimesList.size() != 0) {
295
296 Delay delay = (Delay) delayTimesList.get(delayTimesList
297 .size() - 1);
298
299
300 delay.setAttempts(delay.getAttempts() + extra);
301 log("Delay of " + delay.getDelayTime()
302 + " msecs is now attempted: " + delay.getAttempts()
303 + " times");
304 } else {
305 throw new MessagingException(
306 "No delaytimes, cannot continue");
307 }
308 }
309 }
310 delayTimes = expandDelays(delayTimesList);
311 } catch (Exception e) {
312 log("Invalid maxRetries setting: " + getInitParameter("maxRetries"));
313 }
314
315 ServiceManager compMgr = (ServiceManager) getMailetContext()
316 .getAttribute(Constants.AVALON_COMPONENT_MANAGER);
317
318
319
320
321
322 workRepositoryPath = getInitParameter("retryRepository");
323 if (workRepositoryPath == null) {
324 workRepositoryPath = "file://var/mail/retry/";
325 }
326
327 try {
328
329 Store mailstore = (Store) compMgr.lookup(Store.ROLE);
330
331 DefaultConfiguration spoolConf = new DefaultConfiguration(
332 "repository", "generated:Retry");
333 spoolConf.setAttribute("destinationURL", workRepositoryPath);
334 spoolConf.setAttribute("type", "SPOOL");
335 workRepository = (SpoolRepository) mailstore.select(spoolConf);
336 } catch (ServiceException cnfe) {
337 log("Failed to retrieve Store component:" + cnfe.getMessage());
338 throw new MessagingException("Failed to retrieve Store component",
339 cnfe);
340 }
341
342
343 workersThreadCount = Integer.parseInt(getInitParameter("retryThreads"));
344 for (int i = 0; i < workersThreadCount; i++) {
345 String threadName = "Retry thread (" + i + ")";
346 Thread t = new Thread(this, threadName);
347 t.start();
348 workersThreads.add(t);
349 }
350
351
352 String processor = getInitParameter("retryProcessor");
353 retryProcessor = (processor == null) ? Mail.DEFAULT : processor;
354
355
356 processor = getInitParameter("errorProcessor");
357 errorProcessor = (processor == null) ? Mail.ERROR : processor;
358 }
359
360
361
362
363
364
365
366
367 private int calcTotalAttempts (ArrayList delayList) {
368 int sum = 0;
369 Iterator i = delayList.iterator();
370 while (i.hasNext()) {
371 Delay delay = (Delay)i.next();
372 sum += delay.getAttempts();
373 }
374 return sum;
375 }
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399 private long[] expandDelays(ArrayList delayList) {
400 long[] delays = new long[calcTotalAttempts(delayList)];
401 int idx = 0;
402 for (int i = 0; i < delayList.size(); i++) {
403 for (int j = 0; j < ((Delay) delayList.get(i)).getAttempts(); j++) {
404 delays[idx++] = ((Delay) delayList.get(i)).getDelayTime();
405 }
406 }
407 return delays;
408 }
409
410
411
412
413
414
415
416
417 private long getNextDelay(int retryCount) {
418 if (retryCount > delayTimes.length) {
419 return DEFAULT_DELAY_TIME;
420 }
421 return delayTimes[retryCount];
422 }
423
424
425
426
427
428
429 private class Delay {
430 private int attempts = 1;
431
432 private long delayTime = DEFAULT_DELAY_TIME;
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448 public Delay(String initString) throws MessagingException {
449
450 String unit = "msec";
451
452 if (delayTimeMatcher.matches(initString, PATTERN)) {
453 MatchResult res = delayTimeMatcher.getMatch();
454
455
456
457
458
459 if (res.group(1) != null && !res.group(1).equals("")) {
460
461 String attemptMatch = res.group(1);
462
463
464 attemptMatch = attemptMatch.substring(0,
465 attemptMatch.length() - 1).trim();
466 attempts = Integer.parseInt(attemptMatch);
467 }
468
469 delayTime = Long.parseLong(res.group(2));
470
471 if (!res.group(3).equals("")) {
472
473 unit = res.group(3).toLowerCase(Locale.US);
474 }
475 } else {
476 throw new MessagingException(initString + " does not match "
477 + PATTERN_STRING);
478 }
479
480
481 if (MULTIPLIERS.get(unit) != null) {
482 int multiplier = ((Integer) MULTIPLIERS.get(unit)).intValue();
483 delayTime *= multiplier;
484 } else {
485 throw new MessagingException("Unknown unit: " + unit);
486 }
487 }
488
489
490
491
492
493 public Delay() {
494 }
495
496
497
498
499 public long getDelayTime() {
500 return delayTime;
501 }
502
503
504
505
506 public int getAttempts() {
507 return attempts;
508 }
509
510
511
512
513 public void setAttempts(int value) {
514 attempts = value;
515 }
516
517
518
519
520 public String toString() {
521 String message = getAttempts() + "*" + getDelayTime() + "msecs";
522 return message;
523 }
524 }
525
526 public String getMailetInfo() {
527 return "Retry Mailet";
528 }
529
530
531
532
533
534
535
536
537
538
539
540
541
542 public void service(Mail mail) throws MessagingException {
543 if (isDebug) {
544 log("Retrying mail " + mail.getName());
545 }
546
547
548 mail.setAttribute(ORIGINAL_ERROR, mail.getErrorMessage());
549
550
551
552
553
554 String retryCount = (String) mail.getAttribute(RETRY_COUNT);
555 if (retryCount == null) {
556 retryCount = "0";
557 }
558 mail.setErrorMessage(retryCount);
559
560 int retries = Integer.parseInt(retryCount);
561 String message = "";
562
563
564
565 if (retries < maxRetries) {
566 message = "Storing " + mail.getMessage().getMessageID()
567 + " to retry repository " + workRepositoryPath
568 + ", retry " + retries;
569 log(message);
570
571 mail.setAttribute(RETRY_COUNT, retryCount);
572 workRepository.store(mail);
573 mail.setState(Mail.GHOST);
574 } else {
575
576 message = "Sending " + mail.getMessage().getMessageID()
577 + " to error processor after retrying " + retries
578 + " times.";
579 log(message);
580 mail.setState(errorProcessor);
581 MailetContext mc = getMailetContext();
582 try {
583 message = "Message failed after " + retries
584 + " retries with error " + "message: "
585 + mail.getAttribute(ORIGINAL_ERROR);
586 mail.setErrorMessage(message);
587 mc.sendMail(mail);
588 } catch (MessagingException e) {
589
590
591 log("Exception re-inserting failed mail: ", e);
592 throw new MessagingException(
593 "Exception encountered while bouncing "
594 + "mail in Retry process.", e);
595 }
596 }
597 }
598
599
600
601
602
603 public synchronized void destroy() {
604
605 destroyed = true;
606
607
608 for (Iterator i = workersThreads.iterator(); i.hasNext(); ) {
609 Thread t = (Thread)i.next();
610 t.interrupt();
611 }
612 notifyAll();
613 }
614
615
616
617
618
619 public void run() {
620 try {
621 while (!Thread.interrupted() && !destroyed) {
622 try {
623
624
625
626
627
628
629
630 Mail mail = workRepository.accept(delayFilter);
631 String key = mail.getName();
632 try {
633 if (isDebug) {
634 String message = Thread.currentThread().getName()
635 + " will process mail " + key;
636 log(message);
637 }
638
639
640 if (retry(mail)) {
641
642
643 workRepository.remove(key);
644 } else {
645
646
647 workRepository.store(mail);
648 ContainerUtil.dispose(mail);
649
650
651
652 workRepository.unlock(key);
653
654
655
656
657
658
659 }
660
661
662
663 mail = null;
664 } catch (Exception e) {
665
666
667
668
669
670
671 ContainerUtil.dispose(mail);
672 workRepository.remove(key);
673 throw e;
674 }
675 } catch (Throwable e) {
676 if (!destroyed) {
677 log("Exception caught in Retry.run()", e);
678 }
679 }
680 }
681 } finally {
682
683 Thread.interrupted();
684 }
685 }
686
687
688
689
690
691
692
693
694
695 private boolean retry(Mail mail) {
696 if (isDebug) {
697 log("Attempting to deliver " + mail.getName());
698 }
699
700
701 int retries = Integer.parseInt((String) mail.getAttribute(RETRY_COUNT));
702 ++retries;
703 mail.setErrorMessage(retries + "");
704 mail.setAttribute(RETRY_COUNT, String.valueOf(retries));
705 mail.setLastUpdated(new Date());
706
707
708 preprocess(mail);
709
710
711 mail.setState(retryProcessor);
712 MailetContext mc = getMailetContext();
713 try {
714 String message = "Retrying message "
715 + mail.getMessage().getMessageID() + ". Attempt #: "
716 + retries;
717 log(message);
718 mc.sendMail(mail);
719 } catch (MessagingException e) {
720
721
722 log("Exception while retrying message. ", e);
723 return false;
724 }
725 return true;
726 }
727
728
729
730
731
732
733
734
735
736
737
738
739 protected void preprocess(Mail mail) {
740 }
741 }