001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.io.util;
019    
020    import java.util.HashMap;
021    import java.util.Iterator;
022    
023    import org.activemq.message.ActiveMQMessage;
024    import org.activemq.service.QueueListEntry;
025    import org.activemq.service.impl.DefaultQueueList;
026    import org.activemq.store.cache.MessageCache;
027    
028    /**
029     * A simple cache that stores messages in memory.  Cache entries are evicted 
030     * when the memoryManager starts to run short on memory (A LRU cache is used).
031     *
032     * @version $Revision: 1.1.1.1 $
033     */
034    public class MemoryBoundedMessageCache implements MessageCache, MemoryBoundedObject {
035    
036        private static final int OBJECT_OVERHEAD = 50;
037    
038        private final MemoryBoundedObjectManager memoryManager;
039        
040        /** msgId -> LRUNode */
041        private final HashMap messages = new HashMap();
042        /** Ordered list of messageIds recently used at the front */
043        private final DefaultQueueList lruList = new DefaultQueueList();
044    
045        private int memoryUsedByThisCache;
046        private float growthLimit = 0.75f;
047        private boolean closed;
048        
049        /** Used associate the a Message to it's QueueListEntry in the lruList */
050        private static class CacheNode {        
051            ActiveMQMessage message;
052            QueueListEntry entry;
053        }
054            
055            public MemoryBoundedMessageCache(MemoryBoundedObjectManager memoryManager) {
056            this.memoryManager = memoryManager;
057            this.memoryManager.add(this);
058            }
059            
060            /**
061             * Gets a message that was previously <code>put</code> into this object.   
062             * 
063             * @param msgid
064             * @return null if the message was not previously put or if the message has expired out of the cache.
065             */
066            synchronized public ActiveMQMessage get(String msgid) {
067            CacheNode rc  = (CacheNode) messages.get(msgid);
068            if( rc != null ) {
069                // Move to front (the recently used part of the list).
070                lruList.remove(rc.entry);
071                rc.entry = lruList.addFirst(msgid);
072                return rc.message;
073            }
074            return null;
075            }
076    
077            /**
078             * Puts a message into the cache.
079             * 
080             * @param messageID
081             * @param message
082             */
083            synchronized public void put(String messageID, ActiveMQMessage message) {
084            
085            // Drop old messages until there is space.
086            while( isFull() && !messages.isEmpty() ) {
087                removeOldest();
088            }
089            
090            if( !isFull() ) {
091                incrementMemoryUsed(message);
092                CacheNode newNode = new CacheNode();
093                newNode.message = message;
094                newNode.entry = lruList.addFirst(messageID);            
095                CacheNode oldNode = (CacheNode) messages.put(messageID, newNode);
096                if( oldNode !=null ) {
097                    lruList.remove(oldNode);
098                    decrementMemoryUsed(oldNode.message);
099                }        
100            }
101            }
102    
103        private void removeOldest() {
104            String messageID = (String) lruList.removeLast();
105            CacheNode node = (CacheNode) messages.remove(messageID);
106            decrementMemoryUsed(node.message);
107        }
108    
109        private boolean isFull() {
110            return memoryManager.getPercentFull() > growthLimit;
111        }
112    
113        /**
114             * Remvoes a message from the cache.
115             * 
116             * @param messageID
117             */
118            synchronized public void remove(String messageID) {
119            CacheNode node = (CacheNode) messages.remove(messageID);
120            if( node !=null ) {
121                lruList.remove(node.entry);
122                decrementMemoryUsed(node.message);
123            }        
124            }
125        
126        private void incrementMemoryUsed(ActiveMQMessage packet) {
127            if (packet != null) {
128                int size = OBJECT_OVERHEAD;
129                if (packet != null) {
130                    if (packet.incrementMemoryReferenceCount() == 1) {
131                        size += packet.getMemoryUsage();
132                    }
133                }
134                synchronized( this ) {
135                    memoryUsedByThisCache += size;
136                }
137                memoryManager.incrementMemoryUsed(size);
138            }
139        }
140    
141        private void decrementMemoryUsed(ActiveMQMessage packet) {
142            if (packet != null) {
143                int size = OBJECT_OVERHEAD;
144                if (packet != null) {
145                    if (packet.decrementMemoryReferenceCount() == 0) {
146                        size += packet.getMemoryUsage();
147                    }
148                }
149                            
150                synchronized( this ) {
151                    memoryUsedByThisCache -= size;
152                }
153                memoryManager.decrementMemoryUsed(size);
154            }
155        }
156    
157        /**
158         * @return returns the percentage of memory usage at which that cache will stop to grow.
159         */
160        public float getGrowthLimit() {
161            return growthLimit;
162        }
163        
164        /**
165         * @param growTillFence the percentage of memory usage at which that cache will stop to grow.
166         */
167        public void setGrowthLimit(float growTillFence) {
168            this.growthLimit = growTillFence;
169        }
170    
171        synchronized public void close() {
172            if( closed ) 
173                return;
174            closed=true;
175            
176            for (Iterator iter = messages.values().iterator(); iter.hasNext();) {
177                CacheNode node = (CacheNode) iter.next();
178                decrementMemoryUsed(node.message);
179            }
180            messages.clear();
181            lruList.clear();
182            
183            memoryManager.remove(this);
184        }
185    }