001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * Copyright 2004 Protique Ltd
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    package org.activemq.store.cache;
020    
021    import javax.jms.JMSException;
022    
023    import org.activemq.message.ActiveMQMessage;
024    import org.activemq.message.MessageAck;
025    import org.activemq.service.MessageIdentity;
026    import org.activemq.store.MessageStore;
027    import org.activemq.store.RecoveryListener;
028    
029    /**
030     * A MessageStore that uses an in memory cache to speed up getMessage() method calls.
031     * 
032     * @version $Revision: 1.1.1.1 $
033     */
034    public class CacheMessageStore implements MessageStore, CacheMessageStoreAware {
035            
036            private final MessageStore longTermStore;       
037            private final MessageCache cache;
038    
039            public CacheMessageStore(CachePersistenceAdapter adapter, MessageStore longTermStore, MessageCache cache) {
040                    this.longTermStore = longTermStore;
041                    this.cache = cache;
042                    
043                    // Make any downstream CacheMessageStoreAware objects aware of us.
044                    setCacheMessageStore(this);
045            }
046    
047            /**
048             * Add the meessage to the long term store and cache it.
049             */
050            public void addMessage(ActiveMQMessage message) throws JMSException {           
051                    longTermStore.addMessage(message);              
052                    cache.put(message.getJMSMessageID(), message);          
053            }
054            
055            /**
056             * Remove the meessage to the long term store and remove it from the cache.
057             */
058            public void removeMessage(MessageAck ack) throws JMSException {         
059                    longTermStore.removeMessage( ack);
060                    cache.remove(ack.getMessageID());               
061            }
062    
063            /**
064             * Return the message from the cache or go to the longTermStore if it is not 
065             * in there. 
066             */
067            public ActiveMQMessage getMessage(MessageIdentity identity)     throws JMSException {
068                    ActiveMQMessage answer=null;
069                    answer = cache.get(identity.getMessageID());
070                    if( answer!=null )
071                            return answer;
072                    
073                    answer = longTermStore.getMessage(identity);
074                    cache.put(identity.getMessageID(), answer);
075                    return answer;
076            }
077    
078            /**
079             * Replays the checkpointStore first as those messages are the oldest ones,
080             * then messages are replayed from the transaction log and then the cache is
081             * updated.
082             * 
083             * @param listener
084             * @throws JMSException
085             */
086            public synchronized void recover(final RecoveryListener listener)
087                            throws JMSException {
088                    longTermStore.recover(listener);
089            }
090    
091            public void start() throws JMSException {
092                    longTermStore.start();
093            }
094    
095            public void stop() throws JMSException {
096                    longTermStore.stop();
097             cache.close();
098            }
099    
100            /**
101             * @return Returns the longTermStore.
102             */
103            public MessageStore getLongTermStore() {
104                    return longTermStore;
105            }
106    
107            /**
108             * @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore)
109             */
110            public void setCacheMessageStore(CacheMessageStore store) {
111                    // Make any downstream CacheMessageStoreAware objects aware of us.
112                    // This cache implementation could be cached by another cache.
113                    if( longTermStore instanceof CacheMessageStoreAware ) {
114                            ((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store);
115                    }
116            }
117    
118        /**
119         * @see org.activemq.store.MessageStore#removeAllMessages()
120         */
121        public void removeAllMessages() throws JMSException {
122                    longTermStore.removeAllMessages();
123        }
124    
125            
126    }