001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * Copyright 2004 Protique Ltd
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    package org.activemq.store.journal;
020    
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.HashSet;
024    import java.util.Iterator;
025    import java.util.LinkedHashMap;
026    
027    import javax.jms.JMSException;
028    
029    import org.activeio.journal.RecordLocation;
030    import org.activemq.message.ActiveMQMessage;
031    import org.activemq.message.MessageAck;
032    import org.activemq.service.MessageIdentity;
033    import org.activemq.service.Transaction;
034    import org.activemq.service.TransactionManager;
035    import org.activemq.service.TransactionTask;
036    import org.activemq.store.MessageStore;
037    import org.activemq.store.RecoveryListener;
038    import org.activemq.store.cache.CacheMessageStore;
039    import org.activemq.store.cache.CacheMessageStoreAware;
040    import org.activemq.util.Callback;
041    import org.activemq.util.TransactionTemplate;
042    import org.apache.commons.logging.Log;
043    import org.apache.commons.logging.LogFactory;
044    
045    /**
046     * A MessageStore that uses a Journal to store it's messages.
047     * 
048     * @version $Revision: 1.1 $
049     */
050    public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
051    
052        private static final Log log = LogFactory.getLog(JournalMessageStore.class);
053        protected final JournalPersistenceAdapter peristenceAdapter;
054        protected final MessageStore longTermStore;
055        protected final String destinationName;
056        protected final TransactionTemplate transactionTemplate;
057    
058        private LinkedHashMap addedMessageIds = new LinkedHashMap();
059        private ArrayList removedMessageLocations = new ArrayList();
060        protected HashSet inFlightTxLocations = new HashSet();   
061        protected RecordLocation lastLocation;
062    
063        /** A MessageStore that we can use to retreive messages quickly. */
064        private MessageStore cacheMessageStore = this;
065    
066        protected final JournalTransactionStore transactionStore;
067    
068        private LinkedHashMap cpAddedMessageIds;
069        
070        int removedFromJournal;
071    
072        public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName) {
073            this.peristenceAdapter = adapter;
074            this.transactionStore = this.peristenceAdapter.getTransactionStore();
075            this.longTermStore = checkpointStore;
076            this.destinationName = destinationName;
077            this.transactionTemplate = new TransactionTemplate(adapter);
078        }
079    
080        /**
081         * Not synchronized since the Journal has better throughput if you increase
082         * the number of conncurrent writes that it is doing.
083         */
084        public void addMessage(final ActiveMQMessage message) throws JMSException {
085            final boolean debug = log.isDebugEnabled();
086            final RecordLocation location = peristenceAdapter.writePacket(destinationName, message, message.isReceiptRequired());
087            if( !TransactionManager.isCurrentTransaction() ) {
088                if( debug ) 
089                    log.debug("Journalled message add: "+message.getJMSMessageID()+" at "+location);
090                 addMessage(message, location);
091            } else {
092                if( debug )
093                    log.debug("Journalled in flight message add: "+message.getJMSMessageID()+" at "+location);            
094                synchronized (this) {
095                    inFlightTxLocations.add(location);
096                }
097                final Transaction tx = TransactionManager.getContexTransaction(); 
098                transactionStore.addMessage(this, message, location);
099                tx.addPostCommitTask(new TransactionTask() {
100                    public void execute() throws Throwable {
101                        if( debug ) 
102                            log.debug("In flight message add commit: "+message.getJMSMessageID()+" at "+location);                        
103                        synchronized (JournalMessageStore.this) {
104                            inFlightTxLocations.remove(location);
105                            addMessage(message, location);
106                        }
107                    }
108                });
109                tx.addPostRollbackTask(new TransactionTask(){
110                    public void execute() throws Throwable {
111                        if( debug ) 
112                            log.debug("In flight message add rollback: "+message.getJMSMessageID()+" at "+location);                        
113                        // TODO Auto-generated method stub
114                        synchronized (JournalMessageStore.this) {
115                            inFlightTxLocations.remove(location);
116                        }
117                    }
118                });
119            }
120        }
121        
122        /**
123         * @param message
124         * @param location
125         */
126        private void addMessage(final ActiveMQMessage message, final RecordLocation location) {
127            synchronized (this) {
128                lastLocation=location;
129                MessageIdentity id = message.getJMSMessageIdentity();
130                addedMessageIds.put(id, location);
131            }
132        }
133    
134        /**
135         */
136        public void removeMessage(final MessageAck ack) throws JMSException {
137    
138            final boolean debug = log.isDebugEnabled();
139            final RecordLocation location = peristenceAdapter.writePacket(destinationName, ack, ack.isReceiptRequired());
140            if( !TransactionManager.isCurrentTransaction() ) {
141                if( debug ) 
142                    log.debug("Journalled message remove: "+ack.getMessageID()+" at "+location);            
143                removeMessage(ack, location);
144            } else {
145                if( debug ) 
146                    log.debug("Journalled in flight message remove: "+ack.getMessageID()+" at "+location);
147                
148                synchronized( this ) {
149                    inFlightTxLocations.add(location);
150                }
151                final Transaction tx = TransactionManager.getContexTransaction(); 
152                transactionStore.removeMessage(this, ack, location);
153                tx.addPostCommitTask(new TransactionTask(){
154                    public void execute() throws Throwable {
155                        if( debug ) 
156                            log.debug("In flight message remove commit: "+ack.getMessageID()+" at "+location);
157    
158                        synchronized (JournalMessageStore.this) {                        
159                            inFlightTxLocations.remove(location);
160                            removeMessage(ack, location);
161                        }
162                    }
163                });
164                tx.addPostRollbackTask(new TransactionTask(){
165                    public void execute() throws Throwable {
166                        // TODO Auto-generated method stub
167                        if( debug ) 
168                            log.debug("In flight message remove rollback: "+ack.getMessageID()+" at "+location);
169                        synchronized (JournalMessageStore.this) {
170                            inFlightTxLocations.remove(location);
171                        }
172                    }
173                });
174            }
175        }
176    
177        /**
178         * @param ack
179         * @param location
180         */
181        private void removeMessage(final MessageAck ack, final RecordLocation location) {
182            synchronized (this) {
183                lastLocation=location;
184                MessageIdentity id = ack.getMessageIdentity();
185                RecordLocation msgLocation = (RecordLocation) addedMessageIds.remove(id);
186                if (msgLocation == null) {
187                    removedMessageLocations.add(ack);
188                } else {
189                    removedFromJournal++;
190                }
191            }
192        }
193    
194        /**
195         * @return
196         * @throws JMSException
197         */
198        public RecordLocation checkpoint() throws JMSException {
199    
200            RecordLocation rc;
201            final ArrayList cpRemovedMessageLocations;
202            final ArrayList cpActiveJournalLocations;
203    
204            // swap out the message hash maps..
205            synchronized (this) {
206                cpAddedMessageIds = this.addedMessageIds;
207                cpRemovedMessageLocations = this.removedMessageLocations;
208    
209                this.inFlightTxLocations.removeAll(this.removedMessageLocations);
210                this.inFlightTxLocations.removeAll(this.addedMessageIds.values());            
211                cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
212                
213                this.addedMessageIds = new LinkedHashMap();
214                this.removedMessageLocations = new ArrayList();            
215                log.debug("removedFromJournal="+removedFromJournal);
216                removedFromJournal=0;
217            }
218            
219            final boolean debug = log.isDebugEnabled();
220            if( debug ) 
221                log.debug("Checkpoint: "+destinationName);
222            
223            
224            final int messagesAdded[]=new int[]{0};
225            final int messagesRemoved[]=new int[]{0};
226    
227            transactionTemplate.run(new Callback() {
228                public void execute() throws Throwable {
229    
230                    // Checkpoint the added messages.
231                    Iterator iterator = cpAddedMessageIds.keySet().iterator();
232                    while (iterator.hasNext()) {
233                        MessageIdentity identity = (MessageIdentity) iterator.next();
234                        if( debug ) 
235                            log.debug("Adding: "+identity.getMessageID());
236                        ActiveMQMessage msg = getCacheMessage(identity);
237                        // Pull it out of the journal if we have to.
238                        if (msg == null) {
239                            RecordLocation location = (RecordLocation) cpAddedMessageIds.get(identity);
240                            msg = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation) location);
241                        }
242                        if( msg != null ) {
243                            try {
244                                longTermStore.addMessage(msg);
245                                messagesAdded[0]++;
246                            } catch (Throwable e) {
247                                log.warn("Message could not be added to long term store: " + e.getMessage(), e);
248                            }
249                        } else {
250                            log.warn("Journal could not reload message: " + identity);                        
251                        }
252                    }
253    
254                    // Checkpoint the removed messages.
255                    iterator = cpRemovedMessageLocations.iterator();
256                    while (iterator.hasNext()) {
257                        try {
258                            MessageAck ack = (MessageAck) iterator.next();
259                            if( debug ) 
260                                log.debug("Removing: "+ack.getMessageID());
261                            longTermStore.removeMessage(ack);
262                            messagesRemoved[0]++;
263                        } catch (Throwable e) {
264                            log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
265                        }
266                    }
267                }
268    
269            });
270    
271            log.debug("Added "+messagesAdded[0]+" message(s) and removed "+messagesRemoved[0]+" message(s). removedFromJournal="+removedFromJournal);
272            synchronized (this) {
273                cpAddedMessageIds = null;
274            }
275            
276            Collections.sort(cpActiveJournalLocations);
277            if( debug ) 
278                log.debug("In flight journal locations: "+cpActiveJournalLocations);
279            
280            if( cpActiveJournalLocations.size() > 0 ) {
281                return (RecordLocation) cpActiveJournalLocations.get(0);
282            } else {
283                return lastLocation;
284            }
285        }
286    
287        private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
288            return cacheMessageStore.getMessage(identity);
289        }
290    
291        /**
292         * 
293         */
294        public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
295            ActiveMQMessage answer = null;
296    
297            Object location;
298            synchronized (this) {
299                location = addedMessageIds.get(identity);
300                if( location==null && cpAddedMessageIds!=null )
301                    location = cpAddedMessageIds.get(identity);
302            }
303            
304            // Do we have a still have it in the journal?
305            if (location != null ) {
306                try {
307                    answer = (ActiveMQMessage)peristenceAdapter.readPacket((RecordLocation) location);
308                    if (answer != null)
309                        return answer;
310                } catch (Throwable e) {
311                    // We could have had an async checkpoint and thus we cannot read that location anymore,
312                    // but now the message should be in the long term store.
313                }
314            }
315    
316            // If all else fails try the long term message store.
317            return longTermStore.getMessage(identity);
318        }
319    
320        /**
321         * Replays the checkpointStore first as those messages are the oldest ones,
322         * then messages are replayed from the transaction log and then the cache is
323         * updated.
324         * 
325         * @param listener
326         * @throws JMSException
327         */
328        public void recover(final RecoveryListener listener) throws JMSException {
329            peristenceAdapter.checkpoint(true);
330            longTermStore.recover(listener);
331        }
332    
333        public void start() throws JMSException {
334            longTermStore.start();
335        }
336    
337        public void stop() throws JMSException {
338            longTermStore.stop();
339        }
340    
341        /**
342         * @return Returns the longTermStore.
343         */
344        public MessageStore getLongTermMessageStore() {
345            return longTermStore;
346        }
347    
348        /**
349         * @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore)
350         */
351        public void setCacheMessageStore(CacheMessageStore store) {
352            cacheMessageStore = store;
353            // Propagate the setCacheMessageStore method call to the longTermStore
354            // if possible.
355            if (longTermStore instanceof CacheMessageStoreAware) {
356                ((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store);
357            }
358        }
359    
360        /**
361         * @see org.activemq.store.MessageStore#removeAllMessages()
362         */
363        public void removeAllMessages() throws JMSException {
364            peristenceAdapter.checkpoint(true);
365            longTermStore.removeAllMessages();
366        }
367    
368        public void replayAddMessage(ActiveMQMessage msg) {
369            try {
370                // Only add the message if it has not already been added.
371                ActiveMQMessage t = longTermStore.getMessage(msg.getJMSMessageIdentity());
372                if( t==null ) {
373                    longTermStore.addMessage(msg);
374                }
375            }
376            catch (Throwable e) {
377                log.debug("Could not replay add for message '" + msg.getJMSMessageIdentity().getMessageID() + "'.  Message may have already been added. reason: " + e);
378            }
379        }
380    
381        public void replayRemoveMessage(MessageAck ack) {
382            try {
383                // Only remove the message if it has not already been removed.
384                ActiveMQMessage t = longTermStore.getMessage(ack.getMessageIdentity());
385                if( t!=null ) {
386                    longTermStore.removeMessage(ack);
387                }
388            }
389            catch (Throwable e) {
390                log.debug("Could not replay acknowledge for message '" + ack.getMessageID() + "'.  Message may have already been acknowledged. reason: " + e);
391            }
392        }
393    
394    }