001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.vm;
019    
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    
023    import javax.jms.JMSException;
024    import javax.transaction.xa.XAException;
025    
026    import org.activemq.message.ActiveMQMessage;
027    import org.activemq.message.ActiveMQXid;
028    import org.activemq.message.MessageAck;
029    import org.activemq.store.MessageStore;
030    import org.activemq.store.ProxyMessageStore;
031    import org.activemq.store.ProxyTopicMessageStore;
032    import org.activemq.store.TopicMessageStore;
033    import org.activemq.store.TransactionStore;
034    
035    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
036    
037    /**
038     * Provides a TransactionStore implementation that can create transaction aware
039     * MessageStore objects from non transaction aware MessageStore objects.
040     * 
041     * @version $Revision: 1.1.1.1 $
042     */
043    public class VMTransactionStore implements TransactionStore {
044    
045        ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
046    
047        ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
048    
049        private boolean doingRecover;
050    
051        public static class Tx {
052            private ArrayList messages = new ArrayList();
053    
054            private ArrayList acks = new ArrayList();
055    
056            public void add(AddMessageCommand msg) {
057                messages.add(msg);
058            }
059    
060            public void add(RemoveMessageCommand ack) {
061                acks.add(ack);
062            }
063    
064            public ActiveMQMessage[] getMessages() {
065                ActiveMQMessage rc[] = new ActiveMQMessage[messages.size()];
066                int count=0;
067                for (Iterator iter = messages.iterator(); iter.hasNext();) {
068                    AddMessageCommand cmd = (AddMessageCommand) iter.next();
069                    rc[count++] = cmd.getMessage(); 
070                }
071                return rc;
072            }
073    
074            public MessageAck[] getAcks() {
075                MessageAck rc[] = new MessageAck[acks.size()];
076                int count=0;
077                for (Iterator iter = acks.iterator(); iter.hasNext();) {
078                    RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
079                    rc[count++] = cmd.getMessageAck(); 
080                }
081                return rc;
082            }
083    
084            /**
085             * @throws JMSException
086             */
087            public void commit() throws XAException {
088                try {
089                        // Do all the message adds.            
090                        for (Iterator iter = messages.iterator(); iter.hasNext();) {
091                            AddMessageCommand cmd = (AddMessageCommand) iter.next();
092                            cmd.run();
093                        }
094                        // And removes..
095                        for (Iterator iter = acks.iterator(); iter.hasNext();) {
096                            RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
097                            cmd.run();
098                        }
099                } catch ( JMSException e) {
100                    throw (XAException)new XAException(XAException.XAER_RMFAIL).initCause(e);
101                }
102            }
103        }
104    
105        public interface AddMessageCommand {
106            ActiveMQMessage getMessage();
107            void run() throws JMSException;
108        }
109    
110        public interface RemoveMessageCommand {
111            MessageAck getMessageAck();
112            void run() throws JMSException;
113        }
114    
115        public MessageStore proxy(MessageStore messageStore) {
116                return new ProxyMessageStore(messageStore) {
117                    public void addMessage(final ActiveMQMessage message) throws JMSException {
118                        VMTransactionStore.this.addMessage(getDelegate(), message);
119                    }
120            
121                    public void removeMessage(final MessageAck ack) throws JMSException {
122                        VMTransactionStore.this.removeMessage(getDelegate(), ack);
123                    }
124                };
125        }
126    
127        public TopicMessageStore proxy(TopicMessageStore messageStore) {
128                return new ProxyTopicMessageStore(messageStore) {
129                    public void addMessage(final ActiveMQMessage message) throws JMSException {
130                        VMTransactionStore.this.addMessage(getDelegate(), message);
131                    }
132                    public void removeMessage(final MessageAck ack) throws JMSException {
133                        VMTransactionStore.this.removeMessage(getDelegate(), ack);
134                    }
135                };
136        }
137    
138        /**
139         * @see org.activemq.store.TransactionStore#prepare(org.activemq.service.Transaction)
140         */
141        public void prepare(Object txid) {
142            Tx tx = (Tx) inflightTransactions.remove(txid);
143            if (tx == null)
144                return;
145            preparedTransactions.put(txid, tx);
146        }
147    
148        public Tx getTx(Object txid) {
149            Tx tx = (Tx) inflightTransactions.get(txid);
150            if (tx == null) {
151                tx = new Tx();
152                inflightTransactions.put(txid, tx);
153            }
154            return tx;
155        }
156    
157        /**
158         * @throws XAException
159         * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
160         */
161        public void commit(Object txid, boolean wasPrepared) throws XAException {
162            
163            Tx tx;
164            if( wasPrepared ) {
165                tx = (Tx) preparedTransactions.remove(txid);
166            } else {
167                tx = (Tx) inflightTransactions.remove(txid);
168            }
169            
170            if( tx == null )
171                return;
172            tx.commit();
173            
174        }
175    
176        /**
177         * @see org.activemq.store.TransactionStore#rollback(org.activemq.service.Transaction)
178         */
179        public void rollback(Object txid) {
180            preparedTransactions.remove(txid);
181            inflightTransactions.remove(txid);
182        }
183    
184        public void start() throws JMSException {
185        }
186    
187        public void stop() throws JMSException {
188        }
189    
190        synchronized public void recover(RecoveryListener listener) throws XAException {
191    
192            // All the inflight transactions get rolled back..
193            inflightTransactions.clear();        
194            this.doingRecover = true;
195            try {
196                    for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
197                        Object txid = (Object) iter.next();
198                        try {
199                            Tx tx = (Tx) preparedTransactions.get(txid);
200                            listener.recover((ActiveMQXid) txid, tx.getMessages(), tx.getAcks());
201                        } catch (JMSException e) {
202                            throw (XAException) new XAException("Recovery of a transaction failed:").initCause(e);
203                        }
204                    }
205            } finally {
206                this.doingRecover = false;
207            }
208        }
209    
210        /**
211         * @param message
212         * @throws JMSException
213         */
214        void addMessage(final MessageStore destination, final ActiveMQMessage message) throws JMSException {
215            
216            if( doingRecover )
217                return;
218            
219            if (message.isPartOfTransaction()) {
220                Tx tx = getTx(message.getTransactionId());
221                tx.add(new AddMessageCommand() {
222                    public ActiveMQMessage getMessage() {
223                        return message;
224                    }
225                    public void run() throws JMSException {
226                        destination.addMessage(message);
227                    }
228                });
229            } else {
230                destination.addMessage(message);
231            }
232        }
233        
234        /**
235         * @param ack
236         * @throws JMSException
237         */
238        private void removeMessage(final MessageStore destination,final MessageAck ack) throws JMSException {
239            if( doingRecover )
240                return;
241    
242            if (ack.isPartOfTransaction()) {
243                Tx tx = getTx(ack.getTransactionId());
244                tx.add(new RemoveMessageCommand() {
245                    public MessageAck getMessageAck() {
246                        return ack;
247                    }
248                    public void run() throws JMSException {
249                        destination.removeMessage(ack);
250                    }
251                });
252            } else {
253                destination.removeMessage(ack);
254            }
255        }
256        
257    }