001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.vm; 019 020 import java.util.ArrayList; 021 import java.util.Iterator; 022 023 import javax.jms.JMSException; 024 import javax.transaction.xa.XAException; 025 026 import org.activemq.message.ActiveMQMessage; 027 import org.activemq.message.ActiveMQXid; 028 import org.activemq.message.MessageAck; 029 import org.activemq.store.MessageStore; 030 import org.activemq.store.ProxyMessageStore; 031 import org.activemq.store.ProxyTopicMessageStore; 032 import org.activemq.store.TopicMessageStore; 033 import org.activemq.store.TransactionStore; 034 035 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 036 037 /** 038 * Provides a TransactionStore implementation that can create transaction aware 039 * MessageStore objects from non transaction aware MessageStore objects. 040 * 041 * @version $Revision: 1.1.1.1 $ 042 */ 043 public class VMTransactionStore implements TransactionStore { 044 045 ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); 046 047 ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); 048 049 private boolean doingRecover; 050 051 public static class Tx { 052 private ArrayList messages = new ArrayList(); 053 054 private ArrayList acks = new ArrayList(); 055 056 public void add(AddMessageCommand msg) { 057 messages.add(msg); 058 } 059 060 public void add(RemoveMessageCommand ack) { 061 acks.add(ack); 062 } 063 064 public ActiveMQMessage[] getMessages() { 065 ActiveMQMessage rc[] = new ActiveMQMessage[messages.size()]; 066 int count=0; 067 for (Iterator iter = messages.iterator(); iter.hasNext();) { 068 AddMessageCommand cmd = (AddMessageCommand) iter.next(); 069 rc[count++] = cmd.getMessage(); 070 } 071 return rc; 072 } 073 074 public MessageAck[] getAcks() { 075 MessageAck rc[] = new MessageAck[acks.size()]; 076 int count=0; 077 for (Iterator iter = acks.iterator(); iter.hasNext();) { 078 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next(); 079 rc[count++] = cmd.getMessageAck(); 080 } 081 return rc; 082 } 083 084 /** 085 * @throws JMSException 086 */ 087 public void commit() throws XAException { 088 try { 089 // Do all the message adds. 090 for (Iterator iter = messages.iterator(); iter.hasNext();) { 091 AddMessageCommand cmd = (AddMessageCommand) iter.next(); 092 cmd.run(); 093 } 094 // And removes.. 095 for (Iterator iter = acks.iterator(); iter.hasNext();) { 096 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next(); 097 cmd.run(); 098 } 099 } catch ( JMSException e) { 100 throw (XAException)new XAException(XAException.XAER_RMFAIL).initCause(e); 101 } 102 } 103 } 104 105 public interface AddMessageCommand { 106 ActiveMQMessage getMessage(); 107 void run() throws JMSException; 108 } 109 110 public interface RemoveMessageCommand { 111 MessageAck getMessageAck(); 112 void run() throws JMSException; 113 } 114 115 public MessageStore proxy(MessageStore messageStore) { 116 return new ProxyMessageStore(messageStore) { 117 public void addMessage(final ActiveMQMessage message) throws JMSException { 118 VMTransactionStore.this.addMessage(getDelegate(), message); 119 } 120 121 public void removeMessage(final MessageAck ack) throws JMSException { 122 VMTransactionStore.this.removeMessage(getDelegate(), ack); 123 } 124 }; 125 } 126 127 public TopicMessageStore proxy(TopicMessageStore messageStore) { 128 return new ProxyTopicMessageStore(messageStore) { 129 public void addMessage(final ActiveMQMessage message) throws JMSException { 130 VMTransactionStore.this.addMessage(getDelegate(), message); 131 } 132 public void removeMessage(final MessageAck ack) throws JMSException { 133 VMTransactionStore.this.removeMessage(getDelegate(), ack); 134 } 135 }; 136 } 137 138 /** 139 * @see org.activemq.store.TransactionStore#prepare(org.activemq.service.Transaction) 140 */ 141 public void prepare(Object txid) { 142 Tx tx = (Tx) inflightTransactions.remove(txid); 143 if (tx == null) 144 return; 145 preparedTransactions.put(txid, tx); 146 } 147 148 public Tx getTx(Object txid) { 149 Tx tx = (Tx) inflightTransactions.get(txid); 150 if (tx == null) { 151 tx = new Tx(); 152 inflightTransactions.put(txid, tx); 153 } 154 return tx; 155 } 156 157 /** 158 * @throws XAException 159 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction) 160 */ 161 public void commit(Object txid, boolean wasPrepared) throws XAException { 162 163 Tx tx; 164 if( wasPrepared ) { 165 tx = (Tx) preparedTransactions.remove(txid); 166 } else { 167 tx = (Tx) inflightTransactions.remove(txid); 168 } 169 170 if( tx == null ) 171 return; 172 tx.commit(); 173 174 } 175 176 /** 177 * @see org.activemq.store.TransactionStore#rollback(org.activemq.service.Transaction) 178 */ 179 public void rollback(Object txid) { 180 preparedTransactions.remove(txid); 181 inflightTransactions.remove(txid); 182 } 183 184 public void start() throws JMSException { 185 } 186 187 public void stop() throws JMSException { 188 } 189 190 synchronized public void recover(RecoveryListener listener) throws XAException { 191 192 // All the inflight transactions get rolled back.. 193 inflightTransactions.clear(); 194 this.doingRecover = true; 195 try { 196 for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 197 Object txid = (Object) iter.next(); 198 try { 199 Tx tx = (Tx) preparedTransactions.get(txid); 200 listener.recover((ActiveMQXid) txid, tx.getMessages(), tx.getAcks()); 201 } catch (JMSException e) { 202 throw (XAException) new XAException("Recovery of a transaction failed:").initCause(e); 203 } 204 } 205 } finally { 206 this.doingRecover = false; 207 } 208 } 209 210 /** 211 * @param message 212 * @throws JMSException 213 */ 214 void addMessage(final MessageStore destination, final ActiveMQMessage message) throws JMSException { 215 216 if( doingRecover ) 217 return; 218 219 if (message.isPartOfTransaction()) { 220 Tx tx = getTx(message.getTransactionId()); 221 tx.add(new AddMessageCommand() { 222 public ActiveMQMessage getMessage() { 223 return message; 224 } 225 public void run() throws JMSException { 226 destination.addMessage(message); 227 } 228 }); 229 } else { 230 destination.addMessage(message); 231 } 232 } 233 234 /** 235 * @param ack 236 * @throws JMSException 237 */ 238 private void removeMessage(final MessageStore destination,final MessageAck ack) throws JMSException { 239 if( doingRecover ) 240 return; 241 242 if (ack.isPartOfTransaction()) { 243 Tx tx = getTx(ack.getTransactionId()); 244 tx.add(new RemoveMessageCommand() { 245 public MessageAck getMessageAck() { 246 return ack; 247 } 248 public void run() throws JMSException { 249 destination.removeMessage(ack); 250 } 251 }); 252 } else { 253 destination.removeMessage(ack); 254 } 255 } 256 257 }