1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.jspf.executor;
21
22 import org.apache.james.jspf.core.DNSLookupContinuation;
23 import org.apache.james.jspf.core.DNSResponse;
24 import org.apache.james.jspf.core.Logger;
25 import org.apache.james.jspf.core.SPFChecker;
26 import org.apache.james.jspf.core.SPFCheckerExceptionCatcher;
27 import org.apache.james.jspf.core.SPFSession;
28 import org.apache.james.jspf.core.exceptions.SPFResultException;
29 import org.apache.james.jspf.core.exceptions.TimeoutException;
30
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Map;
36
37
38
39
40
41 public class StagedMultipleSPFExecutor implements SPFExecutor, Runnable {
42
43 private static final String ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION = "StagedMultipleSPFExecutor.continuation";
44
45 private static class ResponseQueueImpl extends LinkedList implements IResponseQueue {
46
47 private static final long serialVersionUID = 5714025260393791651L;
48
49 private int waitingThreads = 0;
50
51
52
53
54 public synchronized void insertResponse(IResponse r) {
55 addLast(r);
56 notify();
57 }
58
59
60
61
62 public synchronized IResponse removeResponse() {
63 if ( (size() - waitingThreads <= 0) ) {
64 try { waitingThreads++; wait();}
65 catch (InterruptedException e) {Thread.interrupted();}
66 waitingThreads--;
67 }
68 return (IResponse)removeFirst(); }
69
70 }
71
72
73
74
75
76
77
78
79
80 private static short id;
81
82 private synchronized int nextId() {
83 return id++;
84 }
85
86 private Logger log;
87 private DNSAsynchLookupService dnsProbe;
88 private Thread worker;
89 private Map sessions;
90 private Map results;
91 private ResponseQueueImpl responseQueue;
92
93 public StagedMultipleSPFExecutor(Logger log, DNSAsynchLookupService service) {
94 this.log = log;
95 this.dnsProbe = service;
96
97 this.responseQueue = new ResponseQueueImpl();
98
99 this.sessions = Collections.synchronizedMap(new HashMap());
100 this.results = Collections.synchronizedMap(new HashMap());
101
102 this.worker = new Thread(this);
103 this.worker.setDaemon(true);
104 this.worker.setName("SPFExecutor");
105 this.worker.start();
106 }
107
108
109
110
111
112
113
114
115 public void execute(SPFSession session, FutureSPFResult result) {
116 execute(session, result, true);
117 }
118
119 public void execute(SPFSession session, FutureSPFResult result, boolean throttle) {
120 SPFChecker checker;
121 while ((checker = session.popChecker()) != null) {
122
123 log.debug("Executing checker: " + checker);
124 try {
125 DNSLookupContinuation cont = checker.checkSPF(session);
126
127 if (cont != null) {
128 invokeAsynchService(session, result, cont, throttle);
129 return;
130 }
131 } catch (Exception e) {
132 while (e != null) {
133 while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
134 checker = session.popChecker();
135 }
136 try {
137 ((SPFCheckerExceptionCatcher) checker).onException(e, session);
138 e = null;
139 } catch (SPFResultException ex) {
140 e = ex;
141 } finally {
142 checker = null;
143 }
144 }
145 }
146 }
147 result.setSPFResult(session);
148 }
149
150
151
152
153
154
155
156 private synchronized void invokeAsynchService(SPFSession session,
157 FutureSPFResult result, DNSLookupContinuation cont, boolean throttle) {
158 while (throttle && results.size() > 50) {
159 try {
160 this.wait(100);
161 } catch (InterruptedException e) {
162 }
163 }
164 int nextId = nextId();
165 sessions.put(new Integer(nextId), session);
166 results.put(new Integer(nextId), result);
167 session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
168 dnsProbe.getRecordsAsynch(cont.getRequest(), nextId, responseQueue);
169 }
170
171 public void run() {
172
173 while (true) {
174
175 IResponse resp = responseQueue.removeResponse();
176
177 Integer respId = (Integer) resp.getId();
178 SPFSession session = (SPFSession) sessions.remove(respId);
179 FutureSPFResult result = (FutureSPFResult) results.remove(respId);
180
181 DNSLookupContinuation cont = (DNSLookupContinuation) session.getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
182
183 DNSResponse response;
184 if (resp.getException() != null) {
185 response = new DNSResponse((TimeoutException) resp.getException());
186 } else {
187 response = new DNSResponse((List) resp.getValue());
188 }
189
190
191 try {
192 cont = cont.getListener().onDNSResponse(response, session);
193
194 if (cont != null) {
195 invokeAsynchService(session, result, cont, false);
196 } else {
197 execute(session, result, false);
198 }
199
200 } catch (Exception e) {
201 SPFChecker checker = null;
202 while (e != null) {
203 while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
204 checker = session.popChecker();
205 }
206 try {
207 ((SPFCheckerExceptionCatcher) checker).onException(e, session);
208 e = null;
209 } catch (SPFResultException ex) {
210 e = ex;
211 } finally {
212 checker = null;
213 }
214 }
215 execute(session, result, false);
216 }
217 }
218 }
219
220 }