001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * Copyright 2004 Protique Ltd 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 package org.activemq.store.journal; 020 021 import java.util.ArrayList; 022 import java.util.Collections; 023 import java.util.HashSet; 024 import java.util.Iterator; 025 import java.util.LinkedHashMap; 026 027 import javax.jms.JMSException; 028 029 import org.activeio.journal.RecordLocation; 030 import org.activemq.message.ActiveMQMessage; 031 import org.activemq.message.MessageAck; 032 import org.activemq.service.MessageIdentity; 033 import org.activemq.service.Transaction; 034 import org.activemq.service.TransactionManager; 035 import org.activemq.service.TransactionTask; 036 import org.activemq.store.MessageStore; 037 import org.activemq.store.RecoveryListener; 038 import org.activemq.store.cache.CacheMessageStore; 039 import org.activemq.store.cache.CacheMessageStoreAware; 040 import org.activemq.util.Callback; 041 import org.activemq.util.TransactionTemplate; 042 import org.apache.commons.logging.Log; 043 import org.apache.commons.logging.LogFactory; 044 045 /** 046 * A MessageStore that uses a Journal to store it's messages. 047 * 048 * @version $Revision: 1.1 $ 049 */ 050 public class JournalMessageStore implements MessageStore, CacheMessageStoreAware { 051 052 private static final Log log = LogFactory.getLog(JournalMessageStore.class); 053 protected final JournalPersistenceAdapter peristenceAdapter; 054 protected final MessageStore longTermStore; 055 protected final String destinationName; 056 protected final TransactionTemplate transactionTemplate; 057 058 private LinkedHashMap addedMessageIds = new LinkedHashMap(); 059 private ArrayList removedMessageLocations = new ArrayList(); 060 protected HashSet inFlightTxLocations = new HashSet(); 061 protected RecordLocation lastLocation; 062 063 /** A MessageStore that we can use to retreive messages quickly. */ 064 private MessageStore cacheMessageStore = this; 065 066 protected final JournalTransactionStore transactionStore; 067 068 private LinkedHashMap cpAddedMessageIds; 069 070 int removedFromJournal; 071 072 public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName) { 073 this.peristenceAdapter = adapter; 074 this.transactionStore = this.peristenceAdapter.getTransactionStore(); 075 this.longTermStore = checkpointStore; 076 this.destinationName = destinationName; 077 this.transactionTemplate = new TransactionTemplate(adapter); 078 } 079 080 /** 081 * Not synchronized since the Journal has better throughput if you increase 082 * the number of conncurrent writes that it is doing. 083 */ 084 public void addMessage(final ActiveMQMessage message) throws JMSException { 085 final boolean debug = log.isDebugEnabled(); 086 final RecordLocation location = peristenceAdapter.writePacket(destinationName, message, message.isReceiptRequired()); 087 if( !TransactionManager.isCurrentTransaction() ) { 088 if( debug ) 089 log.debug("Journalled message add: "+message.getJMSMessageID()+" at "+location); 090 addMessage(message, location); 091 } else { 092 if( debug ) 093 log.debug("Journalled in flight message add: "+message.getJMSMessageID()+" at "+location); 094 synchronized (this) { 095 inFlightTxLocations.add(location); 096 } 097 final Transaction tx = TransactionManager.getContexTransaction(); 098 transactionStore.addMessage(this, message, location); 099 tx.addPostCommitTask(new TransactionTask() { 100 public void execute() throws Throwable { 101 if( debug ) 102 log.debug("In flight message add commit: "+message.getJMSMessageID()+" at "+location); 103 synchronized (JournalMessageStore.this) { 104 inFlightTxLocations.remove(location); 105 addMessage(message, location); 106 } 107 } 108 }); 109 tx.addPostRollbackTask(new TransactionTask(){ 110 public void execute() throws Throwable { 111 if( debug ) 112 log.debug("In flight message add rollback: "+message.getJMSMessageID()+" at "+location); 113 // TODO Auto-generated method stub 114 synchronized (JournalMessageStore.this) { 115 inFlightTxLocations.remove(location); 116 } 117 } 118 }); 119 } 120 } 121 122 /** 123 * @param message 124 * @param location 125 */ 126 private void addMessage(final ActiveMQMessage message, final RecordLocation location) { 127 synchronized (this) { 128 lastLocation=location; 129 MessageIdentity id = message.getJMSMessageIdentity(); 130 addedMessageIds.put(id, location); 131 } 132 } 133 134 /** 135 */ 136 public void removeMessage(final MessageAck ack) throws JMSException { 137 138 final boolean debug = log.isDebugEnabled(); 139 final RecordLocation location = peristenceAdapter.writePacket(destinationName, ack, ack.isReceiptRequired()); 140 if( !TransactionManager.isCurrentTransaction() ) { 141 if( debug ) 142 log.debug("Journalled message remove: "+ack.getMessageID()+" at "+location); 143 removeMessage(ack, location); 144 } else { 145 if( debug ) 146 log.debug("Journalled in flight message remove: "+ack.getMessageID()+" at "+location); 147 148 synchronized( this ) { 149 inFlightTxLocations.add(location); 150 } 151 final Transaction tx = TransactionManager.getContexTransaction(); 152 transactionStore.removeMessage(this, ack, location); 153 tx.addPostCommitTask(new TransactionTask(){ 154 public void execute() throws Throwable { 155 if( debug ) 156 log.debug("In flight message remove commit: "+ack.getMessageID()+" at "+location); 157 158 synchronized (JournalMessageStore.this) { 159 inFlightTxLocations.remove(location); 160 removeMessage(ack, location); 161 } 162 } 163 }); 164 tx.addPostRollbackTask(new TransactionTask(){ 165 public void execute() throws Throwable { 166 // TODO Auto-generated method stub 167 if( debug ) 168 log.debug("In flight message remove rollback: "+ack.getMessageID()+" at "+location); 169 synchronized (JournalMessageStore.this) { 170 inFlightTxLocations.remove(location); 171 } 172 } 173 }); 174 } 175 } 176 177 /** 178 * @param ack 179 * @param location 180 */ 181 private void removeMessage(final MessageAck ack, final RecordLocation location) { 182 synchronized (this) { 183 lastLocation=location; 184 MessageIdentity id = ack.getMessageIdentity(); 185 RecordLocation msgLocation = (RecordLocation) addedMessageIds.remove(id); 186 if (msgLocation == null) { 187 removedMessageLocations.add(ack); 188 } else { 189 removedFromJournal++; 190 } 191 } 192 } 193 194 /** 195 * @return 196 * @throws JMSException 197 */ 198 public RecordLocation checkpoint() throws JMSException { 199 200 RecordLocation rc; 201 final ArrayList cpRemovedMessageLocations; 202 final ArrayList cpActiveJournalLocations; 203 204 // swap out the message hash maps.. 205 synchronized (this) { 206 cpAddedMessageIds = this.addedMessageIds; 207 cpRemovedMessageLocations = this.removedMessageLocations; 208 209 this.inFlightTxLocations.removeAll(this.removedMessageLocations); 210 this.inFlightTxLocations.removeAll(this.addedMessageIds.values()); 211 cpActiveJournalLocations=new ArrayList(inFlightTxLocations); 212 213 this.addedMessageIds = new LinkedHashMap(); 214 this.removedMessageLocations = new ArrayList(); 215 log.debug("removedFromJournal="+removedFromJournal); 216 removedFromJournal=0; 217 } 218 219 final boolean debug = log.isDebugEnabled(); 220 if( debug ) 221 log.debug("Checkpoint: "+destinationName); 222 223 224 final int messagesAdded[]=new int[]{0}; 225 final int messagesRemoved[]=new int[]{0}; 226 227 transactionTemplate.run(new Callback() { 228 public void execute() throws Throwable { 229 230 // Checkpoint the added messages. 231 Iterator iterator = cpAddedMessageIds.keySet().iterator(); 232 while (iterator.hasNext()) { 233 MessageIdentity identity = (MessageIdentity) iterator.next(); 234 if( debug ) 235 log.debug("Adding: "+identity.getMessageID()); 236 ActiveMQMessage msg = getCacheMessage(identity); 237 // Pull it out of the journal if we have to. 238 if (msg == null) { 239 RecordLocation location = (RecordLocation) cpAddedMessageIds.get(identity); 240 msg = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation) location); 241 } 242 if( msg != null ) { 243 try { 244 longTermStore.addMessage(msg); 245 messagesAdded[0]++; 246 } catch (Throwable e) { 247 log.warn("Message could not be added to long term store: " + e.getMessage(), e); 248 } 249 } else { 250 log.warn("Journal could not reload message: " + identity); 251 } 252 } 253 254 // Checkpoint the removed messages. 255 iterator = cpRemovedMessageLocations.iterator(); 256 while (iterator.hasNext()) { 257 try { 258 MessageAck ack = (MessageAck) iterator.next(); 259 if( debug ) 260 log.debug("Removing: "+ack.getMessageID()); 261 longTermStore.removeMessage(ack); 262 messagesRemoved[0]++; 263 } catch (Throwable e) { 264 log.debug("Message could not be removed from long term store: " + e.getMessage(), e); 265 } 266 } 267 } 268 269 }); 270 271 log.debug("Added "+messagesAdded[0]+" message(s) and removed "+messagesRemoved[0]+" message(s). removedFromJournal="+removedFromJournal); 272 synchronized (this) { 273 cpAddedMessageIds = null; 274 } 275 276 Collections.sort(cpActiveJournalLocations); 277 if( debug ) 278 log.debug("In flight journal locations: "+cpActiveJournalLocations); 279 280 if( cpActiveJournalLocations.size() > 0 ) { 281 return (RecordLocation) cpActiveJournalLocations.get(0); 282 } else { 283 return lastLocation; 284 } 285 } 286 287 private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException { 288 return cacheMessageStore.getMessage(identity); 289 } 290 291 /** 292 * 293 */ 294 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException { 295 ActiveMQMessage answer = null; 296 297 Object location; 298 synchronized (this) { 299 location = addedMessageIds.get(identity); 300 if( location==null && cpAddedMessageIds!=null ) 301 location = cpAddedMessageIds.get(identity); 302 } 303 304 // Do we have a still have it in the journal? 305 if (location != null ) { 306 try { 307 answer = (ActiveMQMessage)peristenceAdapter.readPacket((RecordLocation) location); 308 if (answer != null) 309 return answer; 310 } catch (Throwable e) { 311 // We could have had an async checkpoint and thus we cannot read that location anymore, 312 // but now the message should be in the long term store. 313 } 314 } 315 316 // If all else fails try the long term message store. 317 return longTermStore.getMessage(identity); 318 } 319 320 /** 321 * Replays the checkpointStore first as those messages are the oldest ones, 322 * then messages are replayed from the transaction log and then the cache is 323 * updated. 324 * 325 * @param listener 326 * @throws JMSException 327 */ 328 public void recover(final RecoveryListener listener) throws JMSException { 329 peristenceAdapter.checkpoint(true); 330 longTermStore.recover(listener); 331 } 332 333 public void start() throws JMSException { 334 longTermStore.start(); 335 } 336 337 public void stop() throws JMSException { 338 longTermStore.stop(); 339 } 340 341 /** 342 * @return Returns the longTermStore. 343 */ 344 public MessageStore getLongTermMessageStore() { 345 return longTermStore; 346 } 347 348 /** 349 * @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore) 350 */ 351 public void setCacheMessageStore(CacheMessageStore store) { 352 cacheMessageStore = store; 353 // Propagate the setCacheMessageStore method call to the longTermStore 354 // if possible. 355 if (longTermStore instanceof CacheMessageStoreAware) { 356 ((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store); 357 } 358 } 359 360 /** 361 * @see org.activemq.store.MessageStore#removeAllMessages() 362 */ 363 public void removeAllMessages() throws JMSException { 364 peristenceAdapter.checkpoint(true); 365 longTermStore.removeAllMessages(); 366 } 367 368 public void replayAddMessage(ActiveMQMessage msg) { 369 try { 370 // Only add the message if it has not already been added. 371 ActiveMQMessage t = longTermStore.getMessage(msg.getJMSMessageIdentity()); 372 if( t==null ) { 373 longTermStore.addMessage(msg); 374 } 375 } 376 catch (Throwable e) { 377 log.debug("Could not replay add for message '" + msg.getJMSMessageIdentity().getMessageID() + "'. Message may have already been added. reason: " + e); 378 } 379 } 380 381 public void replayRemoveMessage(MessageAck ack) { 382 try { 383 // Only remove the message if it has not already been removed. 384 ActiveMQMessage t = longTermStore.getMessage(ack.getMessageIdentity()); 385 if( t!=null ) { 386 longTermStore.removeMessage(ack); 387 } 388 } 389 catch (Throwable e) { 390 log.debug("Could not replay acknowledge for message '" + ack.getMessageID() + "'. Message may have already been acknowledged. reason: " + e); 391 } 392 } 393 394 }