1 /*****************************************************************
2 * Licensed to the Apache Software Foundation (ASF) under one *
3 * or more contributor license agreements. See the NOTICE file *
4 * distributed with this work for additional information *
5 * regarding copyright ownership. The ASF licenses this file *
6 * to you under the Apache License, Version 2.0 (the *
7 * "License"); you may not use this file except in compliance *
8 * with the License. You may obtain a copy of the License at *
9 * *
10 * http://www.apache.org/licenses/LICENSE-2.0 *
11 * *
12 * Unless required by applicable law or agreed to in writing, *
13 * software distributed under the License is distributed on an *
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15 * KIND, either express or implied. See the License for the *
16 * specific language governing permissions and limitations *
17 * under the License. *
18 ****************************************************************/
19
20
21 package org.apache.james.postage;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.james.postage.client.POP3Client;
26 import org.apache.james.postage.client.RemoteManagerClient;
27 import org.apache.james.postage.client.SMTPClient;
28 import org.apache.james.postage.configuration.MailSender;
29 import org.apache.james.postage.configuration.PostageConfiguration;
30 import org.apache.james.postage.configuration.SendProfile;
31 import org.apache.james.postage.execution.SampleController;
32 import org.apache.james.postage.jmx.JVMResourceSampler;
33 import org.apache.james.postage.result.PostageRunnerResult;
34 import org.apache.james.postage.result.PostageRunnerResultImpl;
35 import org.apache.james.postage.smtpserver.SMTPMailSink;
36
37 import java.io.File;
38 import java.util.ArrayList;
39 import java.util.Iterator;
40 import java.util.LinkedHashSet;
41 import java.util.List;
42 import java.util.Set;
43
44 /***
45 * central controlling class for the testing process. starts all workers, collects data and stops when time is out.<br/>
46 * relates to one and only one Scenario section from the configuration file.
47 */
48 public class PostageRunner implements Runnable {
49
50 private static Log log = LogFactory.getLog(PostageRunner.class);
51
52 public static final int PHASE_CREATED = 0;
53 public static final int PHASE_STARTING = 1;
54 public static final int PHASE_RUNNING = 2;
55 public static final int PHASE_ABORTED = 3;
56 public static final int PHASE_COMPLETED = 4;
57
58 private int m_currentPhase = PHASE_CREATED;
59
60 private final PostageConfiguration m_postageConfiguration;
61 private final PostageRunnerResult m_results = new PostageRunnerResultImpl();
62
63 private POP3Client m_inboundMailingChecker;
64 private SampleController m_inboundMailingController;
65
66 private SMTPMailSink m_smtpMailSink;
67 private SampleController m_outboundMailingInterceptorController;
68
69 private List m_sendControllers = new ArrayList();
70
71 private JVMResourceSampler m_jvmResourceSampler = null;
72 private SampleController m_jvmResourceController = null;
73
74 private int m_minutesRunning = 0;
75
76 /***
77 * defines the prefix for every mail id generated by the current runner, so they can be distinguished from
78 * next runs. the runner instance is responsable to initializing this value!
79 * NOTE: this construct becomes a problem when two runners are running in parallel
80 */
81 private static String m_messageIdPrefix = null;
82
83 public static String getMessageIdPrefix() {
84 return m_messageIdPrefix + "-";
85 }
86
87 /***
88 * sends messages to James in two ways:
89 * 1. internal users relay to internal or external users using (inbound) SMTP
90 * 2. external users send mail to internal users using (inbound) SMTP
91 *
92 * the correct mail delivery is checked in two ways:
93 * 1. by checking internal users mails using POP3
94 * 2. by checking mail to external users by receiving all mail forwarded by James to outbound/forwarded SMTP
95 *
96 * @param postageConfiguration
97 */
98 public PostageRunner(PostageConfiguration postageConfiguration) {
99 m_messageIdPrefix = "" + System.currentTimeMillis();
100
101 m_postageConfiguration = postageConfiguration;
102
103 int totalMailsPerMin = m_postageConfiguration.getTotalMailsPerMin();
104 int durationMinutes = m_postageConfiguration.getDurationMinutes();
105
106 m_postageConfiguration.addDescriptionItem("mails_per_min", "" + totalMailsPerMin);
107 m_postageConfiguration.addDescriptionItem("totally_running_min", "" + durationMinutes);
108 m_postageConfiguration.addDescriptionItem("totally_mails_target", "" + totalMailsPerMin * durationMinutes);
109
110 m_results.setEnvironmentDescription(m_postageConfiguration.getDescriptionItems());
111 }
112
113 private void execute() {
114 if (m_postageConfiguration != null) m_currentPhase = PHASE_STARTING;
115
116
117 try {
118 setupInternalUserAccounts();
119 setupExternalUserAccounts();
120 setupInboundMailing();
121 setupInboundMailingChecker();
122 setupForwardedMailInterceptor();
123 setupJMXRemoting();
124 prepareResultFile(getCanonicalMailResultFileName());
125 prepareResultFile(getCanonicalJVMStatisticsFileName());
126 prepareResultFile(getCanonicalErrorsFileName());
127 } catch (StartupException e) {
128 log.fatal("could not even start the runner successfully", e);
129 return;
130 }
131
132
133 m_currentPhase = PHASE_RUNNING;
134
135 log.info("starting scenario " + m_postageConfiguration.getId());
136
137
138 startTimer();
139
140
141 try {
142 recordData();
143 } catch (Exception e) {
144 log.error("recording data was aborted!", e);
145 }
146
147
148
149
150
151 log.info("completing by writing data for scenario " + m_postageConfiguration.getId());
152 writeData(false);
153 }
154
155 private void prepareResultFile(String canonicalMailResultFileName) {
156 File writeCandidate = new File(canonicalMailResultFileName);
157 if (writeCandidate.exists()) {
158
159
160 writeCandidate.renameTo(new File(canonicalMailResultFileName + "." + writeCandidate.lastModified()));
161 }
162 }
163
164 /***
165 * for checking the running status of this PostageRunner.
166 * @return one of the values PHASE_CREATED (0), PHASE_STARTING (1), PHASE_RUNNING (2), PHASE_ABORTED (3), PHASE_COMPLETED (4)
167 */
168 public int getCurrentPhase() {
169 return m_currentPhase;
170 }
171
172 public void run() {
173 execute();
174 }
175
176 public PostageRunnerResult getResult() {
177 return m_results;
178 }
179
180 /***
181 * set up a thread issueing one-minute events and finally shutting down data recording when time has run out.
182 */
183 private void startTimer() {
184 new Thread(
185 new Runnable() {
186 public void run() {
187 try {
188 int durationMinutes = m_postageConfiguration.getDurationMinutes();
189 log.info("running for " + durationMinutes + " minute(s)");
190 for (int i = 0; i < durationMinutes; i++) {
191 Thread.sleep(60*1000);
192 oneMinuteCheckpoint();
193 }
194 stopRecording();
195 } catch (InterruptedException e) {
196 ;
197 }
198 }
199
200 }
201 ).start();
202 }
203
204 /***
205 * called after each fully completed minute in the running phase
206 */
207 private void oneMinuteCheckpoint() {
208 m_minutesRunning++;
209 log.info("reached checkpoint after " + m_minutesRunning + " of "
210 + m_postageConfiguration.getDurationMinutes() + " minute(s) running.");
211
212
213 writeData(true);
214 }
215
216 private void stopRecording() {
217 log.info("stopping");
218 if (m_sendControllers != null) {
219 Iterator iterator = m_sendControllers.iterator();
220 while (iterator.hasNext()) {
221 SampleController sendController = (SampleController)iterator.next();
222 sendController.stop();
223 }
224 }
225 if (m_inboundMailingController != null) m_inboundMailingController.stop();
226
227 if (m_outboundMailingInterceptorController != null) m_outboundMailingInterceptorController.stop();
228 m_currentPhase = PHASE_COMPLETED;
229 }
230
231 /***
232 * interrupt the runner from outside
233 */
234 public void terminate() {
235 stopRecording();
236 m_currentPhase = PHASE_ABORTED;
237 writeData(false);
238 }
239
240 private void writeData(boolean flushMatchedMailOnly) {
241 logElapsedData();
242
243 String filenameMailResult = getCanonicalMailResultFileName();
244 String filenameJVMStatistics = getCanonicalJVMStatisticsFileName();
245 String filenameErrors = getCanonicalErrorsFileName();
246 m_results.writeResults(filenameMailResult, filenameJVMStatistics, filenameErrors, flushMatchedMailOnly);
247 }
248
249 public String getCanonicalMailResultFileName() {
250 return "postage_mailResults." + m_postageConfiguration.getId() + ".csv";
251 }
252
253 public String getCanonicalJVMStatisticsFileName() {
254 return "postage_jvmStatistics." + m_postageConfiguration.getId() + ".csv";
255 }
256
257 public String getCanonicalErrorsFileName() {
258 return "postage_errors." + m_postageConfiguration.getId() + ".csv";
259 }
260
261 private void logElapsedData() {
262 log.info("unmatched messages: " + m_results.getUnmatchedMails());
263 log.info("matched messages: " + m_results.getMatchedMails());
264 log.info("valid matches: " + m_results.getValidMails());
265 log.info("recorded errors: " + m_results.getErrorCount());
266 }
267
268 private void recordData() {
269
270 Iterator iterator = m_sendControllers.iterator();
271 while (iterator.hasNext()) {
272 SampleController sendController = (SampleController)iterator.next();
273 sendController.runThreaded();
274 }
275
276 m_inboundMailingController = new SampleController(m_inboundMailingChecker, m_postageConfiguration.getTestserverPOP3FetchesPerMinute());
277 m_inboundMailingController.runThreaded();
278
279 m_outboundMailingInterceptorController = new SampleController(m_smtpMailSink, 10, m_postageConfiguration.getTestserverSMTPForwardingWaitSeconds());
280 m_outboundMailingInterceptorController.runThreaded();
281
282 if (m_jvmResourceSampler != null) {
283 m_jvmResourceController = new SampleController(m_jvmResourceSampler, 4);
284 m_jvmResourceController.runThreaded();
285 }
286
287 while(m_currentPhase == PHASE_RUNNING) {
288 try {
289 Thread.sleep(50);
290 } catch (InterruptedException e) {
291 ;
292 }
293 }
294
295 if (m_currentPhase == PHASE_COMPLETED) {
296
297 log.info("checking all internal accounts for unmatched mail...");
298 m_inboundMailingChecker.doMatchMailForAllUsers();
299 log.info("...done checking internal accounts");
300 } else {
301
302 log.info("skip checking internal accounts for unmatched mail.");
303 }
304 }
305
306 private void setupExternalUserAccounts() {
307 int externalUserCount = m_postageConfiguration.getExternalUsers().getCount();
308 String externalUsernamePrefix = m_postageConfiguration.getExternalUsers().getNamePrefix();
309
310 ArrayList externalUsers = new ArrayList();
311 for (int i = 1; i <= externalUserCount; i++) {
312 String username = externalUsernamePrefix + i;
313 externalUsers.add(username);
314 }
315 m_postageConfiguration.getExternalUsers().setExistingUsers(externalUsers);
316 }
317
318 /***
319 * sets up the profile where mail is send from external users to internal users
320 * @throws StartupException
321 */
322 private void setupInboundMailing() throws StartupException {
323 if (m_postageConfiguration.getTestserverPortSMTPInbound() <= 0) return;
324
325 Iterator profileIterator = m_postageConfiguration.getProfiles().iterator();
326 while (profileIterator.hasNext()) {
327 SendProfile sendProfile = (SendProfile)profileIterator.next();
328 Iterator mailSenderIterator = sendProfile.mailSenderIterator();
329 while (mailSenderIterator.hasNext()) {
330 MailSender mailSender = (MailSender)mailSenderIterator.next();
331 int sendPerMinute = mailSender.getSendPerMinute();
332
333 if (sendPerMinute < 1) continue;
334
335 SMTPClient smtpClient = new SMTPClient(m_postageConfiguration.getTestserverHost(),
336 m_postageConfiguration.getTestserverPortSMTPInbound(),
337 m_postageConfiguration.getInternalUsers(),
338 m_postageConfiguration.getExternalUsers(),
339 m_results,
340 mailSender
341 );
342
343 boolean available = smtpClient.checkAvailability();
344 log.info("availability of inbound mailing " + (available ? "": "NOT ") + "verified");
345 if (!available) continue;
346
347 SampleController sendController = new SampleController(smtpClient, sendPerMinute);
348 m_sendControllers.add(sendController);
349 }
350 }
351
352 }
353
354
355 /***
356 * sets up the part for checking accounts via POP3, which are then aligned with sent test mails
357 * @throws StartupException
358 */
359 private void setupInboundMailingChecker() throws StartupException {
360 if (m_postageConfiguration.getTestserverPortPOP3() <= 0) return;
361
362 m_inboundMailingChecker = new POP3Client(m_postageConfiguration.getTestserverHost(),
363 m_postageConfiguration.getTestserverPortPOP3(),
364 m_postageConfiguration.getInternalUsers(),
365 m_results
366 );
367 m_inboundMailingChecker.checkAvailability();
368 boolean available = m_inboundMailingChecker.checkAvailability();
369 if (available) {
370 log.info("availability of checking for inbound mailing (POP3) verified");
371 }
372 }
373
374 private void setupInternalUserAccounts() throws StartupException {
375 try {
376 String host = m_postageConfiguration.getTestserverHost();
377 int remoteManagerPort = m_postageConfiguration.getTestserverRemoteManagerPort();
378 String remoteManagerUsername = m_postageConfiguration.getTestserverRemoteManagerUsername();
379 String remoteManagerPassword = m_postageConfiguration.getTestserverRemoteManagerPassword();
380 int internalUserCount = m_postageConfiguration.getInternalUsers().getCount();
381 String internalUsernamePrefix = m_postageConfiguration.getInternalUsers().getNamePrefix();
382 String internalPassword = m_postageConfiguration.getInternalUsers().getPassword();
383
384 Set existingUsers = getExistingUsers(host, remoteManagerPort, remoteManagerUsername, remoteManagerPassword);
385
386 RemoteManagerClient remoteManagerClient = new RemoteManagerClient(host, remoteManagerPort, remoteManagerUsername, remoteManagerPassword);
387 remoteManagerClient.login();
388 ArrayList internalUsers = new ArrayList();
389 for (int i = 1; i <= internalUserCount; i++) {
390 String username = internalUsernamePrefix + i;
391 if (existingUsers.contains(username)) {
392 log.info("user already exists: " + username);
393 if (!m_postageConfiguration.isInternalReuseExisting()) {
394 remoteManagerClient.executeCommand("deluser " + username);
395 remoteManagerClient.readAnswer();
396 addUser(remoteManagerClient, username, internalPassword);
397 remoteManagerClient.readAnswer();
398 log.info("user deleted and re-created: " + username);
399 }
400 remoteManagerClient.executeCommand("setpassword " + username + " " + internalPassword);
401 remoteManagerClient.readAnswer();
402 } else {
403 addUser(remoteManagerClient, username, internalPassword);
404 }
405 internalUsers.add(username);
406 }
407 m_postageConfiguration.getInternalUsers().setExistingUsers(internalUsers);
408 remoteManagerClient.disconnect();
409 } catch (Exception e) {
410 throw new StartupException("error setting up internal user accounts", e);
411 }
412 }
413
414 private void setupForwardedMailInterceptor() throws StartupException {
415 SMTPMailSink smtpMailSink = new SMTPMailSink();
416 smtpMailSink.setSmtpListenerPort(m_postageConfiguration.getTestserverPortSMTPForwarding());
417 smtpMailSink.setResults(m_results);
418 try {
419 smtpMailSink.initialize();
420 } catch (Exception e) {
421 throw new StartupException("failed to setup",e);
422 }
423 m_smtpMailSink = smtpMailSink;
424 log.info("forwarded mail interceptor is set up.");
425 }
426
427
428 private void setupJMXRemoting() throws StartupException {
429 boolean jmxAvailable = JVMResourceSampler.isJMXAvailable();
430 int jmxPort = m_postageConfiguration.getTestserverPortJMXRemoting();
431 if (!jmxAvailable || jmxPort <= 0) {
432 return;
433 }
434 JVMResourceSampler jvmResourceSampler = new JVMResourceSampler("localhost", jmxPort, m_results);
435 try {
436 jvmResourceSampler.connectRemoteJamesJMXServer();
437 log.info("connected to remote JMX");
438 m_jvmResourceSampler = jvmResourceSampler;
439 } catch (Exception e) {
440 throw new StartupException("failed to setup JMX remoting for JVM resource sampling", e);
441 }
442 }
443
444 private void addUser(RemoteManagerClient remoteManagerClient, String username, String internalPassword) {
445 remoteManagerClient.executeCommand("adduser " + username + " " + internalPassword);
446 remoteManagerClient.readAnswer();
447 log.info("user created: " + username);
448 }
449
450 /***
451 * aquire a list of all existing internal James accounts
452 * @return Set<String>, each String a username
453 */
454 private Set getExistingUsers(String host, int remoteManagerPort, String remoteManagerUsername, String remoteManagerPassword) {
455 RemoteManagerClient remoteManagerClient = new RemoteManagerClient(host, remoteManagerPort, remoteManagerUsername, remoteManagerPassword);
456 boolean loginSuccess = remoteManagerClient.login();
457 if (!loginSuccess) throw new Error("failed to login to remote manager");
458 List rawUserList = remoteManagerClient.executeCommand("listusers");
459 remoteManagerClient.disconnect();
460
461 Set existingUsers = new LinkedHashSet();
462 Iterator iterator = rawUserList.iterator();
463 while (iterator.hasNext()) {
464 String line = (String)iterator.next();
465 if (!line.startsWith("user: ")) continue;
466
467 existingUsers.add(line.substring(6));
468 }
469 return existingUsers;
470 }
471
472 }