View Javadoc

1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.jspf.executor;
21  
22  import org.apache.james.jspf.core.DNSLookupContinuation;
23  import org.apache.james.jspf.core.DNSResponse;
24  import org.apache.james.jspf.core.Logger;
25  import org.apache.james.jspf.core.SPFChecker;
26  import org.apache.james.jspf.core.SPFCheckerExceptionCatcher;
27  import org.apache.james.jspf.core.SPFSession;
28  import org.apache.james.jspf.core.exceptions.SPFResultException;
29  import org.apache.james.jspf.core.exceptions.TimeoutException;
30  
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.LinkedList;
34  import java.util.List;
35  import java.util.Map;
36  
37  /**
38   * Async implementation of SPFExecutor
39   *
40   */
41  public class StagedMultipleSPFExecutor implements SPFExecutor, Runnable {
42  
43      private static final String ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION = "StagedMultipleSPFExecutor.continuation";
44  
45      private static class ResponseQueueImpl extends LinkedList implements IResponseQueue {
46  
47          private static final long serialVersionUID = 5714025260393791651L;
48          
49          private int waitingThreads = 0;
50  
51          /**
52           * @see org.apache.james.jspf.executor.IResponseQueue#insertResponse(org.apache.james.jspf.executor.IResponse)
53           */
54          public synchronized void insertResponse(IResponse r) {
55              addLast(r);
56              notify();
57          }
58  
59          /**
60           * @see org.apache.james.jspf.executor.IResponseQueue#removeResponse()
61           */
62          public synchronized IResponse removeResponse() {
63              if ( (size() - waitingThreads <= 0) ) {
64                  try { waitingThreads++; wait();}
65                  catch (InterruptedException e)  {Thread.interrupted();}
66                  waitingThreads--;
67              }
68              return (IResponse)removeFirst();        }
69  
70      }
71  
72      // Use short as id because the id header is limited to 16 bit
73      // From RFC1035 4.1.1. Header section format :
74      // 
75      // ID              A 16 bit identifier assigned by the program that
76      //                 generates any kind of query.  This identifier is copied
77      //                 the corresponding reply and can be used by the requester
78      //                 to match up replies to outstanding queries.
79      //
80      private static short id;
81      
82      private synchronized int nextId() {
83          return id++;
84      }
85      
86      private Logger log;
87      private DNSAsynchLookupService dnsProbe;
88      private Thread worker;
89      private Map sessions;
90      private Map results;
91      private ResponseQueueImpl responseQueue;
92  
93      public StagedMultipleSPFExecutor(Logger log, DNSAsynchLookupService service) {
94          this.log = log;
95          this.dnsProbe = service;
96  
97          this.responseQueue = new ResponseQueueImpl();
98  
99          this.sessions = Collections.synchronizedMap(new HashMap());
100         this.results = Collections.synchronizedMap(new HashMap());
101 
102         this.worker = new Thread(this);
103         this.worker.setDaemon(true);
104         this.worker.setName("SPFExecutor");
105         this.worker.start();
106     }
107 
108     /**
109      * Execute the non-blocking part of the processing and returns.
110      * If the working queue is full (50 pending responses) this method will not return
111      * until the queue is again not full.
112      * 
113      * @see org.apache.james.jspf.executor.SPFExecutor#execute(org.apache.james.jspf.core.SPFSession, org.apache.james.jspf.executor.FutureSPFResult)
114      */
115     public void execute(SPFSession session, FutureSPFResult result) {
116         execute(session, result, true);
117     }
118         
119     public void execute(SPFSession session, FutureSPFResult result, boolean throttle) {
120         SPFChecker checker;
121         while ((checker = session.popChecker()) != null) {
122             // only execute checkers we added (better recursivity)
123             log.debug("Executing checker: " + checker);
124             try {
125                 DNSLookupContinuation cont = checker.checkSPF(session);
126                 // if the checker returns a continuation we return it
127                 if (cont != null) {
128                     invokeAsynchService(session, result, cont, throttle);
129                     return;
130                 }
131             } catch (Exception e) {
132                 while (e != null) {
133                     while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
134                         checker = session.popChecker();
135                     }
136                     try {
137                         ((SPFCheckerExceptionCatcher) checker).onException(e, session);
138                         e = null;
139                     } catch (SPFResultException ex) {
140                         e = ex;
141                     } finally {
142                         checker = null;
143                     }
144                 }
145             }
146         }
147         result.setSPFResult(session);
148     }
149 
150     /**
151      * throttle should be true only when the caller thread is the client and not the worker thread.
152      * We could even remove the throttle parameter and check the currentThread.
153      * This way the worker is never "blocked" while outside callers will be blocked if our
154      * queue is too big (so this is not fully "asynchronous").
155      */
156     private synchronized void invokeAsynchService(SPFSession session,
157             FutureSPFResult result, DNSLookupContinuation cont, boolean throttle) {
158         while (throttle && results.size() > 50) {
159             try {
160                 this.wait(100);
161             } catch (InterruptedException e) {
162             }
163         }
164         int nextId = nextId();
165         sessions.put(new Integer(nextId), session);
166         results.put(new Integer(nextId), result);
167         session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
168         dnsProbe.getRecordsAsynch(cont.getRequest(), nextId, responseQueue);
169     }
170 
171     public void run() {
172 
173         while (true) {
174             
175             IResponse resp = responseQueue.removeResponse();
176             
177             Integer respId = (Integer) resp.getId();
178             SPFSession session = (SPFSession) sessions.remove(respId);
179             FutureSPFResult result = (FutureSPFResult) results.remove(respId);
180             
181             DNSLookupContinuation cont = (DNSLookupContinuation) session.getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
182             
183             DNSResponse response;
184             if (resp.getException() != null) {
185                 response = new DNSResponse((TimeoutException) resp.getException());
186             } else {
187                 response = new DNSResponse((List) resp.getValue());
188             }
189             
190             
191             try {
192                 cont = cont.getListener().onDNSResponse(response, session);
193                 
194                 if (cont != null) {
195                     invokeAsynchService(session, result, cont, false);
196                 } else {
197                     execute(session, result, false);
198                 }
199 
200             } catch (Exception e) {
201                 SPFChecker checker = null;
202                 while (e != null) {
203                     while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
204                         checker = session.popChecker();
205                     }
206                     try {
207                         ((SPFCheckerExceptionCatcher) checker).onException(e, session);
208                         e = null;
209                     } catch (SPFResultException ex) {
210                         e = ex;
211                     } finally {
212                         checker = null;
213                     }
214                 }
215                 execute(session, result, false);
216             }
217         }
218     }
219 
220 }