001    /****************************************************************
002     * Licensed to the Apache Software Foundation (ASF) under one   *
003     * or more contributor license agreements.  See the NOTICE file *
004     * distributed with this work for additional information        *
005     * regarding copyright ownership.  The ASF licenses this file   *
006     * to you under the Apache License, Version 2.0 (the            *
007     * "License"); you may not use this file except in compliance   *
008     * with the License.  You may obtain a copy of the License at   *
009     *                                                              *
010     *   http://www.apache.org/licenses/LICENSE-2.0                 *
011     *                                                              *
012     * Unless required by applicable law or agreed to in writing,   *
013     * software distributed under the License is distributed on an  *
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
015     * KIND, either express or implied.  See the License for the    *
016     * specific language governing permissions and limitations      *
017     * under the License.                                           *
018     ****************************************************************/
019    
020    package org.apache.james.ai.classic;
021    
022    import java.io.BufferedReader;
023    import java.io.ByteArrayOutputStream;
024    import java.io.StringReader;
025    import java.sql.Connection;
026    import java.text.DecimalFormat;
027    import java.util.Collection;
028    import java.util.Iterator;
029    
030    import javax.annotation.Resource;
031    import javax.mail.MessagingException;
032    import javax.mail.internet.MimeMessage;
033    import javax.sql.DataSource;
034    
035    import org.apache.mailet.Mail;
036    import org.apache.mailet.MailAddress;
037    import org.apache.mailet.base.GenericMailet;
038    import org.apache.mailet.base.RFC2822Headers;
039    
040    /**
041     * <p>
042     * Spam detection mailet using bayesian analysis techniques.
043     * </p>
044     * 
045     * <p>
046     * Sets an email message header indicating the probability that an email message
047     * is SPAM.
048     * </p>
049     * 
050     * <p>
051     * Based upon the principles described in: <a
052     * href="http://www.paulgraham.com/spam.html">A Plan For Spam</a> by Paul
053     * Graham. Extended to Paul Graham's <a
054     * href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.
055     * </p>
056     * 
057     * <p>
058     * The analysis capabilities are based on token frequencies (the <i>Corpus</i>)
059     * learned through a training process (see {@link BayesianAnalysisFeeder}) and
060     * stored in a JDBC database. After a training session, the Corpus must be
061     * rebuilt from the database in order to acquire the new frequencies. Every 10
062     * minutes a special thread in this mailet will check if any change was made to
063     * the database by the feeder, and rebuild the corpus if necessary.
064     * </p>
065     * 
066     * <p>
067     * A <code>org.apache.james.spam.probability</code> mail attribute will be
068     * created containing the computed spam probability as a
069     * {@link java.lang.Double}. The <code>headerName</code> message header string
070     * will be created containing such probability in floating point representation.
071     * </p>
072     * 
073     * <p>
074     * Sample configuration:
075     * </p>
076     * 
077     * <pre>
078     * <code>
079     * &lt;mailet match="All" class="BayesianAnalysis"&gt;
080     *   &lt;repositoryPath&gt;db://maildb&lt;/repositoryPath&gt;
081     *   &lt;!--
082     *     Set this to the header name to add with the spam probability
083     *     (default is "X-MessageIsSpamProbability").
084     *   --&gt;
085     *   &lt;headerName&gt;X-MessageIsSpamProbability&lt;/headerName&gt;
086     *   &lt;!--
087     *     Set this to true if you want to ignore messages coming from local senders
088     *     (default is false).
089     *     By local sender we mean a return-path with a local server part (server listed
090     *     in &lt;servernames&gt; in config.xml).
091     *   --&gt;
092     *   &lt;ignoreLocalSender&gt;true&lt;/ignoreLocalSender&gt;
093     *   &lt;!--
094     *     Set this to the maximum message size (in bytes) that a message may have
095     *     to be considered spam (default is 100000).
096     *   --&gt;
097     *   &lt;maxSize&gt;100000&lt;/maxSize&gt;
098     *   &lt;!--
099     *     Set this to false if you do not want to tag the subject when spam is detected (default is true).
100     *   --&gt;
101     *   &lt;tagSubject&gt;true&lt;/tagSubject&gt;
102     * &lt;/mailet&gt;
103     * </code>
104     * </pre>
105     * 
106     * <p>
107     * The probability of being spam is pre-pended to the subject if it is &gt; 0.1
108     * (10%).
109     * </p>
110     * 
111     * <p>
112     * The required tables are automatically created if not already there (see
113     * sqlResources.xml). The token field in both the ham and spam tables is <b>case
114     * sensitive</b>.
115     * </p>
116     * 
117     * @see BayesianAnalysisFeeder
118     * @see BayesianAnalyzer
119     * @see JDBCBayesianAnalyzer
120     * @since 2.3.0
121     */
122    
123    public class BayesianAnalysis extends GenericMailet {
124        /**
125         * The JDBCUtil helper class
126         */
127        private final JDBCUtil theJDBCUtil = new JDBCUtil() {
128            protected void delegatedLog(String logString) {
129                log("BayesianAnalysis: " + logString);
130            }
131        };
132    
133        /**
134         * The JDBCBayesianAnalyzer class that does all the work.
135         */
136        private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
137            protected void delegatedLog(String logString) {
138                log("BayesianAnalysis: " + logString);
139            }
140        };
141    
142        private DataSource datasource;
143        private String repositoryPath;
144    
145        private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
146        private static final String HEADER_NAME = "X-MessageIsSpamProbability";
147        private static final long CORPUS_RELOAD_INTERVAL = 600000;
148        private String headerName;
149        private boolean ignoreLocalSender = false;
150        private boolean tagSubject = true;
151    
152        /**
153         * Return a string describing this mailet.
154         * 
155         * @return a string describing this mailet
156         */
157        public String getMailetInfo() {
158            return "BayesianAnalysis Mailet";
159        }
160    
161        /**
162         * Holds value of property maxSize.
163         */
164        private int maxSize = 100000;
165    
166        /**
167         * Holds value of property lastCorpusLoadTime.
168         */
169        private long lastCorpusLoadTime;
170    
171        private SystemContext fs;
172    
173        /**
174         * Getter for property maxSize.
175         * 
176         * @return Value of property maxSize.
177         */
178        public int getMaxSize() {
179    
180            return this.maxSize;
181        }
182    
183        /**
184         * Setter for property maxSize.
185         * 
186         * @param maxSize
187         *            New value of property maxSize.
188         */
189        public void setMaxSize(int maxSize) {
190    
191            this.maxSize = maxSize;
192        }
193    
194        /**
195         * Getter for property lastCorpusLoadTime.
196         * 
197         * @return Value of property lastCorpusLoadTime.
198         */
199        public long getLastCorpusLoadTime() {
200    
201            return this.lastCorpusLoadTime;
202        }
203    
204        @Resource(name = "datasource")
205        public void setDataSource(DataSource datasource) {
206            this.datasource = datasource;
207        }
208    
209        @Resource(name = "filesystem")
210        public void setFileSystem(SystemContext fs) {
211            this.fs = fs;
212        }
213    
214        /**
215         * Sets lastCorpusLoadTime to System.currentTimeMillis().
216         */
217        private void touchLastCorpusLoadTime() {
218    
219            this.lastCorpusLoadTime = System.currentTimeMillis();
220        }
221    
222        /**
223         * Mailet initialization routine.
224         * 
225         * @throws MessagingException
226         *             if a problem arises
227         */
228        public void init() throws MessagingException {
229            repositoryPath = getInitParameter("repositoryPath");
230    
231            if (repositoryPath == null) {
232                throw new MessagingException("repositoryPath is null");
233            }
234    
235            headerName = getInitParameter("headerName", HEADER_NAME);
236    
237            ignoreLocalSender = Boolean.valueOf(getInitParameter("ignoreLocalSender")).booleanValue();
238    
239            if (ignoreLocalSender) {
240                log("Will ignore messages coming from local senders");
241            } else {
242                log("Will analyze messages coming from local senders");
243            }
244    
245            String maxSizeParam = getInitParameter("maxSize");
246            if (maxSizeParam != null) {
247                setMaxSize(Integer.parseInt(maxSizeParam));
248            }
249            log("maxSize: " + getMaxSize());
250    
251            String tag = getInitParameter("tagSubject");
252            if (tag != null && tag.equals("false")) {
253                tagSubject = false;
254            }
255    
256            initDb();
257    
258            CorpusLoader corpusLoader = new CorpusLoader(this);
259            corpusLoader.setDaemon(true);
260            corpusLoader.start();
261    
262        }
263    
264        private void initDb() throws MessagingException {
265    
266            try {
267                analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
268            } catch (Exception e) {
269                throw new MessagingException("Exception initializing queries", e);
270            }
271    
272            try {
273                loadData(datasource.getConnection());
274            } catch (java.sql.SQLException se) {
275                throw new MessagingException("SQLException loading data", se);
276            }
277        }
278    
279        /**
280         * Scans the mail and determines the spam probability.
281         * 
282         * @param mail
283         *            The Mail message to be scanned.
284         * @throws MessagingException
285         *             if a problem arises
286         */
287        public void service(Mail mail) throws MessagingException {
288    
289            try {
290                MimeMessage message = mail.getMessage();
291    
292                if (ignoreLocalSender) {
293                    // ignore the message if the sender is local
294                    if (mail.getSender() != null && getMailetContext().isLocalServer(mail.getSender().getDomain())) {
295                        return;
296                    }
297                }
298    
299                String[] headerArray = message.getHeader(headerName);
300                // ignore the message if already analyzed
301                if (headerArray != null && headerArray.length > 0) {
302                    return;
303                }
304    
305                ByteArrayOutputStream baos = new ByteArrayOutputStream();
306    
307                double probability;
308    
309                if (message.getSize() < getMaxSize()) {
310                    message.writeTo(baos);
311                    probability = analyzer.computeSpamProbability(new BufferedReader(new StringReader(baos.toString())));
312                } else {
313                    probability = 0.0;
314                }
315    
316                mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(probability));
317                message.setHeader(headerName, Double.toString(probability));
318    
319                DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat.getInstance();
320                probabilityForm.applyPattern("##0.##%");
321                String probabilityString = probabilityForm.format(probability);
322    
323                String senderString;
324                if (mail.getSender() == null) {
325                    senderString = "null";
326                } else {
327                    senderString = mail.getSender().toString();
328                }
329                if (probability > 0.1) {
330                    @SuppressWarnings("unchecked")
331                    final Collection<MailAddress> recipients = mail.getRecipients();
332                    log(headerName + ": " + probabilityString + "; From: " + senderString + "; Recipient(s): " + getAddressesString(recipients));
333    
334                    // Check if we should tag the subject
335                    if (tagSubject) {
336                        appendToSubject(message, " [" + probabilityString + (probability > 0.9 ? " SPAM" : " spam") + "]");
337                    }
338                }
339    
340                saveChanges(message);
341    
342            } catch (Exception e) {
343                log("Exception: " + e.getMessage(), e);
344                throw new MessagingException("Exception thrown", e);
345            }
346        }
347    
348        private void loadData(Connection conn) throws java.sql.SQLException {
349    
350            try {
351                // this is synchronized to avoid concurrent update of the corpus
352                synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
353                    analyzer.tokenCountsClear();
354                    analyzer.loadHamNSpam(conn);
355                    analyzer.buildCorpus();
356                    analyzer.tokenCountsClear();
357                }
358    
359                log("BayesianAnalysis Corpus loaded");
360    
361                touchLastCorpusLoadTime();
362    
363            } finally {
364                if (conn != null) {
365                    theJDBCUtil.closeJDBCConnection(conn);
366                }
367            }
368    
369        }
370    
371        private String getAddressesString(Collection<MailAddress> addresses) {
372            if (addresses == null) {
373                return "null";
374            }
375    
376            Iterator<MailAddress> iter = addresses.iterator();
377            StringBuffer sb = new StringBuffer();
378            sb.append('[');
379            for (int i = 0; iter.hasNext(); i++) {
380                sb.append(iter.next());
381                if (i + 1 < addresses.size()) {
382                    sb.append(", ");
383                }
384            }
385            sb.append(']');
386            return sb.toString();
387        }
388    
389        private void appendToSubject(MimeMessage message, String toAppend) {
390            try {
391                String subject = message.getSubject();
392    
393                if (subject == null) {
394                    message.setSubject(toAppend, "iso-8859-1");
395                } else {
396                    message.setSubject(toAppend + " " + subject, "iso-8859-1");
397                }
398            } catch (MessagingException ex) {
399            }
400        }
401    
402        /**
403         * Saves changes resetting the original message id.
404         */
405        private void saveChanges(MimeMessage message) throws MessagingException {
406            String messageId = message.getMessageID();
407            message.saveChanges();
408            if (messageId != null) {
409                message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
410            }
411        }
412    
413        private static class CorpusLoader extends Thread {
414    
415            private BayesianAnalysis analysis;
416    
417            private CorpusLoader(BayesianAnalysis analysis) {
418                super("BayesianAnalysis Corpus Loader");
419                this.analysis = analysis;
420            }
421    
422            /**
423             * Thread entry point.
424             */
425            public void run() {
426                analysis.log("CorpusLoader thread started: will wake up every " + CORPUS_RELOAD_INTERVAL + " ms");
427    
428                try {
429                    Thread.sleep(CORPUS_RELOAD_INTERVAL);
430    
431                    while (true) {
432                        if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer.getLastDatabaseUpdateTime()) {
433                            analysis.log("Reloading Corpus ...");
434                            try {
435                                analysis.loadData(analysis.datasource.getConnection());
436                                analysis.log("Corpus reloaded");
437                            } catch (java.sql.SQLException se) {
438                                analysis.log("SQLException: ", se);
439                            }
440    
441                        }
442    
443                        if (Thread.interrupted()) {
444                            break;
445                        }
446                        Thread.sleep(CORPUS_RELOAD_INTERVAL);
447                    }
448                } catch (InterruptedException ex) {
449                    interrupt();
450                }
451            }
452    
453        }
454    
455    }