001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.io.util;
020    
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.List;
024    
025    import javax.jms.JMSException;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.activemq.service.QueueListEntry;
030    import org.activemq.service.impl.DefaultQueueList;
031    
032    /**
033     * @author Ramzi Saba
034     *
035     * A prioritized version of the MemoryBoundedQueue supporting the 10 JMS priority levels
036     * 0-9, 0 being the lowest and 9 being the highest.
037     *
038     * @version $Revision: 1.1.1.1 $
039     */
040    public class MemoryBoundedPrioritizedQueue extends MemoryBoundedQueue {
041        
042        private static final Log log = LogFactory.getLog(MemoryBoundedPrioritizedQueue.class);    
043        private static final int DEFAULT_PRIORITY = 4;
044        private final DefaultQueueList[] prioritizedPackets = new DefaultQueueList[10]; // array of 10 prioritized queues
045    
046        /**
047         * Constructor
048         *
049         * @param name
050         * @param manager
051         * @param name 
052         */
053        public MemoryBoundedPrioritizedQueue(MemoryBoundedQueueManager manager, String name) {
054            super(manager, name);
055            for (int i=0; i<10; ++i) {
056                    this.prioritizedPackets[i] = new DefaultQueueList();
057            }
058        }
059    
060        /**
061         * @return the number of items held by this queue
062         */
063        public int size() {
064            //return internalList.size();
065            int size=0;
066            for (int j=0; j<10; ++j) {
067                    size += prioritizedPackets[j].size();
068            }
069            return size;
070        }
071    
072        /**
073         * Enqueue a MemoryManageable without checking memory usage limits
074         *
075         * @param packet
076         */
077        public void enqueueNoBlock(MemoryManageable packet) {
078            if (!closed) {
079                //internalList.add(packet);
080                    prioritizedPackets[getPacketPriority(packet)].add(packet);
081                incrementMemoryUsed(packet);
082                synchronized (outLock) {
083                    outLock.notify();
084                }
085            }
086        }
087    
088        /**
089         * Enqueue a packet to the head of the queue with total disregard for memory constraints
090         *
091         * @param packet
092         */
093        public final void enqueueFirstNoBlock(MemoryManageable packet) {
094            if (!closed) {
095                //internalList.addFirst(packet);
096                    prioritizedPackets[getPacketPriority(packet)].addFirst(packet);
097                incrementMemoryUsed(packet);
098                synchronized (outLock) {
099                    outLock.notify();
100                }
101            }
102        }
103    
104        /**
105         * Enqueue an array of packets to the head of the queue with total disregard for memory constraints
106         *
107         * @param packets
108         */
109        public void enqueueAllFirstNoBlock(List packets) {
110            if (!closed) {
111                //internalList.addAllFirst(packets);
112                Iterator iterator = packets.iterator();
113                for (Iterator iter = packets.iterator(); iter.hasNext();) {
114                    MemoryManageable packet = (MemoryManageable) iter.next();
115                    prioritizedPackets[getPacketPriority(packet)].addFirst(packet);
116                    incrementMemoryUsed(packet);
117                }
118                synchronized (outLock) {
119                    outLock.notify();
120                }
121            }
122        }
123    
124        /**
125         * @return the first dequeued MemoryManageable or blocks until one is available
126         * @throws InterruptedException
127         */
128        public MemoryManageable dequeue() throws InterruptedException {
129            MemoryManageable result = null;
130            synchronized (outLock) {
131                //while (internalList.isEmpty() && !closed) {
132                while (isEmpty() && !closed) {
133                    outLock.wait(WAIT_TIMEOUT);
134                }
135                result = dequeueNoWait();
136            }
137            return result;
138        }
139    
140        /**
141         * dequeues a MemoryManageable from the head of the queue
142         *
143         * @return the MemoryManageable at the head of the queue or null, if none is available
144         * @throws InterruptedException
145         */
146        public MemoryManageable dequeueNoWait() throws InterruptedException {
147            MemoryManageable packet = null;
148            synchronized (outLock) {
149                while (stopped && !closed) {
150                    outLock.wait(WAIT_TIMEOUT);
151                }
152            }
153            //packet = (MemoryManageable) internalList.removeFirst();
154            for (int i=9; i>=0; --i) {
155                    packet = (MemoryManageable) prioritizedPackets[i].removeFirst();
156                    if (packet != null) break;
157            }
158            decrementMemoryUsed(packet);
159            if (packet != null) {
160                synchronized (inLock) {
161                    inLock.notify();
162                }
163            }
164            return packet;
165        }
166    
167        /**
168         * Remove a packet from the queue
169         *
170         * @param packet
171         * @return true if the packet was found
172         */
173        public boolean remove(MemoryManageable packet) {
174            boolean result = false;
175            //if (!internalList.isEmpty()) {
176            if (!isEmpty()) {
177                //result = internalList.remove(packet);
178                    result = prioritizedPackets[getPacketPriority(packet)].remove(packet);
179            }
180            if (result) {
181                decrementMemoryUsed(packet);
182            }
183            synchronized (inLock) {
184                inLock.notify();
185            }
186            return result;
187        }
188    
189        /**
190         * Remove a MemoryManageable by it's id
191         *
192         * @param id
193         * @return
194         */
195        public MemoryManageable remove(Object id) {
196            MemoryManageable result = null;
197            for (int i=0; i<10; ++i) {
198                    //QueueListEntry entry = internalList.getFirstEntry();
199                    QueueListEntry entry = prioritizedPackets[i].getFirstEntry();
200                    try {
201                        while (entry != null) {
202                            MemoryManageable p = (MemoryManageable) entry.getElement();
203                            if (p.getMemoryId().equals(id)) {
204                                result = p;
205                                remove(p);
206                                break;
207                            }
208                            //entry = internalList.getNextEntry(entry);
209                            entry = prioritizedPackets[i].getNextEntry(entry);
210                        }
211                    }
212                    catch (JMSException jmsEx) {
213                        jmsEx.printStackTrace();
214                    }
215            }
216            synchronized (inLock) {
217                inLock.notify();
218            }
219            return result;
220        }
221    
222        /**
223         * remove any MemoryManageables in the queue
224         */
225        public void clear() {
226            //while (!internalList.isEmpty()) {
227            for (int i=0; i<10; ++i) {
228                    while (!prioritizedPackets[i].isEmpty()) {
229                        //MemoryManageable packet = (MemoryManageable) internalList.removeFirst();
230                            MemoryManageable packet = (MemoryManageable) prioritizedPackets[i].removeFirst();
231                        decrementMemoryUsed(packet);
232                    }
233            }
234            synchronized (inLock) {
235                inLock.notifyAll();
236            }
237        }
238    
239        /**
240         * @return true if the queue is empty
241         */
242        public boolean isEmpty() {
243            //return internalList.isEmpty();
244            for (int i=0; i<10; ++i) {
245                    if (!prioritizedPackets[i].isEmpty()) return false;
246            }
247            return true;
248        }
249    
250        /**
251         * retrieve a MemoryManageable at an indexed position in the queue
252         *
253         * @param index
254         * @return
255         */
256        public MemoryManageable get(int index) {
257            //return (MemoryManageable) internalList.get(index);
258            throw new UnsupportedOperationException("Cannot invoke this method on a MemoryBoundedPrioritizedQueue instance");
259        }
260    
261        /**
262         * Retrieve a shallow copy of the contents as a list
263         *
264         * @return a list containing the bounded queue contents
265         */
266        public List getContents() {
267            //Object[] array = internalList.toArray();
268            List list = new ArrayList();
269            for (int j=9; j>=0; --j) {
270                    Object[] array = prioritizedPackets[j].toArray();
271                    for (int i = 0; i < array.length; i++) {
272                        list.add(array[i]);
273                    }
274            }
275            return list;
276        }
277    
278        private int getPacketPriority(MemoryManageable packet) {
279            int priority=DEFAULT_PRIORITY;
280            if (packet.getPriority()>=0 || packet.getPriority()<=9) {
281                    priority = packet.getPriority();
282            }
283            return priority;
284        }
285    }