1   /****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  package org.apache.james.mpt;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.ByteBuffer;
25  import java.nio.CharBuffer;
26  import java.nio.channels.ServerSocketChannel;
27  import java.nio.channels.SocketChannel;
28  import java.nio.charset.Charset;
29  import java.util.Collection;
30  import java.util.Iterator;
31  import java.util.LinkedList;
32  import java.util.Queue;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  
37  /**
38   * Simple <a href='http://tools.ietf.org/html/rfc863'>RFC 863</a> implementation.
39   */
40  public class DiscardProtocol {
41  
42      private static final Charset ASCII = Charset.forName("US-ASCII");
43      
44      private static final int SOCKET_CONNECTION_WAIT_MILLIS = 30;
45      
46      private static final int IDLE_TIMEOUT = 120000;
47  
48      private static final Log LOG = LogFactory.getLog(DiscardProtocol.class);
49      
50      /** Serve on this port */
51      private final int port;
52      
53      /** 
54       * Queues requests for recordings.
55       * Also, used as lock.
56       */
57      private final Queue<Server> queue;
58      
59      private final Collection<Server> runningServers;
60      
61      /** 
62       * Server socket when started, null otherwise.
63       * Null indicates to the socket serving thread that the server is stopped.
64       */
65      private volatile ServerSocketChannel socket;
66      
67  
68      public DiscardProtocol(final int port) {
69          super();
70          this.port = port;
71          queue = new LinkedList<Server>();
72          runningServers = new LinkedList<Server>();
73      }
74      
75      /**
76       * Starts serving.
77       * @throws IOException when connection fails
78       * @throws IllegalStateException when already started
79       */
80      public void start() throws IOException {
81          synchronized (queue)
82          {
83              if (socket == null) {
84                  socket = ServerSocketChannel.open();
85                  socket.socket().bind(new InetSocketAddress(port));
86                  // only going to record a single conversation
87                  socket.configureBlocking(false);
88                  
89                  final Thread socketMonitorThread = new Thread(new SocketMonitor());
90                  socketMonitorThread.start();
91                  
92              } else {
93                  throw new IllegalStateException("Already started");
94              }
95          }
96      }
97      
98      
99      public Record recordNext() {
100         synchronized (queue)
101         {
102             Server server = new Server();
103             queue.add(server);
104             return server;
105         }
106     }
107     
108     private void abort() {
109         synchronized (queue)
110         {
111             stop();
112             for (Iterator it=queue.iterator();it.hasNext();) {
113                 final Server server = (Server) it.next();
114                 server.abort();
115             }
116             queue.clear();
117         }
118     }
119     
120     /**
121      * Stops serving.
122      * @return ASCII bytes sent to socket by first
123      */
124     public void stop() {
125         synchronized (queue)
126         {
127             try {
128                 if (socket != null) {
129                     if (socket.isOpen()) {
130                         socket.close();
131                     }
132                 }
133             } catch (IOException e) {
134                 LOG.warn("Failed to close socket", e);
135             }
136             socket = null;
137             for (Iterator it = runningServers.iterator(); it.hasNext();) {
138                 final Server server = (Server) it.next();
139                 server.abort();
140             }
141         }
142     }
143     
144     private final class SocketMonitor implements Runnable {
145         public void run() {
146             try
147             {
148                 long lastConnection = System.currentTimeMillis();
149                 while(socket != null) {
150                     final SocketChannel socketChannel = socket.accept();
151                     if (socketChannel == null) {
152                         if (System.currentTimeMillis() - lastConnection > IDLE_TIMEOUT) {
153                             throw new Exception ("Idle timeout");
154                         }
155                         Thread.sleep(SOCKET_CONNECTION_WAIT_MILLIS);
156                     } else {
157                         synchronized(queue) {
158                             Server nextServer = (Server) queue.poll();
159                             if (nextServer == null) {
160                                 nextServer = new Server();
161                             }
162                             nextServer.setSocketChannel(socketChannel);
163                             
164                             final Thread channelThread = new Thread(nextServer);
165                             channelThread.start();
166                             runningServers.add(nextServer);
167                             lastConnection = System.currentTimeMillis();
168                         }
169                     }
170                 }
171             } catch (Exception e) {
172                 LOG.fatal("Cannot accept connection", e);
173                 abort();
174             }
175         }
176     }
177 
178     public interface Record {
179         /** Blocks until completion of conversation */
180         public String complete() throws Exception;
181     }
182     
183     /**
184      * Basic server.
185      */
186     private final static class Server implements Runnable, Record {
187 
188         private static final int COMPLETION_TIMEOUT = 60000;
189 
190         private static final int COMPLETION_PAUSE = 1000;
191 
192         private static final int INITIAL_BUFFER_CAPACITY = 2048;
193         
194         private final ByteBuffer buffer;
195         /**
196          * Safe for concurrent access by multiple threads.
197          */
198         private final StringBuffer out;
199         
200         /**
201          * Initialised by setter 
202          */
203         private SocketChannel socketChannel;
204         
205         private volatile boolean aborted;
206         private volatile boolean complete;
207         
208         public Server() {
209             complete = false;
210             out = new StringBuffer(INITIAL_BUFFER_CAPACITY);
211             buffer = ByteBuffer.allocate(INITIAL_BUFFER_CAPACITY);
212             aborted = false;
213             socketChannel = null;
214         }
215         
216         
217         public SocketChannel getSocketChannel() {
218             return socketChannel;
219         }
220         
221         public void setSocketChannel(SocketChannel socketChannel) {
222             this.socketChannel = socketChannel;
223         }
224 
225         public void run() {
226             try
227             {
228                 if (socketChannel == null)
229                 {
230                     LOG.fatal("Socket channel must be set before instance is run.");
231                 }
232                 else
233                 {
234                     try {
235                         while(!socketChannel.finishConnect()) {
236                             Thread.sleep(SOCKET_CONNECTION_WAIT_MILLIS);
237                         }
238                         
239                         int read = 0;
240                         while(!aborted && socketChannel.isOpen() && read >= 0) {
241                             read = socketChannel.read(buffer);
242                             if (!buffer.hasRemaining()) {
243                                 decant();
244                             }
245                         }
246                         
247                     } catch (Exception e) {
248                         LOG.fatal("Socket communication failed", e);
249                         aborted = true;
250                         
251                     // Tidy up
252                     } finally {
253                         try {
254                             socketChannel.close();
255                         } catch (Exception e) {
256                             LOG.debug("Ignoring failure to close socket.", e);
257                         }
258                     }
259                 }
260             } finally {
261                 synchronized (this)
262                 {
263                     // Ensure completion is flagged
264                     complete = true;
265                     // Signal to any waiting threads 
266                     notifyAll();
267                 }
268             }
269         }
270 
271         /**
272          * Transfers all data from buffer to builder
273          *
274          */
275         private void decant() {
276             buffer.flip();
277             final CharBuffer decoded = ASCII.decode(buffer);
278             out.append(decoded);
279             buffer.clear();
280         }
281 
282 
283         public void abort() {
284             aborted = true;
285         }
286         
287         /**
288          * Blocks until connection is complete (closed)
289          */
290         public synchronized String complete() throws Exception {
291             if (aborted) {
292                 throw new Exception("Aborted");
293             }
294             final long startTime = System.currentTimeMillis();
295             boolean isTimedOut = false;
296             while (!complete  && !isTimedOut) {
297                 wait(COMPLETION_PAUSE);
298                 isTimedOut = (System.currentTimeMillis() - startTime) > COMPLETION_TIMEOUT;
299             }
300             if (isTimedOut && !complete) {
301                 throw new Exception("Timed out wait for be notified that read is complete");
302             }
303             decant();
304             return out.toString();
305         }        
306     }
307 }