View Javadoc

1   /*****************************************************************
2    * Licensed to the Apache Software Foundation (ASF) under one   *
3    * or more contributor license agreements.  See the NOTICE file *
4    * distributed with this work for additional information        *
5    * regarding copyright ownership.  The ASF licenses this file   *
6    * to you under the Apache License, Version 2.0 (the            *
7    * "License"); you may not use this file except in compliance   *
8    * with the License.  You may obtain a copy of the License at   *
9    *                                                              *
10   *   http://www.apache.org/licenses/LICENSE-2.0                 *
11   *                                                              *
12   * Unless required by applicable law or agreed to in writing,   *
13   * software distributed under the License is distributed on an  *
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
15   * KIND, either express or implied.  See the License for the    *
16   * specific language governing permissions and limitations      *
17   * under the License.                                           *
18   ****************************************************************/
19  
20  
21  package org.apache.james.postage;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.james.postage.client.POP3Client;
26  import org.apache.james.postage.client.RemoteManagerClient;
27  import org.apache.james.postage.client.SMTPClient;
28  import org.apache.james.postage.configuration.MailSender;
29  import org.apache.james.postage.configuration.PostageConfiguration;
30  import org.apache.james.postage.configuration.SendProfile;
31  import org.apache.james.postage.execution.SampleController;
32  import org.apache.james.postage.jmx.JVMResourceSampler;
33  import org.apache.james.postage.result.PostageRunnerResult;
34  import org.apache.james.postage.result.PostageRunnerResultImpl;
35  import org.apache.james.postage.smtpserver.SMTPMailSink;
36  
37  import java.io.File;
38  import java.util.ArrayList;
39  import java.util.Iterator;
40  import java.util.LinkedHashSet;
41  import java.util.List;
42  import java.util.Set;
43  
44  /***
45   * central controlling class for the testing process. starts all workers, collects data and stops when time is out.<br/>
46   * relates to one and only one Scenario section from the configuration file.
47   */
48  public class PostageRunner implements Runnable {
49  
50      private static Log log = LogFactory.getLog(PostageRunner.class);
51  
52      public static final int PHASE_CREATED    = 0;
53      public static final int PHASE_STARTING   = 1;
54      public static final int PHASE_RUNNING    = 2;
55      public static final int PHASE_ABORTED    = 3;
56      public static final int PHASE_COMPLETED  = 4;
57  
58      private int m_currentPhase = PHASE_CREATED;
59  
60      private final PostageConfiguration m_postageConfiguration;
61      private final PostageRunnerResult m_results = new PostageRunnerResultImpl();
62  
63      private POP3Client m_inboundMailingChecker;
64      private SampleController m_inboundMailingController;
65  
66      private SMTPMailSink m_smtpMailSink;
67      private SampleController m_outboundMailingInterceptorController;
68  
69      private List m_sendControllers = new ArrayList();
70  
71      private JVMResourceSampler m_jvmResourceSampler = null;
72      private SampleController m_jvmResourceController = null;
73  
74      private int  m_minutesRunning = 0;
75  
76      /***
77       * defines the prefix for every mail id generated by the current runner, so they can be distinguished from
78       * next runs. the runner instance is responsable to initializing this value!
79       * NOTE: this construct becomes a problem when two runners are running in parallel
80       */
81      private static String m_messageIdPrefix = null;
82  
83      public static String getMessageIdPrefix() {
84          return m_messageIdPrefix + "-";
85      }
86  
87      /***
88       * sends messages to James in two ways:
89       * 1. internal users relay to internal or external users using (inbound) SMTP
90       * 2. external users send mail to internal users using (inbound) SMTP
91       *
92       * the correct mail delivery is checked in two ways:
93       * 1. by checking internal users mails using POP3
94       * 2. by checking mail to external users by receiving all mail forwarded by James to outbound/forwarded SMTP
95       *
96       * @param postageConfiguration
97       */
98      public PostageRunner(PostageConfiguration postageConfiguration) {
99          m_messageIdPrefix = "" + System.currentTimeMillis();
100 
101         m_postageConfiguration = postageConfiguration;
102 
103         int totalMailsPerMin = m_postageConfiguration.getTotalMailsPerMin();
104         int durationMinutes = m_postageConfiguration.getDurationMinutes();
105 
106         m_postageConfiguration.addDescriptionItem("mails_per_min", "" + totalMailsPerMin);
107         m_postageConfiguration.addDescriptionItem("totally_running_min", "" + durationMinutes);
108         m_postageConfiguration.addDescriptionItem("totally_mails_target", "" + totalMailsPerMin * durationMinutes);
109 
110         m_results.setEnvironmentDescription(m_postageConfiguration.getDescriptionItems());
111     }
112 
113     private void execute() {
114         if (m_postageConfiguration != null) m_currentPhase = PHASE_STARTING;
115 
116         // do initialisation, check if all services can be connected
117         try {
118             setupInternalUserAccounts();
119             setupExternalUserAccounts();
120             setupInboundMailing();
121             setupInboundMailingChecker();
122             setupForwardedMailInterceptor();
123             setupJMXRemoting();
124             prepareResultFile(getCanonicalMailResultFileName());
125             prepareResultFile(getCanonicalJVMStatisticsFileName());
126             prepareResultFile(getCanonicalErrorsFileName());
127         } catch (StartupException e) {
128             log.fatal("could not even start the runner successfully", e);
129             return;
130         }
131 
132 
133         m_currentPhase = PHASE_RUNNING;
134 
135         log.info("starting scenario " + m_postageConfiguration.getId());
136 
137         // fork the timeout controller thread. it issues the oneMinute checkpoint event, too.
138         startTimer();
139 
140         // start all threads
141         try {
142             recordData();
143         } catch (Exception e) {
144             log.error("recording data was aborted!", e);
145         }
146 
147         // has to be set by method stopRecording()
148         // m_currentPhase = PHASE_COMPLETED;
149 
150         // writeMatchedMailResults (remaining) collected data
151         log.info("completing by writing data for scenario " + m_postageConfiguration.getId());
152         writeData(false);
153     }
154 
155     private void prepareResultFile(String canonicalMailResultFileName) {
156         File writeCandidate = new File(canonicalMailResultFileName);
157         if (writeCandidate.exists()) {
158             // rename existing result file from previous run
159             // to something like "result___.cvs.64906993" to make place for new results
160             writeCandidate.renameTo(new File(canonicalMailResultFileName + "." + writeCandidate.lastModified()));
161         }
162     }
163 
164     /***
165      * for checking the running status of this PostageRunner.
166      * @return one of the values PHASE_CREATED (0), PHASE_STARTING (1), PHASE_RUNNING (2), PHASE_ABORTED (3), PHASE_COMPLETED (4)
167      */
168     public int getCurrentPhase() {
169         return m_currentPhase;
170     }
171 
172     public void run() {
173         execute();
174     }
175 
176     public PostageRunnerResult getResult() {
177         return m_results;
178     }
179 
180     /***
181      * set up a thread issueing one-minute events and finally shutting down data recording when time has run out.
182      */
183     private void startTimer() {
184         new Thread(
185                 new Runnable() {
186                     public void run() {
187                         try {
188                             int durationMinutes = m_postageConfiguration.getDurationMinutes();
189                             log.info("running for " + durationMinutes + " minute(s)");
190                             for (int i = 0; i < durationMinutes; i++) {
191                                 Thread.sleep(60*1000);
192                                 oneMinuteCheckpoint();
193                             }
194                             stopRecording();
195                         } catch (InterruptedException e) {
196                             ; // exit
197                         }
198                     }
199 
200                 }
201         ).start();
202     }
203 
204     /***
205      * called after each fully completed minute in the running phase
206      */
207     private void oneMinuteCheckpoint() {
208         m_minutesRunning++;
209         log.info("reached checkpoint after " + m_minutesRunning + " of "
210                   + m_postageConfiguration.getDurationMinutes() + " minute(s) running.");
211 
212         //TODO do this in a separate thread?
213         writeData(true);
214     }
215 
216     private void stopRecording() {
217         log.info("stopping");
218         if (m_sendControllers != null) {
219             Iterator iterator = m_sendControllers.iterator();
220             while (iterator.hasNext()) {
221                 SampleController sendController = (SampleController)iterator.next();
222                 sendController.stop();
223             }
224         }
225         if (m_inboundMailingController != null) m_inboundMailingController.stop();
226 
227         if (m_outboundMailingInterceptorController != null) m_outboundMailingInterceptorController.stop();
228         m_currentPhase = PHASE_COMPLETED;
229     }
230 
231     /***
232      * interrupt the runner from outside
233      */
234     public void terminate() {
235         stopRecording();
236         m_currentPhase = PHASE_ABORTED;
237         writeData(false);
238     }
239 
240     private void writeData(boolean flushMatchedMailOnly) {
241         logElapsedData();
242 
243         String filenameMailResult = getCanonicalMailResultFileName();
244         String filenameJVMStatistics = getCanonicalJVMStatisticsFileName();
245         String filenameErrors = getCanonicalErrorsFileName();
246         m_results.writeResults(filenameMailResult, filenameJVMStatistics, filenameErrors, flushMatchedMailOnly);
247     }
248 
249     public String getCanonicalMailResultFileName() {
250         return "postage_mailResults." + m_postageConfiguration.getId() + ".csv";
251     }
252 
253     public String getCanonicalJVMStatisticsFileName() {
254         return "postage_jvmStatistics." + m_postageConfiguration.getId() + ".csv";
255     }
256 
257     public String getCanonicalErrorsFileName() {
258         return "postage_errors." + m_postageConfiguration.getId() + ".csv";
259     }
260 
261     private void logElapsedData() {
262         log.info("unmatched messages: " + m_results.getUnmatchedMails());
263         log.info("matched messages:   " + m_results.getMatchedMails());
264         log.info("valid matches:      " + m_results.getValidMails());
265         log.info("recorded errors:    " + m_results.getErrorCount());
266     }
267 
268     private void recordData() {
269 
270         Iterator iterator = m_sendControllers.iterator();
271         while (iterator.hasNext()) {
272             SampleController sendController = (SampleController)iterator.next();
273             sendController.runThreaded();
274         }
275 
276         m_inboundMailingController = new SampleController(m_inboundMailingChecker, m_postageConfiguration.getTestserverPOP3FetchesPerMinute());
277         m_inboundMailingController.runThreaded();
278 
279         m_outboundMailingInterceptorController = new SampleController(m_smtpMailSink, 10, m_postageConfiguration.getTestserverSMTPForwardingWaitSeconds());
280         m_outboundMailingInterceptorController.runThreaded();
281 
282         if (m_jvmResourceSampler != null) {
283             m_jvmResourceController = new SampleController(m_jvmResourceSampler, 4);
284             m_jvmResourceController.runThreaded();
285         }
286 
287         while(m_currentPhase == PHASE_RUNNING) {
288             try {
289                 Thread.sleep(50);
290             } catch (InterruptedException e) {
291                 ; // leave
292             }
293         }
294 
295         if (m_currentPhase == PHASE_COMPLETED) {
296             // walk through all internal users and check for un-matched mails
297             log.info("checking all internal accounts for unmatched mail...");
298             m_inboundMailingChecker.doMatchMailForAllUsers();
299             log.info("...done checking internal accounts");
300         } else {
301             // if we didn't COMPLETE, we'd better skip this
302             log.info("skip checking internal accounts for unmatched mail.");
303         }
304     }
305 
306     private void setupExternalUserAccounts() {
307         int externalUserCount = m_postageConfiguration.getExternalUsers().getCount();
308         String externalUsernamePrefix = m_postageConfiguration.getExternalUsers().getNamePrefix();
309 
310         ArrayList externalUsers = new ArrayList();
311         for (int i = 1; i <= externalUserCount; i++) {
312             String username = externalUsernamePrefix + i;
313             externalUsers.add(username);
314         }
315         m_postageConfiguration.getExternalUsers().setExistingUsers(externalUsers);
316     }
317 
318     /***
319      * sets up the profile where mail is send from external users to internal users
320      * @throws StartupException
321      */
322     private void setupInboundMailing() throws StartupException {
323         if (m_postageConfiguration.getTestserverPortSMTPInbound() <= 0) return;
324 
325         Iterator profileIterator = m_postageConfiguration.getProfiles().iterator();
326         while (profileIterator.hasNext()) {
327             SendProfile sendProfile = (SendProfile)profileIterator.next();
328             Iterator mailSenderIterator = sendProfile.mailSenderIterator();
329             while (mailSenderIterator.hasNext()) {
330                 MailSender mailSender = (MailSender)mailSenderIterator.next();
331                 int sendPerMinute = mailSender.getSendPerMinute();
332 
333                 if (sendPerMinute < 1) continue;
334 
335                 SMTPClient smtpClient = new SMTPClient(m_postageConfiguration.getTestserverHost(),
336                         m_postageConfiguration.getTestserverPortSMTPInbound(),
337                         m_postageConfiguration.getInternalUsers(),
338                         m_postageConfiguration.getExternalUsers(),
339                         m_results,
340                         mailSender
341                 );
342 
343                 boolean available = smtpClient.checkAvailability();
344                 log.info("availability of inbound mailing " + (available ? "": "NOT ") + "verified");
345                 if (!available) continue;
346 
347                 SampleController sendController = new SampleController(smtpClient, sendPerMinute);
348                 m_sendControllers.add(sendController);
349             }
350         }
351 
352     }
353 
354 
355     /***
356      * sets up the part for checking accounts via POP3, which are then aligned with sent test mails
357      * @throws StartupException
358      */
359     private void setupInboundMailingChecker() throws StartupException {
360         if (m_postageConfiguration.getTestserverPortPOP3() <= 0) return;
361 
362         m_inboundMailingChecker = new POP3Client(m_postageConfiguration.getTestserverHost(),
363                 m_postageConfiguration.getTestserverPortPOP3(),
364                 m_postageConfiguration.getInternalUsers(),
365                 m_results
366         );
367         m_inboundMailingChecker.checkAvailability();
368         boolean available = m_inboundMailingChecker.checkAvailability();
369         if (available) {
370             log.info("availability of checking for inbound mailing (POP3) verified");
371         }
372     }
373 
374     private void setupInternalUserAccounts() throws StartupException {
375         try {
376             String host = m_postageConfiguration.getTestserverHost();
377             int remoteManagerPort = m_postageConfiguration.getTestserverRemoteManagerPort();
378             String remoteManagerUsername = m_postageConfiguration.getTestserverRemoteManagerUsername();
379             String remoteManagerPassword = m_postageConfiguration.getTestserverRemoteManagerPassword();
380             int internalUserCount = m_postageConfiguration.getInternalUsers().getCount();
381             String internalUsernamePrefix = m_postageConfiguration.getInternalUsers().getNamePrefix();
382             String internalPassword = m_postageConfiguration.getInternalUsers().getPassword();
383 
384             Set existingUsers = getExistingUsers(host, remoteManagerPort, remoteManagerUsername, remoteManagerPassword);
385 
386             RemoteManagerClient remoteManagerClient = new RemoteManagerClient(host, remoteManagerPort, remoteManagerUsername, remoteManagerPassword);
387             remoteManagerClient.login();
388             ArrayList internalUsers = new ArrayList();
389             for (int i = 1; i <= internalUserCount; i++) {
390                 String username = internalUsernamePrefix + i;
391                 if (existingUsers.contains(username)) {
392                     log.info("user already exists: " + username);
393                     if (!m_postageConfiguration.isInternalReuseExisting()) {
394                         remoteManagerClient.executeCommand("deluser " + username);
395                         remoteManagerClient.readAnswer();
396                         addUser(remoteManagerClient, username, internalPassword);
397                         remoteManagerClient.readAnswer();
398                         log.info("user deleted and re-created: " + username);
399                     }
400                     remoteManagerClient.executeCommand("setpassword " + username + " " + internalPassword);
401                     remoteManagerClient.readAnswer();
402                 } else {
403                     addUser(remoteManagerClient, username, internalPassword);
404                 }
405                 internalUsers.add(username);
406             }
407             m_postageConfiguration.getInternalUsers().setExistingUsers(internalUsers);
408             remoteManagerClient.disconnect();
409         } catch (Exception e) {
410             throw new StartupException("error setting up internal user accounts", e);
411         }
412     }
413 
414     private void setupForwardedMailInterceptor() throws StartupException {
415         SMTPMailSink smtpMailSink = new SMTPMailSink();
416         smtpMailSink.setSmtpListenerPort(m_postageConfiguration.getTestserverPortSMTPForwarding());
417         smtpMailSink.setResults(m_results);
418         try {
419             smtpMailSink.initialize();
420         } catch (Exception e) {
421             throw new StartupException("failed to setup",e);
422         }
423         m_smtpMailSink = smtpMailSink;
424         log.info("forwarded mail interceptor is set up.");
425     }
426 
427 
428     private void setupJMXRemoting() throws StartupException {
429         boolean jmxAvailable = JVMResourceSampler.isJMXAvailable();
430         int jmxPort = m_postageConfiguration.getTestserverPortJMXRemoting();
431         if (!jmxAvailable || jmxPort <= 0) {
432             return;
433         }
434         JVMResourceSampler jvmResourceSampler = new JVMResourceSampler("localhost", jmxPort, m_results);
435         try {
436             jvmResourceSampler.connectRemoteJamesJMXServer();
437             log.info("connected to remote JMX");
438             m_jvmResourceSampler = jvmResourceSampler;
439         } catch (Exception e) {
440             throw new StartupException("failed to setup JMX remoting for JVM resource sampling", e);
441         }
442     }
443 
444     private void addUser(RemoteManagerClient remoteManagerClient, String username, String internalPassword) {
445         remoteManagerClient.executeCommand("adduser " + username + " " + internalPassword);
446         remoteManagerClient.readAnswer();
447         log.info("user created: " + username);
448     }
449 
450     /***
451      * aquire a list of all existing internal James accounts
452      * @return Set<String>, each String a username
453      */
454     private Set getExistingUsers(String host, int remoteManagerPort, String remoteManagerUsername, String remoteManagerPassword) {
455         RemoteManagerClient remoteManagerClient = new RemoteManagerClient(host, remoteManagerPort, remoteManagerUsername, remoteManagerPassword);
456         boolean loginSuccess = remoteManagerClient.login();
457         if (!loginSuccess) throw new Error("failed to login to remote manager");
458         List rawUserList = remoteManagerClient.executeCommand("listusers");
459         remoteManagerClient.disconnect();
460 
461         Set existingUsers = new LinkedHashSet();
462         Iterator iterator = rawUserList.iterator();
463         while (iterator.hasNext()) {
464             String line = (String)iterator.next();
465             if (!line.startsWith("user: ")) continue;
466 
467             existingUsers.add(line.substring(6));
468         }
469         return existingUsers;
470     }
471 
472 }