001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.io.util;
020    
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.List;
024    
025    import javax.jms.JMSException;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.activemq.service.QueueListEntry;
030    import org.activemq.service.impl.DefaultQueueList;
031    
032    import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
033    
034    /**
035     * MemoryBoundedQueue is a queue bounded by memory usage for MemoryManageables
036     *
037     * @version $Revision: 1.1.1.1 $
038     */
039    public class MemoryBoundedQueue implements MemoryBoundedObject {
040        
041        private static final Log log = LogFactory.getLog(MemoryBoundedQueue.class);
042        
043        private static final int OBJECT_OVERHEAD = 50;
044        protected static final int WAIT_TIMEOUT = 100;
045    
046        private final MemoryBoundedQueueManager manager;
047        private final String name;
048    
049        protected final Object outLock = new Object();
050        protected final Object inLock = new Object();
051        private final DefaultQueueList internalList = new DefaultQueueList();
052        protected boolean stopped = false;
053        protected boolean closed = false;
054        private SynchronizedLong memoryUsedByThisQueue = new SynchronizedLong(0);
055    
056        /**
057         * Constructor
058         *
059         * @param name
060         * @param manager
061         * @param name 
062         */
063        public MemoryBoundedQueue(MemoryBoundedQueueManager manager, String name) {
064            this.manager = manager;
065            this.name = name;
066            this.manager.add(this);
067        }
068        /**
069         * @return a pretty print of this queue
070         */
071        public String toString() {
072            return "MemoryBoundedQueue{ size=" + size() + ", memory usage=" + memoryUsedByThisQueue + " }";
073        }
074    
075        /**
076         * @return the number of items held by this queue
077         */
078        public int size() {
079            return internalList.size();
080        }
081    
082        /**
083         * @return an aproximation the memory used by this queue
084         */
085        public long getLocalMemoryUsedByThisQueue() {
086            return memoryUsedByThisQueue.get();
087        }
088    
089        /**
090         * close and remove this queue from the MemoryBoundedQueueManager
091         */
092        public void close() {
093            try {
094                clear();
095                closed = true;
096                synchronized (outLock) {
097                    outLock.notifyAll();
098                }
099                synchronized (inLock) {
100                    inLock.notifyAll();
101                }
102            }
103            catch (Throwable e) {
104                e.printStackTrace();
105            }
106            finally {
107                manager.remove(this);
108            }
109        }
110    
111        /**
112         * Enqueue a MemoryManageable without checking memory usage limits
113         *
114         * @param packet
115         */
116        public void enqueueNoBlock(MemoryManageable packet) {
117            if (!closed) {
118                internalList.add(packet);
119                incrementMemoryUsed(packet);
120                synchronized (outLock) {
121                    outLock.notify();
122                }
123            }
124        }
125    
126        /**
127         * Enqueue a MemoryManageable to this queue
128         *
129         * @param packet
130         */
131        public void enqueue(MemoryManageable packet) {
132            if (!manager.isMemoryLimitEnforced() || !manager.isFull()) {
133                enqueueNoBlock(packet);
134            }
135            else {
136                synchronized (inLock) {
137                    try {
138                        while (manager.isFull() && !closed) {
139                            log.warn("Queue is full, waiting for it to be dequeued.");
140                            inLock.wait(WAIT_TIMEOUT);
141                        }
142                    }
143                    catch (InterruptedException ie) {
144                    }
145                }
146                enqueueNoBlock(packet);
147            }
148        }
149    
150        /**
151         * Enqueue a packet to the head of the queue with total disregard for memory constraints
152         *
153         * @param packet
154         */
155        public void enqueueFirstNoBlock(MemoryManageable packet) {
156            if (!closed) {
157                internalList.addFirst(packet);
158                incrementMemoryUsed(packet);
159                synchronized (outLock) {
160                    outLock.notify();
161                }
162            }
163        }
164    
165        /**
166         * Enqueue an array of packets to the head of the queue with total disregard for memory constraints
167         *
168         * @param packets
169         */
170        public void enqueueAllFirstNoBlock(List packets) {
171            if (!closed) {
172                internalList.addAllFirst(packets);
173                Iterator iterator = packets.iterator();
174                for (Iterator iter = packets.iterator(); iter.hasNext();) {
175                    MemoryManageable packet = (MemoryManageable) iter.next();
176                    incrementMemoryUsed(packet);
177                }
178                synchronized (outLock) {
179                    outLock.notify();
180                }
181            }
182        }
183    
184        /**
185         * Enqueue a MemoryManageable to the head of the queue
186         *
187         * @param packet
188         * @throws InterruptedException
189         */
190        public void enqueueFirst(MemoryManageable packet) throws InterruptedException {
191            if (!manager.isMemoryLimitEnforced() || !manager.isFull()) {
192                enqueueFirstNoBlock(packet);
193            }
194            else {
195                synchronized (inLock) {
196                    while (manager.isFull() && !closed) {
197                        inLock.wait(WAIT_TIMEOUT);
198                    }
199                }
200                enqueueFirstNoBlock(packet);
201            }
202        }
203    
204        /**
205         * @return the first dequeued MemoryManageable or blocks until one is available
206         * @throws InterruptedException
207         */
208        public MemoryManageable dequeue() throws InterruptedException {
209            MemoryManageable result = null;
210            synchronized (outLock) {
211                while (internalList.isEmpty() && !closed) {
212                    outLock.wait(WAIT_TIMEOUT);
213                }
214                result = dequeueNoWait();
215            }
216            return result;
217        }
218    
219        /**
220         * Dequeues a MemoryManageable from the head of the queue
221         *
222         * @param timeInMillis time to wait for a MemoryManageable to be available
223         * @return the first MemoryManageable or null if none available within <I>timeInMillis </I>
224         * @throws InterruptedException
225         */
226        public MemoryManageable dequeue(long timeInMillis) throws InterruptedException {
227            MemoryManageable result = null;
228            if (timeInMillis == 0) {
229                result = dequeue();
230            }
231            else {
232                synchronized (outLock) {
233                    // if timeInMillis is less than zero assume nowait
234                    long waitTime = timeInMillis;
235                    long start = (timeInMillis <= 0) ? 0 : System.currentTimeMillis();
236                    while (!closed) {
237                        result = dequeueNoWait();
238                        if (result != null || waitTime <= 0) {
239                            break;
240                        }
241                        else {
242                            outLock.wait(waitTime);
243                            waitTime = timeInMillis - (System.currentTimeMillis() - start);
244                        }
245                    }
246                }
247            }
248            return result;
249        }
250    
251        /**
252         * dequeues a MemoryManageable from the head of the queue
253         *
254         * @return the MemoryManageable at the head of the queue or null, if none is available
255         * @throws InterruptedException
256         */
257        public MemoryManageable dequeueNoWait() throws InterruptedException {
258            MemoryManageable packet = null;
259            synchronized (outLock) {
260                while (stopped && !closed) {
261                    outLock.wait(WAIT_TIMEOUT);
262                }
263            }
264            packet = (MemoryManageable) internalList.removeFirst();
265            decrementMemoryUsed(packet);
266            if (packet != null) {
267                synchronized (inLock) {
268                    inLock.notify();
269                }
270            }
271            return packet;
272        }
273    
274        /**
275         * @return true if the queue is enabled for dequeing (default = true)
276         */
277        public boolean isStarted() {
278            synchronized (outLock) {
279                return stopped == false;
280            }
281        }
282    
283        /**
284         * disable dequeueing
285         */
286        public void stop() {
287            synchronized (outLock) {
288                stopped = true;
289            }
290        }
291    
292        /**
293         * enable dequeueing
294         */
295        public void start() {
296            synchronized (outLock) {
297                stopped = false;
298                outLock.notifyAll();
299            }
300            synchronized (inLock) {
301                inLock.notifyAll();
302            }
303        }
304    
305        /**
306         * Remove a packet from the queue
307         *
308         * @param packet
309         * @return true if the packet was found
310         */
311        public boolean remove(MemoryManageable packet) {
312            boolean result = false;
313            if (!internalList.isEmpty()) {
314                result = internalList.remove(packet);
315            }
316            if (result) {
317                decrementMemoryUsed(packet);
318            }
319            synchronized (inLock) {
320                inLock.notify();
321            }
322            return result;
323        }
324    
325        /**
326         * Remove a MemoryManageable by it's id
327         *
328         * @param id
329         * @return
330         */
331        public MemoryManageable remove(Object id) {
332            MemoryManageable result = null;
333            QueueListEntry entry = internalList.getFirstEntry();
334            try {
335                while (entry != null) {
336                    MemoryManageable p = (MemoryManageable) entry.getElement();
337                    if (p.getMemoryId().equals(id)) {
338                        result = p;
339                        remove(p);
340                        break;
341                    }
342                    entry = internalList.getNextEntry(entry);
343                }
344            }
345            catch (JMSException jmsEx) {
346                jmsEx.printStackTrace();
347            }
348            synchronized (inLock) {
349                inLock.notify();
350            }
351            return result;
352        }
353    
354        /**
355         * remove any MemoryManageables in the queue
356         */
357        public void clear() {
358            while (!internalList.isEmpty()) {
359                MemoryManageable packet = (MemoryManageable) internalList.removeFirst();
360                decrementMemoryUsed(packet);
361            }
362            synchronized (inLock) {
363                inLock.notifyAll();
364            }
365        }
366    
367        /**
368         * @return true if the queue is empty
369         */
370        public boolean isEmpty() {
371            return internalList.isEmpty();
372        }
373    
374        /**
375         * retrieve a MemoryManageable at an indexed position in the queue
376         *
377         * @param index
378         * @return
379         */
380        public MemoryManageable get(int index) {
381            return (MemoryManageable) internalList.get(index);
382        }
383    
384        /**
385         * Retrieve a shallow copy of the contents as a list
386         *
387         * @return a list containing the bounded queue contents
388         */
389        public List getContents() {
390            Object[] array = internalList.toArray();
391            List list = new ArrayList();
392            for (int i = 0; i < array.length; i++) {
393                list.add(array[i]);
394            }
395            return list;
396        }
397    
398        protected void incrementMemoryUsed(MemoryManageable packet) {
399            if (packet != null) {
400                int size = OBJECT_OVERHEAD;
401                if (packet != null) {
402                    if (packet.incrementMemoryReferenceCount() == 1) {
403                        size += packet.getMemoryUsage();
404                    }
405                }    
406                memoryUsedByThisQueue.add(size);
407                manager.incrementMemoryUsed(size);
408            }
409        }
410    
411        protected void decrementMemoryUsed(MemoryManageable packet) {
412            if (packet != null) {
413                int size = OBJECT_OVERHEAD;
414                if (packet != null) {
415                    if ( packet.decrementMemoryReferenceCount() == 0) {
416                        size += packet.getMemoryUsage();
417                    }
418                }
419                            
420                memoryUsedByThisQueue.subtract(size);
421                manager.decrementMemoryUsed(size);
422            }
423        }
424        /**
425         * @return Returns the name.
426         */
427        public String getName() {
428            return name;
429        }
430    }