001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc; 019 020 import java.sql.Connection; 021 import java.sql.SQLException; 022 import java.util.Map; 023 024 import javax.jms.JMSException; 025 import javax.sql.DataSource; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.broker.BrokerContainer; 030 import org.activemq.io.WireFormat; 031 import org.activemq.io.impl.StatelessDefaultWireFormat; 032 import org.activemq.store.MessageStore; 033 import org.activemq.store.PersistenceAdapter; 034 import org.activemq.store.TopicMessageStore; 035 import org.activemq.store.TransactionStore; 036 import org.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 037 import org.activemq.store.vm.VMTransactionStore; 038 import org.activemq.util.FactoryFinder; 039 import org.activemq.util.JMSExceptionHelper; 040 import org.activemq.service.DeadLetterPolicy; 041 import org.activemq.service.MessageIdentity; 042 import org.activemq.store.jdbc.JDBCAdapter.ExpiredMessageResultHandler; 043 import org.activemq.message.ActiveMQMessage; 044 045 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; 046 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 047 048 /** 049 * A {@link PersistenceAdapter} implementation using JDBC for 050 * persistence storage. 051 * 052 * This persistence adapter will correctly remember prepared XA transactions, 053 * but it will not keep track of local transaction commits so that operations 054 * performed against the Message store are done as a single uow. 055 * 056 * @version $Revision: 1.1 $ 057 */ 058 public class JDBCPersistenceAdapter implements PersistenceAdapter { 059 060 private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class); 061 private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/activemq/store/jdbc/"); 062 063 private WireFormat wireFormat = new StatelessDefaultWireFormat(); 064 private DataSource dataSource; 065 private JDBCAdapter adapter; 066 private String adapterClass; 067 private VMTransactionStore transactionStore; 068 private boolean dropTablesOnStartup=false; 069 private ClockDaemon clockDaemon; 070 private Object clockTicket; 071 private DeadLetterPolicy deadLetterPolicy; 072 private BrokerContainer brokerContainer; 073 private boolean autoCleanupExpiredMessages=true; 074 private boolean deleteExpiredMessages=true; 075 private long cleanupRepeatInterval=1000*60*5; // by default, run the cleanup process every 5 minutes 076 private int cleanupPeriod = 1000 * 60 * 5; 077 private String tablePrefix = ""; 078 079 public JDBCPersistenceAdapter() { 080 } 081 082 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 083 this.dataSource = ds; 084 this.wireFormat = wireFormat; 085 } 086 087 public Map getInitialDestinations() { 088 return null; 089 } 090 091 public MessageStore createQueueMessageStore(String destinationName) throws JMSException { 092 if (adapter == null) { 093 throw new IllegalStateException("Not started"); 094 } 095 MessageStore store = new JDBCMessageStore(this, adapter, wireFormat.copy(), destinationName); 096 if( transactionStore!=null ) { 097 store = transactionStore.proxy(store); 098 } 099 return store; 100 } 101 102 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException { 103 if (adapter == null) { 104 throw new IllegalStateException("Not started"); 105 } 106 TopicMessageStore store = new JDBCTopicMessageStore(this, adapter, wireFormat.copy(), destinationName); 107 if( transactionStore!=null ) { 108 store = transactionStore.proxy(store); 109 } 110 return store; 111 } 112 113 public TransactionStore createTransactionStore() throws JMSException { 114 if (adapter == null) { 115 throw new IllegalStateException("Not started"); 116 } 117 if( this.transactionStore == null ) { 118 this.transactionStore = new VMTransactionStore(); 119 } 120 return this.transactionStore; 121 } 122 123 public void beginTransaction() throws JMSException { 124 try { 125 Connection c = dataSource.getConnection(); 126 c.setAutoCommit(false); 127 TransactionContext.pushConnection(c); 128 } 129 catch (SQLException e) { 130 throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e); 131 } 132 } 133 134 public void commitTransaction() throws JMSException { 135 Connection c = TransactionContext.popConnection(); 136 if (c == null) { 137 log.warn("Commit while no transaction in progress"); 138 } 139 else { 140 try { 141 c.commit(); 142 } 143 catch (SQLException e) { 144 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e); 145 } 146 finally { 147 try { 148 c.close(); 149 } 150 catch (Throwable e) { 151 } 152 } 153 } 154 } 155 156 public void rollbackTransaction() { 157 Connection c = TransactionContext.popConnection(); 158 try { 159 c.rollback(); 160 } 161 catch (SQLException e) { 162 log.warn("Cannot rollback transaction due to: " + e, e); 163 } 164 finally { 165 try { 166 c.close(); 167 } 168 catch (Throwable e) { 169 } 170 } 171 } 172 173 174 public void start() throws JMSException { 175 beginTransaction(); 176 Connection c = null; 177 try { 178 // Load the right adapter for the database 179 adapter = null; 180 181 try { 182 c = getConnection(); 183 } 184 catch (SQLException e) { 185 throw JMSExceptionHelper.newJMSException("Could not get a database connection: "+e,e); 186 } 187 188 // If the adapter class is not specified.. try to dectect they right type by getting 189 // info from the database. 190 if( adapterClass == null ) { 191 192 try { 193 194 // Make the filename file system safe. 195 String driverName = c.getMetaData().getDriverName(); 196 driverName = driverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(); 197 198 try { 199 adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(driverName); 200 log.info("Database driver recognized: [" + driverName + "]"); 201 } 202 catch (Throwable e) { 203 log.warn("Database driver NOT recognized: [" + driverName + "]. Will use default JDBC implementation."); 204 } 205 206 } 207 catch (SQLException e) { 208 log.warn("JDBC error occured while trying to detect database type. Will use default JDBC implementation: "+e.getMessage()); 209 log.debug("Reason: " + e, e); 210 } 211 212 } else { 213 try { 214 Class clazz = JDBCPersistenceAdapter.class.getClassLoader().loadClass(adapterClass); 215 adapter = (DefaultJDBCAdapter)clazz.newInstance(); 216 } 217 catch (Throwable e) { 218 log.warn("Invalid JDBC adapter class class (" + adapterClass + "). Will use default JDBC implementation."); 219 log.debug("Reason: " + e, e); 220 } 221 } 222 223 // Use the default JDBC adapter if the 224 // Database type is not recognized. 225 if (adapter == null) { 226 adapter = new DefaultJDBCAdapter(); 227 } 228 229 adapter.getStatementProvider().setTablePrefix(tablePrefix); 230 231 if( dropTablesOnStartup ) { 232 try { 233 adapter.doDropTables(c); 234 } 235 catch (SQLException e) { 236 log.warn("Cannot drop tables due to: " + e, e); 237 } 238 } 239 try { 240 adapter.doCreateTables(c); 241 } 242 catch (SQLException e) { 243 log.warn("Cannot create tables due to: " + e, e); 244 } 245 adapter.initSequenceGenerator(c); 246 247 } 248 finally { 249 commitTransaction(); 250 } 251 252 if (isAutoCleanupExpiredMessages()) { 253 // Cleanup the db periodically. 254 clockTicket = getClockDaemon().executePeriodically(getCleanupRepeatInterval(), new Runnable() { 255 public void run() { 256 try { 257 cleanup(); 258 } catch (SQLException sqle) { 259 log.error("Error in cleanup due to: " + sqle, sqle); 260 } 261 } 262 }, false); 263 } 264 } 265 266 public void cleanup() throws SQLException { 267 final Connection c = getConnection(); 268 try { 269 log.debug("Cleaning up old messages in the database"); 270 adapter.doDeleteOldMessages(c); 271 adapter.doGetExpiredMessages(c, new ExpiredMessageResultHandler() { 272 public void onMessage(long seq, String container, String messageID, boolean isSentToDeadLetter) { 273 try { 274 // restore the message from the db 275 MessageStore messageStore = createQueueMessageStore(container); 276 MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq)); 277 ActiveMQMessage message = messageStore.getMessage(messageIdentity); 278 if (message != null){ 279 log.debug("Cleaning up old message in the database: " + message.toString()); 280 if (message.isExpired() && !isSentToDeadLetter) { 281 // send a dead letter 282 sendToDeadLetter(message); 283 }else { 284 log.warn("could not find message from store with identity: " + messageIdentity + " in cleanup"); 285 } 286 } 287 // clean up old message, use original identity 288 cleanupOldMessage(c, new MessageIdentity(messageID, new Long(seq))); 289 } catch (JMSException jmse) { 290 log.warn("Cleanup expired message failed due to: " + jmse, jmse); 291 } catch (SQLException sqle) { 292 log.warn("Cleanup expired message failed due to: " + sqle, sqle); 293 } 294 } 295 }); 296 } catch (JMSException e) { 297 log.warn("Old message cleanup failed due to: " + e, e); 298 } catch (SQLException e) { 299 log.warn("Old message cleanup failed due to: " + e, e); 300 } finally { 301 if (c!= null) returnConnection(c); 302 log.debug("Cleanup done."); 303 } 304 } 305 306 protected void sendToDeadLetter(ActiveMQMessage message) throws JMSException { 307 // send a dead letter if the dead letter policy is enabled 308 if (getBrokerContainer()!=null) { 309 DeadLetterPolicy deadLetterPolicy = getBrokerContainer().getBroker().getDeadLetterPolicy(); 310 if (deadLetterPolicy != null && deadLetterPolicy.isDeadLetterEnabled()) { 311 deadLetterPolicy.sendToDeadLetter(message); 312 } 313 } 314 } 315 316 public void cleanupOldMessage(Connection c, MessageIdentity messageIdentity) throws JMSException, SQLException { 317 if (getDeleteExpiredMessages()==true) { 318 adapter.doDeleteExpiredMessage(c, messageIdentity); 319 } 320 } 321 322 /** 323 * Ensures that no previous dead letter was already sent for this message 324 */ 325 public boolean deadLetterAlreadySent(long seq, boolean useDatabaseLocking) { 326 final BooleanWrapper alreadySentToDeadLetter = new BooleanWrapper(true); 327 try { 328 beginTransaction(); 329 Connection c = getConnection(); 330 // fetch the message from the persistent store 331 getJDBCAdapter().doGetMessageForUpdate(c, seq, useDatabaseLocking, new ExpiredMessageResultHandler() { 332 public void onMessage(long seq, String container, String messageID, boolean isSentToDeadLetter) { 333 if (!isSentToDeadLetter) { 334 alreadySentToDeadLetter.setValue(false); 335 } 336 } 337 }); 338 if (!alreadySentToDeadLetter.getValue()) { 339 // if not already sent, set the deadletter flag in the db 340 getJDBCAdapter().doSetDeadLetterFlag(c, seq); 341 } 342 commitTransaction(); 343 return alreadySentToDeadLetter.getValue(); 344 } catch (Exception e) { 345 log.error("Could not get a database connection due to: " + e, e); 346 rollbackTransaction(); 347 return true; // avoid sending a dead letter in case there is a problem 348 } 349 } 350 351 private class BooleanWrapper { 352 boolean value; 353 BooleanWrapper(boolean value) { 354 setValue(value); 355 } 356 boolean getValue() { 357 return value; 358 } 359 void setValue(boolean value) { 360 this.value = value; 361 } 362 } 363 364 public void setClockDaemon(ClockDaemon clockDaemon) { 365 this.clockDaemon = clockDaemon; 366 } 367 368 public ClockDaemon getClockDaemon() { 369 if (clockDaemon == null) { 370 clockDaemon = new ClockDaemon(); 371 clockDaemon.setThreadFactory(new ThreadFactory() { 372 public Thread newThread(Runnable runnable) { 373 Thread thread = new Thread(runnable, "Cleanup Timmer"); 374 thread.setDaemon(true); 375 return thread; 376 } 377 }); 378 } 379 return clockDaemon; 380 } 381 382 public synchronized void stop() throws JMSException { 383 if (clockTicket != null) { 384 // Stop the periodical cleanup. 385 ClockDaemon.cancel(clockTicket); 386 clockTicket=null; 387 clockDaemon.shutDown(); 388 } 389 } 390 391 public BrokerContainer getBrokerContainer() { 392 return brokerContainer; 393 } 394 395 public void setBrokerContainer(BrokerContainer brokerContainer) { 396 this.brokerContainer = brokerContainer; 397 } 398 399 public DataSource getDataSource() { 400 return dataSource; 401 } 402 403 public void setDataSource(DataSource dataSource) { 404 this.dataSource = dataSource; 405 } 406 407 public WireFormat getWireFormat() { 408 return wireFormat; 409 } 410 411 public void setWireFormat(WireFormat wireFormat) { 412 this.wireFormat = wireFormat; 413 } 414 415 public Connection getConnection() throws SQLException { 416 Connection answer = TransactionContext.peekConnection(); 417 if (answer == null) { 418 answer = dataSource.getConnection(); 419 answer.setAutoCommit(true); 420 } 421 return answer; 422 } 423 424 public void returnConnection(Connection connection) { 425 if (connection == null) { 426 return; 427 } 428 Connection peek = TransactionContext.peekConnection(); 429 if (peek != connection) { 430 try { 431 connection.close(); 432 } 433 catch (SQLException e) { 434 } 435 } 436 } 437 438 /** 439 * @return Returns the adapterClass. 440 */ 441 public String getAdapterClass() { 442 return adapterClass; 443 } 444 445 /** 446 * @param adapterClass The adapterClass to set. 447 */ 448 public void setAdapterClass(String adapterClass) { 449 this.adapterClass = adapterClass; 450 } 451 452 public JDBCAdapter getJDBCAdapter() { 453 return adapter; 454 } 455 456 /** 457 * @return Returns the dropTablesOnStartup. 458 */ 459 public boolean getDropTablesOnStartup() { 460 return dropTablesOnStartup; 461 } 462 /** 463 * @param dropTablesOnStartup The dropTablesOnStartup to set. 464 */ 465 public void setDropTablesOnStartup(boolean dropTablesOnStartup) { 466 this.dropTablesOnStartup = dropTablesOnStartup; 467 } 468 469 public DeadLetterPolicy getDeadLetterPolicy() { 470 return this.deadLetterPolicy; 471 } 472 473 public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) { 474 this.deadLetterPolicy = deadLetterPolicy; 475 } 476 477 public boolean getDeleteExpiredMessages() { 478 return deleteExpiredMessages; 479 } 480 481 public void setDeleteExpiredMessages(boolean deleteExpiredMessages) { 482 this.deleteExpiredMessages = deleteExpiredMessages; 483 } 484 /** 485 * @return Returns the autoCleanupExpiredMessages. 486 */ 487 public boolean isAutoCleanupExpiredMessages() { 488 return autoCleanupExpiredMessages; 489 } 490 /** 491 * @param autoCleanupExpiredMessages The autoCleanupExpiredMessages to set. 492 */ 493 public void setAutoCleanupExpiredMessages(boolean autoCleanupExpiredMessages) { 494 this.autoCleanupExpiredMessages = autoCleanupExpiredMessages; 495 } 496 /** 497 * @return Returns the cleanupRepeatInterval. 498 */ 499 public long getCleanupRepeatInterval() { 500 return cleanupRepeatInterval; 501 } 502 /** 503 * @param cleanupRepeatInterval The cleanupRepeatInterval to set. 504 */ 505 public void setCleanupRepeatInterval(long cleanupRepeatInterval) { 506 this.cleanupRepeatInterval = cleanupRepeatInterval; 507 } 508 509 public int getCleanupPeriod() { 510 return cleanupPeriod; 511 } 512 513 public void setCleanupPeriod(int cleanupPeriod) { 514 this.cleanupPeriod = cleanupPeriod; 515 } 516 517 public String getTablePrefix() { 518 return tablePrefix; 519 } 520 521 public void setTablePrefix(String tablePrefix) { 522 this.tablePrefix = tablePrefix; 523 } 524 }