001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import java.util.ArrayList; 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.Map; 024 025 import javax.jms.JMSException; 026 import javax.transaction.xa.XAException; 027 028 import org.activemq.broker.Broker; 029 import org.activemq.broker.BrokerClient; 030 import org.activemq.message.ActiveMQMessage; 031 import org.activemq.message.ActiveMQXid; 032 import org.activemq.message.MessageAck; 033 import org.activemq.service.Transaction; 034 import org.activemq.service.TransactionManager; 035 import org.activemq.store.TransactionStore; 036 import org.activemq.store.TransactionStore.RecoveryListener; 037 import org.activemq.util.JMSExceptionHelper; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 041 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 042 043 /** 044 * @version $Revision: 1.1.1.1 $ 045 */ 046 public class TransactionManagerImpl extends TransactionManager { 047 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class); 048 049 // the broker on which transactions operate 050 private Broker broker; 051 // The prepared XA transactions. 052 private TransactionStore transactionStore; 053 // Maps clients to the txids that they created. 054 private Map activeClients = new ConcurrentHashMap(); 055 // Maps txids to ActiveMQTransactions 056 private Map localTxs = new ConcurrentHashMap(); 057 // Maps txids to ActiveMQTransactions 058 private Map xaTxs = new ConcurrentHashMap(); 059 060 public TransactionManagerImpl(Broker broker, TransactionStore transactionStore) { 061 this.transactionStore = transactionStore; 062 this.broker = broker; 063 } 064 065 /** 066 * @see org.activemq.service.TransactionManager#createLocalTransaction(org.activemq.broker.BrokerClient, String) 067 */ 068 public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException { 069 AbstractTransaction t = new LocalTransactionCommand(localTxs, txid, transactionStore); 070 localTxs.put(txid, t); 071 return t; 072 } 073 074 /** 075 * @see org.activemq.service.TransactionManager#createXATransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 076 */ 077 public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException { 078 079 // The xa transaction may allready be running. 080 Transaction tx = (Transaction) xaTxs.get(xid); 081 if( tx == null ) { 082 if(log.isDebugEnabled()) 083 log.debug("XA Transaction started: "+xid); 084 tx = new XATransactionCommand(xid, xaTxs, transactionStore); 085 xaTxs.put(xid, tx); 086 } 087 return tx; 088 089 } 090 091 /** 092 * @see org.activemq.service.TransactionManager#getLocalTransaction(String) 093 */ 094 public Transaction getLocalTransaction(String txid) throws JMSException { 095 Transaction tx = (Transaction) localTxs.get(txid); 096 if (tx == null) { 097 throw new JMSException("Transaction '" + txid 098 + "' has not been started."); 099 } 100 return tx; 101 } 102 103 /** 104 * @see org.activemq.service.TransactionManager#getXATransaction(org.activemq.message.ActiveMQXid) 105 */ 106 public Transaction getXATransaction(ActiveMQXid xid) throws XAException { 107 Transaction tx = (Transaction) xaTxs.get(xid); 108 if (tx == null) { 109 XAException e = new XAException("Transaction '" + xid + "' has not been started."); 110 e.errorCode = XAException.XAER_NOTA; 111 throw e; 112 } 113 return tx; 114 } 115 116 /** 117 * @see org.activemq.service.TransactionManager#getPreparedXATransactions() 118 */ 119 public ActiveMQXid[] getPreparedXATransactions() throws XAException { 120 ArrayList txs = new ArrayList(xaTxs.size()); 121 for (Iterator iter = xaTxs.keySet().iterator(); iter.hasNext();) { 122 ActiveMQXid tx = (ActiveMQXid) iter.next(); 123 txs.add(tx); 124 } 125 ActiveMQXid rc[] = new ActiveMQXid[txs.size()]; 126 txs.toArray(rc); 127 return rc; 128 } 129 130 131 /** 132 * @see org.activemq.service.TransactionManager#cleanUpClient(org.activemq.broker.BrokerClient) 133 */ 134 public void cleanUpClient(BrokerClient client) throws JMSException { 135 // HRC: I don't think we need to keep track of the client's open transactions here... 136 // It seems like BrokerClientImpl.close() allready cleans up open transactions. 137 // 138 List list = (List) activeClients.remove(client); 139 if (list != null) { 140 for (int i = 0; i < list.size(); i++) { 141 try { 142 Object o = list.get(i); 143 if (o instanceof String) { 144 Transaction t = this.getLocalTransaction((String) o); 145 t.rollback(); 146 } 147 else { 148 Transaction t = this.getXATransaction((ActiveMQXid) o); 149 t.rollback(); 150 } 151 } 152 catch (Exception e) { 153 log.warn("ERROR Rolling back disconnected client's transactions: ", e); 154 } 155 } 156 list.clear(); 157 } 158 } 159 160 /** 161 * @see org.activemq.service.TransactionManager#recover(org.activemq.service.Transaction) 162 */ 163 public void recover(Transaction transaction) { 164 // first lets associate any transient data structurs with the 165 // transaction which has recently been loaded from disk 166 if (transaction instanceof XATransactionCommand) { 167 XATransactionCommand xaTransaction = (XATransactionCommand) transaction; 168 xaTransaction.initialise(xaTxs, transactionStore); 169 xaTxs.put(transaction.getTransactionId(), transaction); 170 } 171 } 172 173 public void start() throws JMSException { 174 transactionStore.start(); 175 try { 176 transactionStore.recover(new RecoveryListener(){ 177 public void recover(ActiveMQXid xid, ActiveMQMessage[] addedMessages, MessageAck[] aks) throws JMSException, XAException { 178 Transaction transaction = createXATransaction(null, xid); 179 for (int i = 0; i < addedMessages.length; i++) { 180 broker.sendMessage(null, addedMessages[i]); 181 } 182 for (int i = 0; i < aks.length; i++) { 183 broker.acknowledgeMessage(null, aks[i]); 184 } 185 transaction.prepare(); 186 } 187 }); 188 } catch (XAException e) { 189 throw JMSExceptionHelper.newJMSException("Recovery Failed: "+e.getMessage(), e); 190 } 191 } 192 193 public void stop() throws JMSException { 194 transactionStore.stop(); 195 } 196 197 }