001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.store.journal;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    
025    import javax.jms.JMSException;
026    import javax.transaction.xa.XAException;
027    
028    import org.activeio.journal.RecordLocation;
029    import org.activemq.message.ActiveMQMessage;
030    import org.activemq.message.ActiveMQXid;
031    import org.activemq.message.MessageAck;
032    import org.activemq.store.TransactionStore;
033    import org.apache.derby.iapi.store.raw.xact.TransactionId;
034    
035    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
036    
037    /**
038     */
039    public class JournalTransactionStore implements TransactionStore {
040    
041        private final JournalPersistenceAdapter peristenceAdapter;
042        ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
043        ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
044        
045        public static class TxOperation {
046            
047            static final byte ADD_OPERATION_TYPE       = 0;
048            static final byte REMOVE_OPERATION_TYPE    = 1;
049            static final byte ACK_OPERATION_TYPE       = 3;
050            
051            public byte operationType;
052            public JournalMessageStore store;
053            public Object data;
054            
055            public TxOperation(byte operationType, JournalMessageStore store, Object data) {
056                this.operationType=operationType;
057                this.store=store;
058                this.data=data;
059            }
060            
061        }
062        /**
063         * Operations
064         * @version $Revision: 1.3 $
065         */
066        public static class Tx {
067    
068            private final RecordLocation location;
069            private ArrayList operations = new ArrayList();
070    
071            public Tx(RecordLocation location) {
072                this.location=location;
073            }
074    
075            public void add(JournalMessageStore store, ActiveMQMessage msg) {
076                operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
077            }
078    
079            public void add(JournalMessageStore store, MessageAck ack) {
080                operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
081            }
082    
083            public void add(JournalTopicMessageStore store, JournalAck ack) {
084                operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
085            }
086            
087            public ActiveMQMessage[] getMessages() {
088                ArrayList list = new ArrayList();
089                for (Iterator iter = operations.iterator(); iter.hasNext();) {
090                    TxOperation op = (TxOperation) iter.next();
091                    if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
092                        list.add(op.data);
093                    }
094                }
095                ActiveMQMessage rc[] = new ActiveMQMessage[list.size()];
096                list.toArray(rc);
097                return rc;
098            }
099    
100            public MessageAck[] getAcks() {
101                ArrayList list = new ArrayList();
102                for (Iterator iter = operations.iterator(); iter.hasNext();) {
103                    TxOperation op = (TxOperation) iter.next();
104                    if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
105                        list.add(op.data);
106                    }
107                }
108                MessageAck rc[] = new MessageAck[list.size()];
109                list.toArray(rc);
110                return rc;
111            }
112    
113            public ArrayList getOperations() {
114                return operations;
115            }
116    
117        }
118    
119        public interface AddMessageCommand {
120            ActiveMQMessage getMessage();
121    
122            void run() throws IOException;
123        }
124    
125        public interface RemoveMessageCommand {
126            MessageAck getMessageAck();
127    
128            void run() throws IOException;
129        }
130    
131        public JournalTransactionStore(JournalPersistenceAdapter adapter) {
132            this.peristenceAdapter = adapter;
133        }
134    
135        /**
136         * @throws XAException 
137         * @throws IOException
138         * @see org.activemq.store.TransactionStore#prepare(TransactionId)
139         */
140        public void prepare(Object txid) throws XAException {
141            Tx tx = (Tx) inflightTransactions.remove(txid);
142            if (tx == null)
143                return;
144            peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_PREPARE, txid, false), true);
145            preparedTransactions.put(txid, tx);
146        }
147        
148        /**
149         * @throws IOException
150         * @see org.activemq.store.TransactionStore#prepare(TransactionId)
151         */
152        public void replayPrepare(Object txid) throws IOException {
153            Tx tx = (Tx) inflightTransactions.remove(txid);
154            if (tx == null)
155                return;
156            preparedTransactions.put(txid, tx);
157        }
158    
159        public Tx getTx(Object txid, RecordLocation location) {
160            Tx tx = (Tx) inflightTransactions.get(txid);
161            if (tx == null) {
162                tx = new Tx(location);
163                inflightTransactions.put(txid, tx);
164            }
165            return tx;
166        }
167    
168        /**
169         * @throws XAException 
170         * @throws XAException
171         * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
172         */
173        public void commit(Object txid, boolean wasPrepared) throws XAException  {
174            Tx tx;
175            if (wasPrepared) {
176                tx = (Tx) preparedTransactions.remove(txid);
177            } else {
178                tx = (Tx) inflightTransactions.remove(txid);
179            }
180    
181            if (tx == null)
182                return;
183    
184            if (txid.getClass() == ActiveMQXid.class ) {
185                peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_COMMIT, txid, wasPrepared),
186                        true);
187            } else {
188                peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_COMMIT, txid, wasPrepared),
189                        true);
190            }
191        }
192    
193        /**
194         * @throws XAException
195         * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
196         */
197        public Tx replayCommit(Object txid, boolean wasPrepared) throws IOException {
198            if (wasPrepared) {
199                return (Tx) preparedTransactions.remove(txid);
200            } else {
201                return (Tx) inflightTransactions.remove(txid);
202            }
203        }
204    
205        /**
206         * @throws XAException 
207         * @throws IOException
208         * @see org.activemq.store.TransactionStore#rollback(TransactionId)
209         */
210        public void rollback(Object txid) throws XAException {
211    
212            Tx tx = (Tx) inflightTransactions.remove(txid);
213            if (tx != null)
214                tx = (Tx) preparedTransactions.remove(txid);
215    
216            if (tx != null) {
217                if (txid.getClass() == ActiveMQXid.class ) {
218                    peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_ROLLBACK, txid, false),
219                            true);
220                } else {
221                    peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_ROLLBACK, txid, false),
222                            true);
223                }
224            }
225    
226        }
227    
228        /**
229         * @throws IOException
230         * @see org.activemq.store.TransactionStore#rollback(TransactionId)
231         */
232        public void replayRollback(Object txid) throws IOException {
233            Tx tx = (Tx) inflightTransactions.remove(txid);
234            if (tx != null)
235                tx = (Tx) preparedTransactions.remove(txid);
236        }
237            
238        synchronized public void recover(RecoveryListener listener) throws XAException {
239            // All the inflight transactions get rolled back..
240            inflightTransactions.clear();
241            try {
242                for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
243                    Object txid = (Object) iter.next();
244                    Tx tx = (Tx) preparedTransactions.get(txid);
245                    try {
246                        listener.recover((ActiveMQXid) txid,tx.getMessages(), tx.getAcks());
247                    } catch (JMSException e) {
248                        throw (XAException)new XAException().initCause(e);
249                    }
250                }
251            } finally {
252            }
253        }
254    
255        /**
256         * @param message
257         * @throws IOException
258         */
259        void addMessage(JournalMessageStore store, ActiveMQMessage message, RecordLocation location) {
260            Tx tx = getTx(message.getTransactionId(), location);
261            tx.add(store, message);
262        }
263    
264        /**
265         * @param ack
266         * @throws IOException
267         */
268        public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) {
269            Tx tx = getTx(ack.getTransactionId(), location);
270            tx.add(store, ack);
271        }
272        
273        
274        public void acknowledge(JournalTopicMessageStore store, JournalAck ack, RecordLocation location) {
275            Tx tx = getTx(ack.getTransactionId(), location);
276            tx.add(store, ack);
277        }
278    
279    
280        public RecordLocation checkpoint() throws IOException {
281            
282            // Nothing really to checkpoint.. since, we don't
283            // checkpoint tx operations in to long term store until they are committed.
284    
285            // But we keep track of the first location of an operation
286            // that was associated with an active tx. The journal can not
287            // roll over active tx records.        
288            RecordLocation rc = null;
289            for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
290                Tx tx = (Tx) iter.next();
291                RecordLocation location = tx.location;
292                if (rc == null || rc.compareTo(location) < 0) {
293                    rc = location;
294                }
295            }
296            for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
297                Tx tx = (Tx) iter.next();
298                RecordLocation location = tx.location;
299                if (rc == null || rc.compareTo(location) < 0) {
300                    rc = location;
301                }
302            }
303            return rc;
304        }
305    
306        public void start() throws JMSException {
307        }
308    
309        public void stop() throws JMSException {
310        }
311    
312    
313    }