001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import java.util.Map;
021    
022    import javax.transaction.xa.XAException;
023    import javax.transaction.xa.XAResource;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.activemq.message.ActiveMQXid;
028    import org.activemq.store.TransactionStore;
029    
030    /**
031     * @version $Revision: 1.1.1.1 $
032     */
033    public class XATransactionCommand extends AbstractTransaction {
034        private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
035    
036        private ActiveMQXid xid;
037        private transient Map xaTxs;
038        private transient TransactionStore transactionStore;
039    
040        public XATransactionCommand(ActiveMQXid xid, Map xaTxs, TransactionStore transactionStore) {
041            this.xid = xid;
042            this.xaTxs = xaTxs;
043            this.transactionStore = transactionStore;
044        }
045    
046    
047        /**
048         * Called after the transaction command has been recovered from disk
049         *
050         * @param xaTxs
051         * @param preparedTransactions
052         */
053        public void initialise(Map xaTxs, TransactionStore preparedTransactions) {
054            this.xaTxs = xaTxs;
055            this.transactionStore = preparedTransactions;
056        }
057    
058        public void commit(boolean onePhase) throws XAException {
059            if(log.isDebugEnabled())
060                    log.debug("XA Transaction commit: "+xid);
061    
062            switch (getState()) {
063                case START_STATE:
064                    // 1 phase commit, no work done.
065                    checkForPreparedState(onePhase);
066                    setStateFinished();
067                    break;
068                case IN_USE_STATE:
069                    // 1 phase commit, work done.
070                    checkForPreparedState(onePhase);
071                    doPrePrepare();
072                    setStateFinished();
073                    transactionStore.commit(getTransactionId(), false);
074                    doPostCommit();
075                    break;
076                case PREPARED_STATE:
077                    // 2 phase commit, work done.
078                    // We would record commit here.
079                    setStateFinished();
080                    transactionStore.commit(getTransactionId(), true);
081                    doPostCommit();
082                    break;
083                default:
084                    illegalStateTransition("commit");
085            }
086        }
087    
088        private void illegalStateTransition(String callName) throws XAException {
089            XAException xae = new XAException("Cannot call " + callName + " now.");
090            xae.errorCode = XAException.XAER_PROTO;
091            throw xae;
092        }
093    
094        private void checkForPreparedState(boolean onePhase) throws XAException {
095            if (!onePhase) {
096                XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
097                xae.errorCode = XAException.XAER_PROTO;
098                throw xae;
099            }
100        }
101    
102        private void doPrePrepare() throws XAException {
103            try {
104                prePrepare();
105            } catch (XAException e) {
106                throw e;
107            } catch (Throwable e) {
108                log.warn("PRE-PREPARE FAILED: ", e);
109                rollback();
110                XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
111                xae.errorCode = XAException.XA_RBOTHER;
112                xae.initCause(e);
113                throw xae;
114            }
115        }
116    
117        private void doPostCommit() throws XAException {
118            try {
119                postCommit();
120            }
121            catch (Throwable e) {
122                // I guess this could happen.  Post commit task failed
123                // to execute properly.
124                log.warn("POST COMMIT FAILED: ", e);
125                XAException xae = new XAException("POST COMMIT FAILED");
126                xae.errorCode = XAException.XAER_RMERR;
127                xae.initCause(e);
128                throw xae;
129            }
130        }
131    
132        public void rollback() throws XAException {
133            
134            if(log.isDebugEnabled())
135                    log.debug("XA Transaction rollback: "+xid);
136    
137            switch (getState()) {
138                case START_STATE:
139                    // 1 phase rollback no work done.
140                    setStateFinished();
141                    break;
142                case IN_USE_STATE:
143                    // 1 phase rollback work done.
144                    setStateFinished();
145                    transactionStore.rollback(getTransactionId());
146                    doPostRollback();
147                    break;
148                case PREPARED_STATE:
149                    // 2 phase rollback work done.
150                    setStateFinished();
151                    transactionStore.rollback(getTransactionId());
152                    doPostRollback();
153                    break;
154            }
155    
156        }
157    
158        private void doPostRollback() throws XAException {
159            try {
160                postRollback();
161            }
162            catch (Throwable e) {
163                // I guess this could happen.  Post commit task failed
164                // to execute properly.
165                log.warn("POST ROLLBACK FAILED: ", e);
166                XAException xae = new XAException("POST ROLLBACK FAILED");
167                xae.errorCode = XAException.XAER_RMERR;
168                xae.initCause(e);
169                throw xae;
170            }
171        }
172    
173        public int prepare() throws XAException {
174            if(log.isDebugEnabled())
175                    log.debug("XA Transaction prepare: "+xid);
176            
177            switch (getState()) {
178                case START_STATE:
179                    // No work done.. no commit/rollback needed.
180                    setStateFinished();
181                    return XAResource.XA_RDONLY;
182                case IN_USE_STATE:
183                    // We would record prepare here.
184                    doPrePrepare();
185                    setState(AbstractTransaction.PREPARED_STATE);
186                    transactionStore.prepare(getTransactionId());
187                    return XAResource.XA_OK;
188                default :
189                    illegalStateTransition("prepare");
190                    return XAResource.XA_RDONLY;
191            }
192        }
193    
194        private void setStateFinished() {
195            setState(AbstractTransaction.FINISHED_STATE);
196            xaTxs.remove(xid);
197        }
198    
199        public boolean isXaTransacted() {
200            return true;
201        }
202    
203        public Object getTransactionId() {
204            return xid;
205        }
206    }