001    /****************************************************************
002     * Licensed to the Apache Software Foundation (ASF) under one   *
003     * or more contributor license agreements.  See the NOTICE file *
004     * distributed with this work for additional information        *
005     * regarding copyright ownership.  The ASF licenses this file   *
006     * to you under the Apache License, Version 2.0 (the            *
007     * "License"); you may not use this file except in compliance   *
008     * with the License.  You may obtain a copy of the License at   *
009     *                                                              *
010     *   http://www.apache.org/licenses/LICENSE-2.0                 *
011     *                                                              *
012     * Unless required by applicable law or agreed to in writing,   *
013     * software distributed under the License is distributed on an  *
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
015     * KIND, either express or implied.  See the License for the    *
016     * specific language governing permissions and limitations      *
017     * under the License.                                           *
018     ****************************************************************/
019    
020    package org.apache.james.ai.classic;
021    
022    import java.io.BufferedReader;
023    import java.io.ByteArrayOutputStream;
024    import java.io.StringReader;
025    import java.sql.Connection;
026    import java.util.Enumeration;
027    
028    import javax.annotation.Resource;
029    import javax.mail.Header;
030    import javax.mail.MessagingException;
031    import javax.mail.internet.MimeMessage;
032    import javax.sql.DataSource;
033    
034    import org.apache.mailet.Mail;
035    import org.apache.mailet.base.GenericMailet;
036    
037    /**
038     * <p>
039     * Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.
040     * </p>
041     * 
042     * <p>
043     * The new token frequencies will be stored in a JDBC database.
044     * </p>
045     * 
046     * <p>
047     * Sample configuration:
048     * </p>
049     * 
050     * <pre>
051     * <code>
052     * &lt;processor name="root"&gt;
053     * 
054     *   &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
055     *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
056     *     &lt;feedType&gt;ham&lt;/feedType&gt;
057     *     &lt;!--
058     *       Set this to the maximum message size (in bytes) that a message may have
059     *       to be analyzed (default is 100000).
060     *     --&gt;
061     *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
062     *   &lt;/mailet&gt;
063     * 
064     *   &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
065     *     &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
066     *     &lt;feedType&gt;spam&lt;/feedType&gt;
067     *     &lt;!--
068     *       Set this to the maximum message size (in bytes) that a message may have
069     *       to be analyzed (default is 100000).
070     *     --&gt;
071     *     &lt;maxSize&gt;100000&lt;/maxSize&gt;
072     *   &lt;/mailet&gt;
073     * 
074     * &lt;/processor&gt;
075     * </code>
076     * </pre>
077     * 
078     * <p>
079     * The previous example will allow the user to send messages to the server and
080     * use the recipient email address as the indicator for whether the message is
081     * ham or spam.
082     * </p>
083     * 
084     * <p>
085     * Using the example above, send good messages (ham not spam) to the email
086     * address "not.spam@thisdomain.com" to pump good messages into the feeder, and
087     * send spam messages (spam not ham) to the email address "spam@thisdomain.com"
088     * to pump spam messages into the feeder.
089     * </p>
090     * 
091     * <p>
092     * The bayesian database tables will be updated during the training reflecting
093     * the new data.
094     * </p>
095     * 
096     * <p>
097     * At the end the mail will be destroyed (ghosted).
098     * </p>
099     * 
100     * <p>
101     * <b>The correct approach is to send the original ham/spam message as an
102     * attachment to another message sent to the feeder; all the headers of the
103     * enveloping message will be removed and only the original message's tokens
104     * will be analyzed.</b>
105     * </p>
106     * 
107     * <p>
108     * After a training session, the frequency <i>Corpus</i> used by
109     * <code>BayesianAnalysis</code> must be rebuilt from the database, in order to
110     * take advantage of the new token frequencies. Every 10 minutes a special
111     * thread in the <code>BayesianAnalysis</code> mailet will check if any change
112     * was made to the database, and rebuild the corpus if necessary.
113     * </p>
114     * 
115     * <p>
116     * Only one message at a time is scanned (the database update activity is
117     * <i>synchronized</i>) in order to avoid too much database locking, as
118     * thousands of rows may be updated just for one message fed.
119     * </p>
120     * 
121     * @see BayesianAnalysis
122     * @see BayesianAnalyzer
123     * @see JDBCBayesianAnalyzer
124     * @since 2.3.0
125     */
126    
127    public class BayesianAnalysisFeeder extends GenericMailet {
128        /**
129         * The JDBCUtil helper class
130         */
131        private final JDBCUtil theJDBCUtil = new JDBCUtil() {
132            protected void delegatedLog(String logString) {
133                log("BayesianAnalysisFeeder: " + logString);
134            }
135        };
136    
137        /**
138         * The JDBCBayesianAnalyzer class that does all the work.
139         */
140        private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
141            protected void delegatedLog(String logString) {
142                log("BayesianAnalysisFeeder: " + logString);
143            }
144        };
145    
146        private DataSource datasource;
147        private String repositoryPath;
148    
149        private String feedType;
150    
151        /**
152         * Return a string describing this mailet.
153         * 
154         * @return a string describing this mailet
155         */
156        public String getMailetInfo() {
157            return "BayesianAnalysisFeeder Mailet";
158        }
159    
160        /**
161         * Holds value of property maxSize.
162         */
163        private int maxSize = 100000;
164    
165        private SystemContext fs;
166    
167        /**
168         * Getter for property maxSize.
169         * 
170         * @return Value of property maxSize.
171         */
172        public int getMaxSize() {
173    
174            return this.maxSize;
175        }
176    
177        @Resource(name = "datasource")
178        public void setDataSource(DataSource datasource) {
179            this.datasource = datasource;
180        }
181    
182        /**
183         * Setter for property maxSize.
184         * 
185         * @param maxSize
186         *            New value of property maxSize.
187         */
188        public void setMaxSize(int maxSize) {
189    
190            this.maxSize = maxSize;
191        }
192    
193        @Resource(name = "filesystem")
194        public void setFileSystem(SystemContext fs) {
195            this.fs = fs;
196        }
197    
198        /**
199         * Mailet initialization routine.
200         * 
201         * @throws MessagingException
202         *             if a problem arises
203         */
204        public void init() throws MessagingException {
205            repositoryPath = getInitParameter("repositoryPath");
206    
207            if (repositoryPath == null) {
208                throw new MessagingException("repositoryPath is null");
209            }
210    
211            feedType = getInitParameter("feedType");
212            if (feedType == null) {
213                throw new MessagingException("feedType is null");
214            }
215    
216            String maxSizeParam = getInitParameter("maxSize");
217            if (maxSizeParam != null) {
218                setMaxSize(Integer.parseInt(maxSizeParam));
219            }
220            log("maxSize: " + getMaxSize());
221    
222            initDb();
223    
224        }
225    
226        private void initDb() throws MessagingException {
227    
228            try {
229                analyzer.initSqlQueries(datasource.getConnection(), fs.readXml("sqlResources.xml"));
230            } catch (Exception e) {
231                throw new MessagingException("Exception initializing queries", e);
232            }
233    
234        }
235    
236        /**
237         * Scans the mail and updates the token frequencies in the database.
238         * 
239         * The method is synchronized in order to avoid too much database locking,
240         * as thousands of rows may be updated just for one message fed.
241         * 
242         * @param mail
243         *            The Mail message to be scanned.
244         */
245        public void service(Mail mail) {
246            boolean dbUpdated = false;
247    
248            mail.setState(Mail.GHOST);
249    
250            ByteArrayOutputStream baos = new ByteArrayOutputStream();
251    
252            Connection conn = null;
253    
254            try {
255    
256                MimeMessage message = mail.getMessage();
257    
258                String messageId = message.getMessageID();
259    
260                if (message.getSize() > getMaxSize()) {
261                    log(messageId + " Feeding HAM/SPAM ignored because message size > " + getMaxSize() + ": " + message.getSize());
262                    return;
263                }
264    
265                clearAllHeaders(message);
266    
267                message.writeTo(baos);
268    
269                BufferedReader br = new BufferedReader(new StringReader(baos.toString()));
270    
271                // this is synchronized to avoid concurrent update of the corpus
272                synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
273    
274                    conn = datasource.getConnection();
275    
276                    if (conn.getAutoCommit()) {
277                        conn.setAutoCommit(false);
278                    }
279    
280                    dbUpdated = true;
281    
282                    // Clear out any existing word/counts etc..
283                    analyzer.clear();
284    
285                    if ("ham".equalsIgnoreCase(feedType)) {
286                        log(messageId + " Feeding HAM");
287                        // Process the stream as ham (not spam).
288                        analyzer.addHam(br);
289    
290                        // Update storage statistics.
291                        analyzer.updateHamTokens(conn);
292                    } else {
293                        log(messageId + " Feeding SPAM");
294                        // Process the stream as spam.
295                        analyzer.addSpam(br);
296    
297                        // Update storage statistics.
298                        analyzer.updateSpamTokens(conn);
299                    }
300    
301                    // Commit our changes if necessary.
302                    if (conn != null && dbUpdated && !conn.getAutoCommit()) {
303                        conn.commit();
304                        dbUpdated = false;
305                        log(messageId + " Training ended successfully");
306                        JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
307                    }
308    
309                }
310    
311            } catch (java.sql.SQLException se) {
312                log("SQLException: " + se.getMessage());
313            } catch (java.io.IOException ioe) {
314                log("IOException: " + ioe.getMessage());
315            } catch (javax.mail.MessagingException me) {
316                log("MessagingException: " + me.getMessage());
317            } finally {
318                // Rollback our changes if necessary.
319                try {
320                    if (conn != null && dbUpdated && !conn.getAutoCommit()) {
321                        conn.rollback();
322                        dbUpdated = false;
323                    }
324                } catch (Exception e) {
325                }
326                theJDBCUtil.closeJDBCConnection(conn);
327            }
328        }
329    
330        private void clearAllHeaders(MimeMessage message) throws javax.mail.MessagingException {
331            @SuppressWarnings("rawtypes")
332            Enumeration headers = message.getAllHeaders();
333    
334            while (headers.hasMoreElements()) {
335                Header header = (Header) headers.nextElement();
336                try {
337                    message.removeHeader(header.getName());
338                } catch (javax.mail.MessagingException me) {
339                }
340            }
341            message.saveChanges();
342        }
343    
344    }