001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * Copyright 2004 Hiram Chirino 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 020 package org.activemq.io.util; 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 import java.util.List; 024 025 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 026 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 027 028 /** 029 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active 030 * MemoryBoundedQueues created by this instance stays within the memory usage bounds set. 031 * 032 * @version $Revision: 1.1.1.1 $ 033 */ 034 public class MemoryBoundedQueueManager { 035 036 private final ConcurrentHashMap activeQueues = new ConcurrentHashMap(); 037 private final MemoryBoundedObjectManager memoryManager; 038 private final SynchronizedBoolean memoryLimitEnforced = new SynchronizedBoolean(true); 039 040 /** 041 * @param name 042 * @param maxSize 043 */ 044 public MemoryBoundedQueueManager(MemoryBoundedObjectManager memoryManager) { 045 this.memoryManager = memoryManager; 046 } 047 048 /** 049 * retrieve a named MemoryBoundedQueue or creates one if not found 050 * 051 * @param name 052 * @return an named instance of a MemoryBoundedQueue 053 */ 054 public MemoryBoundedQueue getMemoryBoundedQueue(String name) { 055 MemoryBoundedQueue result = null; 056 synchronized (activeQueues) { 057 result = (MemoryBoundedQueue) activeQueues.get(name); 058 if (result == null) { 059 if (memoryManager.isSupportJMSPriority()) 060 result = new MemoryBoundedPrioritizedQueue(this, name); 061 else 062 result = new MemoryBoundedQueue(this, name); 063 activeQueues.put(name, result); 064 } 065 } 066 return result; 067 } 068 069 /** 070 * close this queue manager and all associated MemoryBoundedQueues 071 */ 072 public void close() { 073 memoryManager.close(); 074 } 075 076 /** 077 * @return Returns the memoryManager. 078 */ 079 public MemoryBoundedObjectManager getMemoryManager() { 080 return memoryManager; 081 } 082 083 public int getCurrentCapacity() { 084 return memoryManager.getCurrentCapacity(); 085 } 086 087 public void add(MemoryBoundedQueue queue) { 088 memoryManager.add(queue); 089 } 090 091 public void remove(MemoryBoundedQueue queue) { 092 memoryManager.remove(queue); 093 activeQueues.remove(queue.getName()); 094 } 095 096 public boolean isFull() { 097 return memoryManager.isFull(); 098 } 099 100 public void incrementMemoryUsed(int size) { 101 memoryManager.incrementMemoryUsed(size); 102 } 103 104 public void decrementMemoryUsed(int size) { 105 memoryManager.decrementMemoryUsed(size); 106 } 107 108 public List getMemoryBoundedQueues(){ 109 return new ArrayList(activeQueues.values()); 110 } 111 112 public String dumpContents(){ 113 String result = "Memory = " + memoryManager.getTotalMemoryUsedSize() + " , capacity = " + memoryManager.getCurrentCapacity() + "\n"; 114 for (Iterator i = activeQueues.values().iterator(); i.hasNext(); ){ 115 MemoryBoundedQueue q = (MemoryBoundedQueue)i.next(); 116 result += "\t" + q.getName() + " enqueued = " + q.getContents().size() + " memory = " + q.getLocalMemoryUsedByThisQueue() + "\n"; 117 } 118 result += "\n\n"; 119 return result; 120 } 121 122 public void setMemoryLimitEnforced(boolean enable) { 123 memoryLimitEnforced.set(enable); 124 } 125 126 public boolean isMemoryLimitEnforced() { 127 return memoryLimitEnforced.get(); 128 } 129 }