001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    
025    import javax.jms.JMSException;
026    import javax.transaction.xa.XAException;
027    
028    import org.activemq.broker.Broker;
029    import org.activemq.broker.BrokerClient;
030    import org.activemq.message.ActiveMQMessage;
031    import org.activemq.message.ActiveMQXid;
032    import org.activemq.message.MessageAck;
033    import org.activemq.service.Transaction;
034    import org.activemq.service.TransactionManager;
035    import org.activemq.store.TransactionStore;
036    import org.activemq.store.TransactionStore.RecoveryListener;
037    import org.activemq.util.JMSExceptionHelper;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    
041    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
042    
043    /**
044     * @version $Revision: 1.1.1.1 $
045     */
046    public class TransactionManagerImpl extends TransactionManager {
047        private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
048    
049        // the broker on which transactions operate
050        private Broker broker;
051        // The prepared XA transactions.
052        private TransactionStore transactionStore;
053        // Maps clients to the txids that they created.
054        private Map activeClients = new ConcurrentHashMap();
055        // Maps txids to ActiveMQTransactions
056        private Map localTxs = new ConcurrentHashMap();
057        // Maps txids to ActiveMQTransactions
058        private Map xaTxs = new ConcurrentHashMap();
059    
060        public TransactionManagerImpl(Broker broker, TransactionStore transactionStore) {
061            this.transactionStore = transactionStore;
062            this.broker = broker;
063        }
064    
065        /**
066         * @see org.activemq.service.TransactionManager#createLocalTransaction(org.activemq.broker.BrokerClient, String)
067         */
068        public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
069            AbstractTransaction t = new LocalTransactionCommand(localTxs, txid, transactionStore);
070            localTxs.put(txid, t);
071            return t;
072        }
073    
074        /**
075         * @see org.activemq.service.TransactionManager#createXATransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
076         */
077        public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
078            
079            // The xa transaction may allready be running.
080            Transaction tx = (Transaction) xaTxs.get(xid);
081            if( tx == null ) {
082                    if(log.isDebugEnabled())
083                            log.debug("XA Transaction started: "+xid);
084                tx = new XATransactionCommand(xid, xaTxs, transactionStore);
085                xaTxs.put(xid, tx);
086            }               
087            return tx;
088            
089        }
090    
091        /**
092         * @see org.activemq.service.TransactionManager#getLocalTransaction(String)
093         */
094        public Transaction getLocalTransaction(String txid) throws JMSException {
095            Transaction tx = (Transaction) localTxs.get(txid);
096            if (tx == null) {
097                throw new JMSException("Transaction '" + txid
098                        + "' has not been started.");
099            }
100            return tx;
101        }
102    
103        /**
104         * @see org.activemq.service.TransactionManager#getXATransaction(org.activemq.message.ActiveMQXid)
105         */
106        public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
107            Transaction tx = (Transaction) xaTxs.get(xid);
108            if (tx == null) {
109                XAException e = new XAException("Transaction '" + xid + "' has not been started.");
110                e.errorCode = XAException.XAER_NOTA;
111                throw e;
112            }
113            return tx;
114        }
115    
116        /**
117         * @see org.activemq.service.TransactionManager#getPreparedXATransactions()
118         */
119        public ActiveMQXid[] getPreparedXATransactions() throws XAException {
120            ArrayList txs = new ArrayList(xaTxs.size());
121            for (Iterator iter = xaTxs.keySet().iterator(); iter.hasNext();) {
122                ActiveMQXid tx = (ActiveMQXid) iter.next();
123                txs.add(tx);
124            }
125            ActiveMQXid rc[] = new ActiveMQXid[txs.size()];
126            txs.toArray(rc);
127            return rc;
128        }
129    
130    
131        /**
132         * @see org.activemq.service.TransactionManager#cleanUpClient(org.activemq.broker.BrokerClient)
133         */
134        public void cleanUpClient(BrokerClient client) throws JMSException {
135            // HRC: I don't think we need to keep track of the client's open transactions here...
136            // It seems like BrokerClientImpl.close() allready cleans up open transactions.
137            //
138            List list = (List) activeClients.remove(client);
139            if (list != null) {
140                for (int i = 0; i < list.size(); i++) {
141                    try {
142                        Object o = list.get(i);
143                        if (o instanceof String) {
144                            Transaction t = this.getLocalTransaction((String) o);
145                            t.rollback();
146                        }
147                        else {
148                            Transaction t = this.getXATransaction((ActiveMQXid) o);
149                            t.rollback();
150                        }
151                    }
152                    catch (Exception e) {
153                        log.warn("ERROR Rolling back disconnected client's transactions: ", e);
154                    }
155                }
156                list.clear();
157            }
158        }
159    
160        /**
161         * @see org.activemq.service.TransactionManager#recover(org.activemq.service.Transaction)
162         */
163        public void recover(Transaction transaction) {
164            // first lets associate any transient data structurs with the
165            // transaction which has recently been loaded from disk
166            if (transaction instanceof XATransactionCommand) {
167                XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
168                xaTransaction.initialise(xaTxs, transactionStore);
169                xaTxs.put(transaction.getTransactionId(), transaction);
170            }
171        }
172    
173        public void start() throws JMSException {
174            transactionStore.start();
175            try {
176                transactionStore.recover(new RecoveryListener(){
177                    public void recover(ActiveMQXid xid, ActiveMQMessage[] addedMessages, MessageAck[] aks) throws JMSException, XAException {
178                        Transaction transaction = createXATransaction(null, xid);                    
179                        for (int i = 0; i < addedMessages.length; i++) {
180                            broker.sendMessage(null, addedMessages[i]);
181                        }
182                        for (int i = 0; i < aks.length; i++) {
183                            broker.acknowledgeMessage(null, aks[i]);                    
184                        }
185                        transaction.prepare();
186                    }
187                });
188            } catch (XAException e) {
189                throw JMSExceptionHelper.newJMSException("Recovery Failed: "+e.getMessage(), e);
190            }
191        }
192    
193        public void stop() throws JMSException {
194            transactionStore.stop();
195        }
196    
197    }