1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.james.mpt;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.nio.ByteBuffer;
25 import java.nio.CharBuffer;
26 import java.nio.channels.ServerSocketChannel;
27 import java.nio.channels.SocketChannel;
28 import java.nio.charset.Charset;
29 import java.util.Collection;
30 import java.util.Iterator;
31 import java.util.LinkedList;
32 import java.util.Queue;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37
38
39
40 public class DiscardProtocol {
41
42 private static final Charset ASCII = Charset.forName("US-ASCII");
43
44 private static final int SOCKET_CONNECTION_WAIT_MILLIS = 30;
45
46 private static final int IDLE_TIMEOUT = 120000;
47
48 private static final Log LOG = LogFactory.getLog(DiscardProtocol.class);
49
50
51 private final int port;
52
53
54
55
56
57 private final Queue<Server> queue;
58
59 private final Collection<Server> runningServers;
60
61
62
63
64
65 private volatile ServerSocketChannel socket;
66
67
68 public DiscardProtocol(final int port) {
69 super();
70 this.port = port;
71 queue = new LinkedList<Server>();
72 runningServers = new LinkedList<Server>();
73 }
74
75
76
77
78
79
80 public void start() throws IOException {
81 synchronized (queue)
82 {
83 if (socket == null) {
84 socket = ServerSocketChannel.open();
85 socket.socket().bind(new InetSocketAddress(port));
86
87 socket.configureBlocking(false);
88
89 final Thread socketMonitorThread = new Thread(new SocketMonitor());
90 socketMonitorThread.start();
91
92 } else {
93 throw new IllegalStateException("Already started");
94 }
95 }
96 }
97
98
99 public Record recordNext() {
100 synchronized (queue)
101 {
102 Server server = new Server();
103 queue.add(server);
104 return server;
105 }
106 }
107
108 private void abort() {
109 synchronized (queue)
110 {
111 stop();
112 for (Iterator it=queue.iterator();it.hasNext();) {
113 final Server server = (Server) it.next();
114 server.abort();
115 }
116 queue.clear();
117 }
118 }
119
120
121
122
123
124 public void stop() {
125 synchronized (queue)
126 {
127 try {
128 if (socket != null) {
129 if (socket.isOpen()) {
130 socket.close();
131 }
132 }
133 } catch (IOException e) {
134 LOG.warn("Failed to close socket", e);
135 }
136 socket = null;
137 for (Iterator it = runningServers.iterator(); it.hasNext();) {
138 final Server server = (Server) it.next();
139 server.abort();
140 }
141 }
142 }
143
144 private final class SocketMonitor implements Runnable {
145 public void run() {
146 try
147 {
148 long lastConnection = System.currentTimeMillis();
149 while(socket != null) {
150 final SocketChannel socketChannel = socket.accept();
151 if (socketChannel == null) {
152 if (System.currentTimeMillis() - lastConnection > IDLE_TIMEOUT) {
153 throw new Exception ("Idle timeout");
154 }
155 Thread.sleep(SOCKET_CONNECTION_WAIT_MILLIS);
156 } else {
157 synchronized(queue) {
158 Server nextServer = (Server) queue.poll();
159 if (nextServer == null) {
160 nextServer = new Server();
161 }
162 nextServer.setSocketChannel(socketChannel);
163
164 final Thread channelThread = new Thread(nextServer);
165 channelThread.start();
166 runningServers.add(nextServer);
167 lastConnection = System.currentTimeMillis();
168 }
169 }
170 }
171 } catch (Exception e) {
172 LOG.fatal("Cannot accept connection", e);
173 abort();
174 }
175 }
176 }
177
178 public interface Record {
179
180 public String complete() throws Exception;
181 }
182
183
184
185
186 private final static class Server implements Runnable, Record {
187
188 private static final int COMPLETION_TIMEOUT = 60000;
189
190 private static final int COMPLETION_PAUSE = 1000;
191
192 private static final int INITIAL_BUFFER_CAPACITY = 2048;
193
194 private final ByteBuffer buffer;
195
196
197
198 private final StringBuffer out;
199
200
201
202
203 private SocketChannel socketChannel;
204
205 private volatile boolean aborted;
206 private volatile boolean complete;
207
208 public Server() {
209 complete = false;
210 out = new StringBuffer(INITIAL_BUFFER_CAPACITY);
211 buffer = ByteBuffer.allocate(INITIAL_BUFFER_CAPACITY);
212 aborted = false;
213 socketChannel = null;
214 }
215
216
217 public SocketChannel getSocketChannel() {
218 return socketChannel;
219 }
220
221 public void setSocketChannel(SocketChannel socketChannel) {
222 this.socketChannel = socketChannel;
223 }
224
225 public void run() {
226 try
227 {
228 if (socketChannel == null)
229 {
230 LOG.fatal("Socket channel must be set before instance is run.");
231 }
232 else
233 {
234 try {
235 while(!socketChannel.finishConnect()) {
236 Thread.sleep(SOCKET_CONNECTION_WAIT_MILLIS);
237 }
238
239 int read = 0;
240 while(!aborted && socketChannel.isOpen() && read >= 0) {
241 read = socketChannel.read(buffer);
242 if (!buffer.hasRemaining()) {
243 decant();
244 }
245 }
246
247 } catch (Exception e) {
248 LOG.fatal("Socket communication failed", e);
249 aborted = true;
250
251
252 } finally {
253 try {
254 socketChannel.close();
255 } catch (Exception e) {
256 LOG.debug("Ignoring failure to close socket.", e);
257 }
258 }
259 }
260 } finally {
261 synchronized (this)
262 {
263
264 complete = true;
265
266 notifyAll();
267 }
268 }
269 }
270
271
272
273
274
275 private void decant() {
276 buffer.flip();
277 final CharBuffer decoded = ASCII.decode(buffer);
278 out.append(decoded);
279 buffer.clear();
280 }
281
282
283 public void abort() {
284 aborted = true;
285 }
286
287
288
289
290 public synchronized String complete() throws Exception {
291 if (aborted) {
292 throw new Exception("Aborted");
293 }
294 final long startTime = System.currentTimeMillis();
295 boolean isTimedOut = false;
296 while (!complete && !isTimedOut) {
297 wait(COMPLETION_PAUSE);
298 isTimedOut = (System.currentTimeMillis() - startTime) > COMPLETION_TIMEOUT;
299 }
300 if (isTimedOut && !complete) {
301 throw new Exception("Timed out wait for be notified that read is complete");
302 }
303 decant();
304 return out.toString();
305 }
306 }
307 }