/**
 *
 * Copyright 2004 Hiram Chirino
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq.store.journal;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.jms.JMSException;

import org.activeio.journal.RecordLocation;
import org.activemq.message.ConsumerInfo;
import org.activemq.service.MessageIdentity;
import org.activemq.service.SubscriberEntry;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.RecoveryListener;
import org.activemq.store.TopicMessageStore;
import org.activemq.util.Callback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A MessageStore that uses a Journal to store its messages.
 * <p>
 * Acknowledgements are written to the journal as they arrive and the most
 * recent one per subscription is remembered in memory; {@link #checkpoint()}
 * later pushes the remembered acknowledgements to the long term store.
 *
 * @version $Revision: 1.1 $
 */
public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
    private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class);

    /** The store that subscriptions and checkpointed acknowledgements are delegated to. */
    private final TopicMessageStore longTermStore;

    /**
     * Acknowledgements journalled since the last checkpoint: subscription name
     * (String) to the last acknowledged {@link MessageIdentity}. Guarded by
     * <code>this</code>.
     */
    private HashMap ackedLastAckLocations = new HashMap();

    /**
     * @param adapter the journal persistence adapter, handed to the superclass
     * @param checkpointStore the long term store; also handed to the superclass
     * @param destinationName the name of the destination, handed to the superclass
     */
    public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, String destinationName) {
        super(adapter, checkpointStore, destinationName);
        this.longTermStore = checkpointStore;
    }

    /**
     * Recovers a subscription from the long term store. The persistence
     * adapter is checkpointed first (presumably so that journalled work has
     * reached the long term store before it is read -- confirm against
     * JournalPersistenceAdapter).
     */
    public void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, RecoveryListener listener) throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.recoverSubscription(subscriptionId, lastDispatchedMessage, listener);
    }

    /**
     * Delegates directly to the long term store.
     */
    public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
        return longTermStore.getSubscriberEntry(info);
    }

    /**
     * Checkpoints the persistence adapter and then stores the subscriber
     * entry in the long term store.
     */
    public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.setSubscriberEntry(info, subscriberEntry);
    }

    /**
     * Delegates directly to the long term store.
     */
    public MessageIdentity getLastestMessageIdentity() throws JMSException {
        return longTermStore.getLastestMessageIdentity();
    }

    /**
     * Delegates directly to the long term store.
     */
    public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
        longTermStore.incrementMessageCount(messageId);
    }

    /**
     * Delegates directly to the long term store.
     */
    public void decrementMessageCountAndMaybeDelete(MessageIdentity messageId) throws JMSException {
        longTermStore.decrementMessageCountAndMaybeDelete(messageId);
    }

    /**
     * Journals an acknowledgement for the given subscription.
     * <p>
     * Outside a transaction the acknowledgement is remembered immediately for
     * the next checkpoint. Inside a transaction the journal location is
     * tracked as in flight and the acknowledgement is only remembered once
     * the transaction commits; on rollback it is simply dropped from the
     * in flight set.
     *
     * @param subscription the subscription that acknowledged the message
     * @param messageIdentity the identity of the acknowledged message
     * @throws JMSException if the acknowledgement cannot be journalled or
     *         registered with the current transaction
     */
    public void setLastAcknowledgedMessageIdentity(final String subscription, final MessageIdentity messageIdentity) throws JMSException {
        final boolean debug = log.isDebugEnabled();
        final RecordLocation location = peristenceAdapter.writePacket(destinationName, subscription, messageIdentity, false);

        if (!TransactionManager.isCurrentTransaction()) {
            if (debug) {
                log.debug("Journalled acknowledge: "+messageIdentity.getMessageID()+" at "+location);
            }
            acknowledge(subscription, messageIdentity, location);
            return;
        }

        if (debug) {
            log.debug("Journalled in flight acknowledge: "+messageIdentity.getMessageID()+" at "+location);
        }

        synchronized (this) {
            inFlightTxLocations.add(location);
        }

        // If anything below fails, neither transaction task will ever run, so
        // the location has to be taken out of the in flight set here or it
        // would stay there forever.
        boolean registered = false;
        try {
            final Transaction tx = TransactionManager.getContexTransaction();
            JournalAck ack = new JournalAck(destinationName, subscription, messageIdentity.getMessageID(), tx.getTransactionId());
            transactionStore.acknowledge(this, ack, location);

            tx.addPostCommitTask(new TransactionTask() {
                public void execute() throws Throwable {
                    if (debug) {
                        log.debug("In flight acknowledge commit: "+messageIdentity.getMessageID()+" at "+location);
                    }
                    synchronized (JournalTopicMessageStore.this) {
                        inFlightTxLocations.remove(location);
                        acknowledge(subscription, messageIdentity, location);
                    }
                }
            });
            tx.addPostRollbackTask(new TransactionTask() {
                public void execute() throws Throwable {
                    if (debug) {
                        log.debug("In flight acknowledge rollback: "+messageIdentity.getMessageID()+" at "+location);
                    }
                    synchronized (JournalTopicMessageStore.this) {
                        inFlightTxLocations.remove(location);
                    }
                }
            });
            registered = true;
        }
        finally {
            if (!registered) {
                synchronized (this) {
                    inFlightTxLocations.remove(location);
                }
            }
        }
    }

    /**
     * Remembers the acknowledgement so that the next checkpoint writes it to
     * the long term store, and records the journal location it was written at.
     */
    private void acknowledge(String subscription, MessageIdentity messageIdentity, RecordLocation location) {
        synchronized (this) {
            lastLocation = location;
            ackedLastAckLocations.put(subscription, messageIdentity);
        }
    }

    /**
     * Checkpoints the added messages via the superclass and then writes the
     * acknowledgements remembered since the previous checkpoint to the long
     * term store.
     * <p>
     * If either step fails, the acknowledgements taken for this checkpoint
     * are put back so that a later checkpoint can retry them.
     *
     * @return the location returned by the superclass checkpoint
     * @throws JMSException if the checkpoint fails
     */
    public RecordLocation checkpoint() throws JMSException {

        // swap the acks before check pointing the added messages since we don't want to ack
        // a message that has not been checkpointed yet.
        final HashMap cpAckedLastAckLocations;
        synchronized (this) {
            cpAckedLastAckLocations = this.ackedLastAckLocations;
            this.ackedLastAckLocations = new HashMap();
        }

        boolean completed = false;
        try {
            // Check point the added messages.
            RecordLocation rc = super.checkpoint();

            if (log.isDebugEnabled()) {
                log.debug("Checkpoint acknowledgments: "+cpAckedLastAckLocations);
            }

            transactionTemplate.run(new Callback() {
                public void execute() throws Throwable {
                    // Checkpoint the acknowledged messages.
                    Iterator iterator = cpAckedLastAckLocations.entrySet().iterator();
                    while (iterator.hasNext()) {
                        Map.Entry entry = (Map.Entry) iterator.next();
                        String subscription = (String) entry.getKey();
                        MessageIdentity identity = (MessageIdentity) entry.getValue();
                        longTermStore.setLastAcknowledgedMessageIdentity(subscription, identity);
                    }
                }
            });

            completed = true;
            return rc;
        }
        finally {
            if (!completed) {
                // Put the un-checkpointed acks back. Acks recorded while this
                // checkpoint was running are newer, so they overwrite the
                // restored entry for the same subscription.
                synchronized (this) {
                    cpAckedLastAckLocations.putAll(this.ackedLastAckLocations);
                    this.ackedLastAckLocations = cpAckedLastAckLocations;
                }
            }
        }
    }

    /**
     * @return Returns the longTermStore.
     */
    public TopicMessageStore getLongTermTopicMessageStore() {
        return longTermStore;
    }

    /**
     * Checkpoints the persistence adapter and then deletes the subscription
     * from the long term store.
     */
    public void deleteSubscription(String subscription) throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.deleteSubscription(subscription);
    }

    /**
     * Applies an acknowledgement directly to the long term store. This is
     * best effort: any failure is logged at debug level and otherwise
     * ignored, since the message may already have been acknowledged.
     */
    public void replayAcknowledge(String subscription, MessageIdentity identity) {
        try {
            longTermStore.setLastAcknowledgedMessageIdentity(subscription,identity);
        }
        catch (Throwable e) {
            log.debug("Could not replay acknowledge for message '"+identity.getMessageID()+"'. Message may have already been acknowledged. reason: " + e);
        }
    }

}