001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq;
020    
021    import java.util.Enumeration;
022    
023    import javax.jms.IllegalStateException;
024    import javax.jms.JMSException;
025    import javax.jms.Queue;
026    import javax.jms.QueueBrowser;
027    
028    import org.activemq.message.ActiveMQQueue;
029    
030    /**
031     * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
032     * queue without removing them.
033     * <p/>
034     * <P>
035     * The <CODE>getEnumeration</CODE> method returns a <CODE>
036     * java.util.Enumeration</CODE> that is used to scan the queue's messages. It
037     * may be an enumeration of the entire content of a queue, or it may contain
038     * only the messages matching a message selector.
039     * <p/>
040     * <P>
041     * Messages may be arriving and expiring while the scan is done. The JMS API
042     * does not require the content of an enumeration to be a static snapshot of
043     * queue content. Whether these changes are visible or not depends on the JMS
044     * provider.
045     * <p/>
046     * <P>
047     * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
048     * </CODE> or a <CODE>QueueSession</CODE>.
049     *
050     * @see javax.jms.Session#createBrowser
051     * @see javax.jms.QueueSession#createBrowser
052     * @see javax.jms.QueueBrowser
053     * @see javax.jms.QueueReceiver
054     */
055    
056    public class ActiveMQQueueBrowser implements
057            QueueBrowser, Enumeration {
058    
059        private final ActiveMQSession session;
060        private final ActiveMQQueue destination;
061        private final String selector;
062        private final int cnum;
063        
064        private ActiveMQMessageConsumer consumer;
065        private boolean closed;
066        
067        /**
068         * Constructor for an ActiveMQQueueBrowser - used internally
069         *
070         * @param theSession
071         * @param dest
072         * @param selector
073         * @param cnum
074         * @throws JMSException
075         */
076        protected ActiveMQQueueBrowser(ActiveMQSession session, ActiveMQQueue destination, String selector, int cnum) throws JMSException {
077            this.session = session;
078            this.destination = destination;
079            this.selector = selector;
080            this.cnum = cnum;        
081            consumer = createConsumer();
082        }
083    
084        /**
085         * @param session
086         * @param destination
087         * @param selector
088         * @param cnum
089         * @return
090         * @throws JMSException
091         */
092        private ActiveMQMessageConsumer createConsumer() throws JMSException {
093            return new ActiveMQMessageConsumer(session, destination, "", selector, cnum, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true);
094        }
095        
096        private void destroyConsumer() {
097            if( consumer == null )
098                return;
099            
100            try {
101                consumer.close();
102                consumer=null;
103            } catch (JMSException e) {
104                e.printStackTrace();
105            }
106        }
107    
108        /**
109         * Gets an enumeration for browsing the current queue messages in the order
110         * they would be received.
111         *
112         * @return an enumeration for browsing the messages
113         * @throws JMSException if the JMS provider fails to get the enumeration for this
114         *                      browser due to some internal error.
115         */
116    
117        public Enumeration getEnumeration() throws JMSException {
118            checkClosed();
119            
120            if( consumer==null )
121                consumer = createConsumer();
122            
123            //ok - started browsing - wait for inbound messages
124            if (consumer.messageQueue.size() == 0) {
125                try {
126                    Thread.sleep(1000);
127                }
128                catch (InterruptedException e) {
129                }
130            }
131            return this;
132        }
133    
134        private void checkClosed() throws IllegalStateException {
135            if (closed) {
136                throw new IllegalStateException("The Consumer is closed");
137            }
138        }
139    
140        /**
141         * @return true if more messages to process
142         */
143        public boolean hasMoreElements() {
144            if( consumer==null )
145                return false;
146            
147            boolean rc = consumer.messageQueue.size() > 0;
148            if( !rc ) {
149                destroyConsumer();
150            }
151            return rc;
152        }
153    
154    
155        /**
156         * @return the next message
157         */
158        public Object nextElement() {
159            if( consumer == null )
160                return null;
161            
162            Object answer = null;
163            try {
164                answer = consumer.receiveNoWait();
165                if( answer==null ) {
166                    destroyConsumer();
167                }
168            }
169            catch (JMSException e) {
170                e.printStackTrace();
171            }
172            return answer;
173        }
174        
175        public void close() throws JMSException {
176            destroyConsumer();
177            closed=true;
178        }
179    
180        /**
181         * Gets the queue associated with this queue browser.
182         *
183         * @return the queue
184         * @throws JMSException if the JMS provider fails to get the queue associated
185         *                      with this browser due to some internal error.
186         */
187    
188        public Queue getQueue() throws JMSException {
189            return destination;
190        }
191    
192    
193        public String getMessageSelector() throws JMSException {
194            return selector;
195        }
196    
197    
198    }