001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * Copyright 2004 Hiram Chirino
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    
020    package org.activemq.io.util;
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.List;
024    
025    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
026    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
027    
028    /**
029     * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active
030     * MemoryBoundedQueues created by this instance stays within the memory usage bounds set.
031     * 
032     * @version $Revision: 1.1.1.1 $
033     */
034    public class MemoryBoundedQueueManager {
035        
036        private final ConcurrentHashMap activeQueues = new ConcurrentHashMap();
037        private final MemoryBoundedObjectManager memoryManager;
038        private final SynchronizedBoolean memoryLimitEnforced = new SynchronizedBoolean(true);
039        
040        /**
041         * @param name
042         * @param maxSize
043         */
044        public MemoryBoundedQueueManager(MemoryBoundedObjectManager memoryManager) {
045            this.memoryManager = memoryManager;        
046        }
047    
048        /**
049         * retrieve a named MemoryBoundedQueue or creates one if not found
050         * 
051         * @param name
052         * @return an named instance of a MemoryBoundedQueue
053         */
054        public MemoryBoundedQueue getMemoryBoundedQueue(String name) {
055            MemoryBoundedQueue result = null;
056            synchronized (activeQueues) {
057                result = (MemoryBoundedQueue) activeQueues.get(name);
058                if (result == null) {
059                    if (memoryManager.isSupportJMSPriority())
060                        result = new MemoryBoundedPrioritizedQueue(this, name);
061                    else
062                        result = new MemoryBoundedQueue(this, name);
063                    activeQueues.put(name, result);
064                }
065            }
066            return result;
067        }
068    
069        /**
070         * close this queue manager and all associated MemoryBoundedQueues
071         */
072        public void close() {
073            memoryManager.close();
074        }
075    
076        /**
077         * @return Returns the memoryManager.
078         */
079        public MemoryBoundedObjectManager getMemoryManager() {
080            return memoryManager;
081        }
082    
083        public int getCurrentCapacity() {
084            return memoryManager.getCurrentCapacity();
085        }
086    
087        public void add(MemoryBoundedQueue queue) {
088            memoryManager.add(queue);
089        }
090    
091        public void remove(MemoryBoundedQueue queue) {
092            memoryManager.remove(queue);
093            activeQueues.remove(queue.getName());
094        }
095    
096        public boolean isFull() {
097            return memoryManager.isFull();
098        }
099    
100        public void incrementMemoryUsed(int size) {
101            memoryManager.incrementMemoryUsed(size);
102        }
103    
104        public void decrementMemoryUsed(int size) {
105            memoryManager.decrementMemoryUsed(size);
106        }    
107        
108        public List getMemoryBoundedQueues(){
109            return new ArrayList(activeQueues.values());
110        }
111        
112        public String dumpContents(){
113            String result = "Memory = " + memoryManager.getTotalMemoryUsedSize() + " , capacity = " + memoryManager.getCurrentCapacity() + "\n";
114            for (Iterator i = activeQueues.values().iterator(); i.hasNext(); ){
115                MemoryBoundedQueue q = (MemoryBoundedQueue)i.next();
116                result += "\t" + q.getName() + " enqueued = " + q.getContents().size() + " memory = " + q.getLocalMemoryUsedByThisQueue() + "\n";
117            }
118            result += "\n\n";
119            return result;
120        }
121        
122        public void setMemoryLimitEnforced(boolean enable) {
123            memoryLimitEnforced.set(enable);
124        }
125        
126        public boolean isMemoryLimitEnforced() {
127            return memoryLimitEnforced.get();
128        }
129    }