001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq; 020 021 import java.util.Enumeration; 022 023 import javax.jms.IllegalStateException; 024 import javax.jms.JMSException; 025 import javax.jms.Queue; 026 import javax.jms.QueueBrowser; 027 028 import org.activemq.message.ActiveMQQueue; 029 030 /** 031 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a 032 * queue without removing them. 033 * <p/> 034 * <P> 035 * The <CODE>getEnumeration</CODE> method returns a <CODE> 036 * java.util.Enumeration</CODE> that is used to scan the queue's messages. It 037 * may be an enumeration of the entire content of a queue, or it may contain 038 * only the messages matching a message selector. 039 * <p/> 040 * <P> 041 * Messages may be arriving and expiring while the scan is done. The JMS API 042 * does not require the content of an enumeration to be a static snapshot of 043 * queue content. Whether these changes are visible or not depends on the JMS 044 * provider. 045 * <p/> 046 * <P> 047 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session 048 * </CODE> or a <CODE>QueueSession</CODE>. 049 * 050 * @see javax.jms.Session#createBrowser 051 * @see javax.jms.QueueSession#createBrowser 052 * @see javax.jms.QueueBrowser 053 * @see javax.jms.QueueReceiver 054 */ 055 056 public class ActiveMQQueueBrowser implements 057 QueueBrowser, Enumeration { 058 059 private final ActiveMQSession session; 060 private final ActiveMQQueue destination; 061 private final String selector; 062 private final int cnum; 063 064 private ActiveMQMessageConsumer consumer; 065 private boolean closed; 066 067 /** 068 * Constructor for an ActiveMQQueueBrowser - used internally 069 * 070 * @param theSession 071 * @param dest 072 * @param selector 073 * @param cnum 074 * @throws JMSException 075 */ 076 protected ActiveMQQueueBrowser(ActiveMQSession session, ActiveMQQueue destination, String selector, int cnum) throws JMSException { 077 this.session = session; 078 this.destination = destination; 079 this.selector = selector; 080 this.cnum = cnum; 081 consumer = createConsumer(); 082 } 083 084 /** 085 * @param session 086 * @param destination 087 * @param selector 088 * @param cnum 089 * @return 090 * @throws JMSException 091 */ 092 private ActiveMQMessageConsumer createConsumer() throws JMSException { 093 return new ActiveMQMessageConsumer(session, destination, "", selector, cnum, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true); 094 } 095 096 private void destroyConsumer() { 097 if( consumer == null ) 098 return; 099 100 try { 101 consumer.close(); 102 consumer=null; 103 } catch (JMSException e) { 104 e.printStackTrace(); 105 } 106 } 107 108 /** 109 * Gets an enumeration for browsing the current queue messages in the order 110 * they would be received. 111 * 112 * @return an enumeration for browsing the messages 113 * @throws JMSException if the JMS provider fails to get the enumeration for this 114 * browser due to some internal error. 115 */ 116 117 public Enumeration getEnumeration() throws JMSException { 118 checkClosed(); 119 120 if( consumer==null ) 121 consumer = createConsumer(); 122 123 //ok - started browsing - wait for inbound messages 124 if (consumer.messageQueue.size() == 0) { 125 try { 126 Thread.sleep(1000); 127 } 128 catch (InterruptedException e) { 129 } 130 } 131 return this; 132 } 133 134 private void checkClosed() throws IllegalStateException { 135 if (closed) { 136 throw new IllegalStateException("The Consumer is closed"); 137 } 138 } 139 140 /** 141 * @return true if more messages to process 142 */ 143 public boolean hasMoreElements() { 144 if( consumer==null ) 145 return false; 146 147 boolean rc = consumer.messageQueue.size() > 0; 148 if( !rc ) { 149 destroyConsumer(); 150 } 151 return rc; 152 } 153 154 155 /** 156 * @return the next message 157 */ 158 public Object nextElement() { 159 if( consumer == null ) 160 return null; 161 162 Object answer = null; 163 try { 164 answer = consumer.receiveNoWait(); 165 if( answer==null ) { 166 destroyConsumer(); 167 } 168 } 169 catch (JMSException e) { 170 e.printStackTrace(); 171 } 172 return answer; 173 } 174 175 public void close() throws JMSException { 176 destroyConsumer(); 177 closed=true; 178 } 179 180 /** 181 * Gets the queue associated with this queue browser. 182 * 183 * @return the queue 184 * @throws JMSException if the JMS provider fails to get the queue associated 185 * with this browser due to some internal error. 186 */ 187 188 public Queue getQueue() throws JMSException { 189 return destination; 190 } 191 192 193 public String getMessageSelector() throws JMSException { 194 return selector; 195 } 196 197 198 }