001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.jdbc;
019    
020    import java.sql.Connection;
021    import java.sql.SQLException;
022    import java.util.Map;
023    
024    import javax.jms.JMSException;
025    import javax.sql.DataSource;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.activemq.broker.BrokerContainer;
030    import org.activemq.io.WireFormat;
031    import org.activemq.io.impl.StatelessDefaultWireFormat;
032    import org.activemq.store.MessageStore;
033    import org.activemq.store.PersistenceAdapter;
034    import org.activemq.store.TopicMessageStore;
035    import org.activemq.store.TransactionStore;
036    import org.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
037    import org.activemq.store.vm.VMTransactionStore;
038    import org.activemq.util.FactoryFinder;
039    import org.activemq.util.JMSExceptionHelper;
040    import org.activemq.service.DeadLetterPolicy;
041    import org.activemq.service.MessageIdentity;
042    import org.activemq.store.jdbc.JDBCAdapter.ExpiredMessageResultHandler;
043    import org.activemq.message.ActiveMQMessage;
044    
045    import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
046    import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
047    
048    /**
049     * A {@link PersistenceAdapter} implementation using JDBC for
050     * persistence storage.
051     *
052     * This persistence adapter will correctly remember prepared XA transactions,
053     * but it will not keep track of local transaction commits so that operations
054     * performed against the Message store are done as a single uow. 
055     *
056     * @version $Revision: 1.1 $
057     */
058    public class JDBCPersistenceAdapter implements PersistenceAdapter {
059    
060        private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
061        private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/activemq/store/jdbc/");
062    
063        private WireFormat wireFormat = new StatelessDefaultWireFormat();
064        private DataSource dataSource;
065        private JDBCAdapter adapter;
066            private String adapterClass;
067            private VMTransactionStore transactionStore;
068        private boolean dropTablesOnStartup=false;
069        private ClockDaemon clockDaemon;
070        private Object clockTicket;
071        private DeadLetterPolicy deadLetterPolicy;
072        private BrokerContainer brokerContainer;
073        private boolean autoCleanupExpiredMessages=true;
074        private boolean deleteExpiredMessages=true;
075        private long cleanupRepeatInterval=1000*60*5; // by default, run the cleanup process every 5 minutes
076        private int cleanupPeriod = 1000 * 60 * 5;
077        private String tablePrefix = "";
078    
079        public JDBCPersistenceAdapter() {
080        }
081    
082        public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
083            this.dataSource = ds;
084            this.wireFormat = wireFormat;
085        }
086    
087        public Map getInitialDestinations() {
088            return null;
089        }
090    
091        public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
092            if (adapter == null) {
093                throw new IllegalStateException("Not started");
094            }
095            MessageStore store = new JDBCMessageStore(this, adapter, wireFormat.copy(), destinationName);
096            if( transactionStore!=null ) {
097                store = transactionStore.proxy(store);
098            }
099            return store;
100        }
101    
102        public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
103            if (adapter == null) {
104                throw new IllegalStateException("Not started");
105            }
106            TopicMessageStore store = new JDBCTopicMessageStore(this, adapter, wireFormat.copy(), destinationName);
107            if( transactionStore!=null ) {
108                store = transactionStore.proxy(store);
109            }
110            return store;
111        }
112    
113        public TransactionStore createTransactionStore() throws JMSException {
114            if (adapter == null) {
115                throw new IllegalStateException("Not started");
116            }
117            if( this.transactionStore == null ) {
118                this.transactionStore = new VMTransactionStore();
119            }
120            return this.transactionStore;
121        }
122    
123        public void beginTransaction() throws JMSException {
124            try {
125                Connection c = dataSource.getConnection();
126                c.setAutoCommit(false);
127                TransactionContext.pushConnection(c);
128            }
129            catch (SQLException e) {
130                throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
131            }
132        }
133    
134        public void commitTransaction() throws JMSException {
135            Connection c = TransactionContext.popConnection();
136            if (c == null) {
137                log.warn("Commit while no transaction in progress");
138            }
139            else {
140                try {
141                    c.commit();
142                }
143                catch (SQLException e) {
144                    throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
145                }
146                finally {
147                    try {
148                        c.close();
149                    }
150                    catch (Throwable e) {
151                    }
152                }
153            }
154        }
155    
156        public void rollbackTransaction() {
157            Connection c = TransactionContext.popConnection();
158            try {
159                c.rollback();
160            }
161            catch (SQLException e) {
162                log.warn("Cannot rollback transaction due to: " + e, e);
163            }
164            finally {
165                try {
166                    c.close();
167                }
168                catch (Throwable e) {
169                }
170            }
171        }
172    
173    
174        public void start() throws JMSException {
175            beginTransaction();
176            Connection c = null;
177            try {
178                // Load the right adapter for the database
179                adapter = null;
180    
181                try {
182                    c = getConnection();
183                }
184                catch (SQLException e) {
185                    throw JMSExceptionHelper.newJMSException("Could not get a database connection: "+e,e);                                 
186                }
187    
188                // If the adapter class is not specified.. try to dectect they right type by getting
189                // info from the database.
190                if( adapterClass == null ) {
191                    
192                    try {
193                    
194                            // Make the filename file system safe.
195                            String driverName = c.getMetaData().getDriverName();
196                            driverName = driverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
197            
198                            try {
199                                    adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(driverName);
200                            log.info("Database driver recognized: [" + driverName + "]");
201                            }
202                            catch (Throwable e) {
203                                log.warn("Database driver NOT recognized: [" + driverName + "].  Will use default JDBC implementation.");
204                            }
205                                    
206                        }
207                        catch (SQLException e) {
208                        log.warn("JDBC error occured while trying to detect database type.  Will use default JDBC implementation: "+e.getMessage());
209                        log.debug("Reason: " + e, e);                    
210                        }
211                        
212                } else {
213                    try {
214                            Class clazz = JDBCPersistenceAdapter.class.getClassLoader().loadClass(adapterClass);
215                            adapter = (DefaultJDBCAdapter)clazz.newInstance();
216                    }
217                    catch (Throwable e) {
218                        log.warn("Invalid JDBC adapter class class (" + adapterClass + ").  Will use default JDBC implementation.");
219                        log.debug("Reason: " + e, e);
220                    }
221                }
222                
223                // Use the default JDBC adapter if the 
224                // Database type is not recognized.
225                if (adapter == null) {
226                    adapter = new DefaultJDBCAdapter();
227                }
228    
229                adapter.getStatementProvider().setTablePrefix(tablePrefix);
230                
231                if( dropTablesOnStartup ) {
232                    try {
233                        adapter.doDropTables(c);
234                    }
235                    catch (SQLException e) {
236                        log.warn("Cannot drop tables due to: " + e, e);
237                    }
238                }
239                try {
240                    adapter.doCreateTables(c);
241                }
242                catch (SQLException e) {
243                    log.warn("Cannot create tables due to: " + e, e);
244                }
245                adapter.initSequenceGenerator(c);
246    
247            }
248            finally {
249                commitTransaction();
250            }
251            
252            if (isAutoCleanupExpiredMessages()) {
253                    // Cleanup the db periodically.
254                    clockTicket = getClockDaemon().executePeriodically(getCleanupRepeatInterval(), new Runnable() {
255                        public void run() {
256                            try {
257                                    cleanup();
258                            } catch (SQLException sqle) {
259                                    log.error("Error in cleanup due to: " + sqle, sqle);
260                            }
261                        }
262                    }, false);
263            }
264        }
265    
266        public void cleanup() throws SQLException {         
267            final Connection c = getConnection();
268            try {           
269                log.debug("Cleaning up old messages in the database");
270                adapter.doDeleteOldMessages(c);
271                adapter.doGetExpiredMessages(c, new ExpiredMessageResultHandler() {
272                    public void onMessage(long seq, String container, String messageID, boolean isSentToDeadLetter) {
273                            try {
274                            // restore the message from the db
275                            MessageStore messageStore = createQueueMessageStore(container);
276                            MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
277                            ActiveMQMessage message = messageStore.getMessage(messageIdentity);
278                          if (message != null){
279                            log.debug("Cleaning up old message in the database: " + message.toString());
280                            if (message.isExpired() && !isSentToDeadLetter) {
281                                    // send a dead letter
282                                            sendToDeadLetter(message);
283                          }else {
284                            log.warn("could not find message from store with identity: " + messageIdentity + " in cleanup");
285                          }
286                            }
287                                    // clean up old message, use original identity
288                                    cleanupOldMessage(c, new MessageIdentity(messageID, new Long(seq)));
289                            } catch (JMSException jmse) {
290                                    log.warn("Cleanup expired message failed due to: " + jmse, jmse);
291                            } catch (SQLException sqle) {
292                                    log.warn("Cleanup expired message failed due to: " + sqle, sqle);
293                            }
294                    }
295                });
296            } catch (JMSException e) {
297                log.warn("Old message cleanup failed due to: " + e, e);
298            } catch (SQLException e) {
299                log.warn("Old message cleanup failed due to: " + e, e);
300            } finally {
301                if (c!= null) returnConnection(c);
302                log.debug("Cleanup done.");
303            }
304        }
305        
306        protected void sendToDeadLetter(ActiveMQMessage message) throws JMSException {
307            // send a dead letter if the dead letter policy is enabled
308            if (getBrokerContainer()!=null) {
309                    DeadLetterPolicy deadLetterPolicy = getBrokerContainer().getBroker().getDeadLetterPolicy();
310                    if (deadLetterPolicy != null && deadLetterPolicy.isDeadLetterEnabled()) {
311                            deadLetterPolicy.sendToDeadLetter(message);
312                    }
313            }
314        }
315        
316        public void cleanupOldMessage(Connection c, MessageIdentity messageIdentity) throws JMSException, SQLException {
317            if (getDeleteExpiredMessages()==true) {
318                    adapter.doDeleteExpiredMessage(c, messageIdentity);
319            }
320        }
321    
322        /** 
323         * Ensures that no previous dead letter was already sent for this message
324         */
325        public boolean deadLetterAlreadySent(long seq, boolean useDatabaseLocking) {
326            final BooleanWrapper alreadySentToDeadLetter = new BooleanWrapper(true);
327            try {
328                    beginTransaction();
329                    Connection c = getConnection();
330                    // fetch the message from the persistent store
331                    getJDBCAdapter().doGetMessageForUpdate(c, seq, useDatabaseLocking, new ExpiredMessageResultHandler() {
332                    public void onMessage(long seq, String container, String messageID, boolean isSentToDeadLetter) {
333                            if (!isSentToDeadLetter) {
334                                    alreadySentToDeadLetter.setValue(false);
335                            }
336                    }
337                    });
338                    if (!alreadySentToDeadLetter.getValue()) {
339                            // if not already sent, set the deadletter flag in the db
340                            getJDBCAdapter().doSetDeadLetterFlag(c, seq);
341                    }
342                    commitTransaction();
343                    return alreadySentToDeadLetter.getValue();
344            } catch (Exception e) {
345                    log.error("Could not get a database connection due to: " + e, e);
346                    rollbackTransaction();
347                    return true; // avoid sending a dead letter in case there is a problem 
348            }
349        }
350        
351        private class BooleanWrapper {
352            boolean value;
353            BooleanWrapper(boolean value) {
354                    setValue(value);
355            }
356            boolean getValue() {
357                    return value;
358            }
359            void setValue(boolean value) {
360                    this.value = value;
361            }
362        }
363        
364        public void setClockDaemon(ClockDaemon clockDaemon) {
365            this.clockDaemon = clockDaemon;
366        }
367    
368        public ClockDaemon getClockDaemon() {
369            if (clockDaemon == null) {
370                clockDaemon = new ClockDaemon();
371                clockDaemon.setThreadFactory(new ThreadFactory() {
372                    public Thread newThread(Runnable runnable) {
373                        Thread thread = new Thread(runnable, "Cleanup Timmer");
374                        thread.setDaemon(true);
375                        return thread;
376                    }
377                });
378            }
379            return clockDaemon;
380        }
381    
382        public synchronized void stop() throws JMSException {
383            if (clockTicket != null) {
384                // Stop the periodical cleanup.
385                ClockDaemon.cancel(clockTicket);
386                clockTicket=null;
387                clockDaemon.shutDown();
388            }
389        }
390        
391        public BrokerContainer getBrokerContainer() {
392            return brokerContainer;
393        }
394    
395        public void setBrokerContainer(BrokerContainer brokerContainer) {
396            this.brokerContainer = brokerContainer;
397        }
398    
399        public DataSource getDataSource() {
400            return dataSource;
401        }
402    
403        public void setDataSource(DataSource dataSource) {
404            this.dataSource = dataSource;
405        }
406    
407        public WireFormat getWireFormat() {
408            return wireFormat;
409        }
410    
411        public void setWireFormat(WireFormat wireFormat) {
412            this.wireFormat = wireFormat;
413        }
414    
415        public Connection getConnection() throws SQLException {
416            Connection answer = TransactionContext.peekConnection();
417            if (answer == null) {
418                answer = dataSource.getConnection();
419                answer.setAutoCommit(true);
420            }
421            return answer;
422        }
423    
424        public void returnConnection(Connection connection) {
425            if (connection == null) {
426                return;
427            }
428            Connection peek = TransactionContext.peekConnection();
429            if (peek != connection) {
430                try {
431                    connection.close();
432                }
433                catch (SQLException e) {
434                }
435            }
436        }
437            
438        /**
439             * @return Returns the adapterClass.
440             */
441            public String getAdapterClass() {
442                    return adapterClass;
443            }
444            
445            /**
446             * @param adapterClass The adapterClass to set.
447             */
448            public void setAdapterClass(String adapterClass) {
449                    this.adapterClass = adapterClass;
450            }
451            
452            public JDBCAdapter getJDBCAdapter() {
453                    return adapter;
454            }
455            
456        /**
457         * @return Returns the dropTablesOnStartup.
458         */
459        public boolean getDropTablesOnStartup() {
460            return dropTablesOnStartup;
461        }
462        /**
463         * @param dropTablesOnStartup The dropTablesOnStartup to set.
464         */
465        public void setDropTablesOnStartup(boolean dropTablesOnStartup) {
466            this.dropTablesOnStartup = dropTablesOnStartup;
467        }
468        
469        public DeadLetterPolicy getDeadLetterPolicy() {
470            return this.deadLetterPolicy;
471        }
472        
473        public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
474            this.deadLetterPolicy = deadLetterPolicy;
475        }
476        
477        public boolean getDeleteExpiredMessages() {
478            return deleteExpiredMessages;
479        }
480        
481        public void setDeleteExpiredMessages(boolean deleteExpiredMessages) {
482            this.deleteExpiredMessages = deleteExpiredMessages;
483        }
484            /**
485             * @return Returns the autoCleanupExpiredMessages.
486             */
487            public boolean isAutoCleanupExpiredMessages() {
488                    return autoCleanupExpiredMessages;
489            }
490            /**
491             * @param autoCleanupExpiredMessages The autoCleanupExpiredMessages to set.
492             */
493            public void setAutoCleanupExpiredMessages(boolean autoCleanupExpiredMessages) {
494                    this.autoCleanupExpiredMessages = autoCleanupExpiredMessages;
495            }
496            /**
497             * @return Returns the cleanupRepeatInterval.
498             */
499            public long getCleanupRepeatInterval() {
500                    return cleanupRepeatInterval;
501            }
502            /**
503             * @param cleanupRepeatInterval The cleanupRepeatInterval to set.
504             */
505            public void setCleanupRepeatInterval(long cleanupRepeatInterval) {
506                    this.cleanupRepeatInterval = cleanupRepeatInterval;
507            }
508    
509        public int getCleanupPeriod() {
510            return cleanupPeriod;
511        }
512    
513        public void setCleanupPeriod(int cleanupPeriod) {
514            this.cleanupPeriod = cleanupPeriod;
515        }
516    
517        public String getTablePrefix() {
518            return tablePrefix;
519        }
520    
521        public void setTablePrefix(String tablePrefix) {
522            this.tablePrefix = tablePrefix;
523        }
524    }