/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

import java.util.LinkedList;

import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.management.JMSConsumerStatsImpl;
import org.activemq.management.StatsCapable;
import org.activemq.management.StatsImpl;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.selector.SelectorParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
 * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
 * creation method supplied by a session.
 * <P>
 * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
 * <P>
 * A message consumer can be created with a message selector. A message selector allows the client to restrict the
 * messages delivered to the message consumer to those that match the selector.
 * <P>
 * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
 * them as they arrive.
 * <P>
 * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
 * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
 * for the next message.
 * <P>
 * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
 * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
 * onMessage</CODE> method.
 * <P>
 * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
 *
 * @version $Revision: 1.1.1.1 $
 * @see javax.jms.MessageConsumer
 * @see javax.jms.QueueReceiver
 * @see javax.jms.TopicSubscriber
 * @see javax.jms.Session
 */
public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, Closeable {
    private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
    protected ActiveMQSession session;
    protected String consumerIdentifier;
    protected MemoryBoundedQueue messageQueue;
    protected String messageSelector;
    private MessageListener messageListener;
    protected String consumerName;
    protected ActiveMQDestination destination;
    // Written by close() and read by receiving/dispatching threads, hence volatile.
    private volatile boolean closed;
    protected int consumerNumber;
    protected int prefetchNumber;
    protected long startTime;
    protected boolean noLocal;
    protected boolean browser;
    // The thread currently blocked in a receive call, if any; close() interrupts it
    // from a different thread, hence volatile.
    private volatile Thread accessThread;
    private Object messageListenerGuard;
    private JMSConsumerStatsImpl stats;

    private final SynchronizedBoolean running = new SynchronizedBoolean(true);
    // Messages that arrived while this consumer was stopped. Every access is guarded by
    // synchronizing on the list itself; the lock is never held while delivering a message.
    private final LinkedList stoppedQueue = new LinkedList();

    /**
     * Create a MessageConsumer
     *
     * @param theSession   the session that owns this consumer
     * @param dest         the destination to consume from; must not be null
     * @param name         the consumer name (used for durable subscribers)
     * @param selector     the message selector, may be null; it is trimmed and, if non-empty, parsed for validation
     * @param cnum         the consumer number, used to build the consumer identifier
     * @param prefetch     the prefetch number
     * @param noLocalValue true if this consumer does not accept locally produced messages
     * @param browserValue true if this consumer is only a browser
     * @throws JMSException if the destination is null, is a temporary destination that cannot be used here, or the
     *                      selector cannot be parsed
     */
    protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
            String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
        if (dest == null) {
            throw new InvalidDestinationException("Do not understand a null destination");
        }
        if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) {
            //validate that the destination comes from this Connection
            String physicalName = dest.getPhysicalName();
            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
            }
            String clientID = theSession.connection.getInitializedClientID();
            if (physicalName.indexOf(clientID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }
            if (dest.isDeleted()) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
        }
        if (selector != null) {
            selector = selector.trim();
            if (selector.length() > 0) {
                // Validate the selector by parsing it; the parsed result is not kept
                new SelectorParser().parse(selector);
            }
        }
        // Only count this consumer against the destination once the selector has been
        // validated, so a rejected selector does not leave the counter incremented.
        dest.incrementConsumerCounter();
        this.session = theSession;
        this.destination = dest;
        this.consumerName = name;
        this.messageSelector = selector;

        this.consumerNumber = cnum;
        this.prefetchNumber = prefetch;
        this.noLocal = noLocalValue;
        this.browser = browserValue;
        this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "."
                + this.consumerNumber;
        this.startTime = System.currentTimeMillis();
        this.messageListenerGuard = new Object();
        this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier);
        this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
        this.session.addConsumer(this);
    }

    /**
     * @return the memory used by the internal queue for this MessageConsumer
     */
    public long getLocalMemoryUsage() {
        return this.messageQueue.getLocalMemoryUsedByThisQueue();
    }

    /**
     * @return the number of messages enqueued by this consumer awaiting dispatch
     */
    public int size() {
        return this.messageQueue.size();
    }

    /**
     * @return Stats for this MessageConsumer
     */
    public StatsImpl getStats() {
        return stats;
    }

    /**
     * @return Stats for this MessageConsumer
     */
    public JMSConsumerStatsImpl getConsumerStats() {
        return stats;
    }

    /**
     * @return pretty print of this consumer
     */
    public String toString() {
        return "MessageConsumer: " + consumerIdentifier + "[" + consumerNumber + "]";
    }

    /**
     * @return Returns the prefetchNumber.
     */
    public int getPrefetchNumber() {
        return prefetchNumber;
    }

    /**
     * @param prefetchNumber The prefetchNumber to set.
     */
    public void setPrefetchNumber(int prefetchNumber) {
        this.prefetchNumber = prefetchNumber;
    }

    /**
     * Gets this message consumer's message selector expression.
     *
     * @return this message consumer's message selector, or null if no message selector exists for the message consumer
     *         (that is, if the message selector was not set or was set to null or the empty string)
     * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
     */
    public String getMessageSelector() throws JMSException {
        checkClosed();
        return this.messageSelector;
    }

    /**
     * Gets the message consumer's <CODE>MessageListener</CODE>.
     *
     * @return the listener for the message consumer, or null if no listener is set
     * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
     */
    public MessageListener getMessageListener() throws JMSException {
        checkClosed();
        return this.messageListener;
    }

    /**
     * Sets the message consumer's <CODE>MessageListener</CODE>.
     * <P>
     * Setting the message listener to null is the equivalent of unsetting the message listener for the message
     * consumer.
     * <P>
     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
     * existing listener or the consumer is being used to consume messages synchronously is undefined.
     * <P>
     * When a non-null listener is set, the session is switched to asynchronous dispatch and any messages already
     * held in the internal queue are passed through {@link #processMessage(ActiveMQMessage)}.
     *
     * @param listener the listener to which the messages are to be delivered
     * @throws JMSException if this consumer is closed, or the calling thread is interrupted while draining the
     *                      internal queue
     * @see javax.jms.MessageConsumer#getMessageListener()
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        checkClosed();
        synchronized (messageListenerGuard) {
            this.messageListener = listener;
        }
        if (listener != null) {
            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
            //messages may already be enqueued
            ActiveMQMessage msg = null;
            try {
                while ((msg = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) {
                    processMessage(msg);
                }
            }
            catch (InterruptedException ex) {
                JMSException jmsEx = new JMSException("Interrupted setting message listener");
                jmsEx.setLinkedException(ex);
                throw jmsEx;
            }
        }
    }

    /**
     * Receives the next message produced for this message consumer.
     * <P>
     * This call blocks indefinitely until a message is produced or until this message consumer is closed.
     * <P>
     * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
     * transaction commits.
     *
     * @return the next message produced for this message consumer, or null if this message consumer is concurrently
     *         closed (or the waiting thread is otherwise interrupted)
     * @throws JMSException if this consumer is already closed
     */
    public Message receive() throws JMSException {
        checkClosed();
        session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
        try {
            ActiveMQMessage message;
            this.accessThread = Thread.currentThread();
            try {
                message = (ActiveMQMessage) messageQueue.dequeue();
            }
            finally {
                // Always forget the blocked thread, even on interruption, so that a later
                // close() cannot interrupt a thread that has already left this method.
                this.accessThread = null;
            }
            return deliverReceived(message);
        }
        catch (InterruptedException ioe) {
            // close() interrupts a blocked receiver on purpose; report that as "no message".
            return null;
        }
    }

    /**
     * Receives the next message that arrives within the specified timeout interval.
     * <P>
     * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
     * timeout</CODE> of zero never expires, and the call blocks indefinitely.
     *
     * @param timeout the timeout value (in milliseconds)
     * @return the next message produced for this message consumer, or null if the timeout expires or this message
     *         consumer is concurrently closed (or the waiting thread is otherwise interrupted)
     * @throws JMSException if this consumer is already closed
     */
    public Message receive(long timeout) throws JMSException {
        checkClosed();
        session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
        try {
            if (timeout == 0) {
                return this.receive();
            }
            ActiveMQMessage message;
            this.accessThread = Thread.currentThread();
            try {
                message = (ActiveMQMessage) messageQueue.dequeue(timeout);
            }
            finally {
                // Always forget the blocked thread, even on interruption, so that a later
                // close() cannot interrupt a thread that has already left this method.
                this.accessThread = null;
            }
            return deliverReceived(message);
        }
        catch (InterruptedException ioe) {
            // close() interrupts a blocked receiver on purpose; report that as "no message".
            return null;
        }
    }

    /**
     * Receives the next message if one is immediately available.
     * <P>
     * Expired messages found in the internal queue are acknowledged as delivered-but-expired and skipped.
     *
     * @return the next message produced for this message consumer, or null if one is not available
     * @throws JMSException if this consumer is closed or the calling thread is interrupted while polling the queue
     */
    public Message receiveNoWait() throws JMSException {
        checkClosed();
        session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
        try {
            ActiveMQMessage message = null;
            //iterate through and scrub delivered but expired messages
            while ((message = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) {
                boolean expired = message.isExpired();
                messageDelivered(message, true, expired);
                if (!expired) {
                    if (log.isDebugEnabled()) {
                        log.debug("Message received: " + message);
                    }
                    return message.shallowCopy();
                }
            }
        }
        catch (InterruptedException ioe) {
            JMSException jmsEx = new JMSException("Queue is interrupted: " + ioe.getMessage());
            // keep the original cause, as setMessageListener() does
            jmsEx.setLinkedException(ioe);
            throw jmsEx;
        }
        return null;
    }

    /**
     * Closes the message consumer.
     * <P>
     * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
     * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
     * reclaim these resources may not be timely enough.
     * <P>
     * A thread blocked in a <CODE>receive</CODE> call is interrupted, which makes that call return null. The
     * destination's consumer counter is decremented, the consumer is removed from its session and the internal
     * queue is closed.
     *
     * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
     */
    public void close() throws JMSException {
        // Read the field once: the receiving thread may clear it at any moment.
        Thread blockedReceiver = this.accessThread;
        if (blockedReceiver != null) {
            try {
                blockedReceiver.interrupt();
            }
            catch (SecurityException se) {
                // Best effort only: if we are not allowed to interrupt the receiver,
                // carry on closing; closing the queue below is still performed.
            }
        }
        if (destination != null) {
            destination.decrementConsumerCounter();
        }

        this.session.removeConsumer(this);
        messageQueue.close();
        closed = true;
    }

    /**
     * @return true if this is a durable topic subscriber
     */
    public boolean isDurableSubscriber() {
        return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
    }

    /**
     * @return true if this is a Transient Topic subscriber
     */
    public boolean isTransientSubscriber() {
        return this.destination != null && destination.isTopic() && (consumerName == null || consumerName.length() == 0);
    }

    /**
     * @throws IllegalStateException if this consumer has been closed
     */
    protected void checkClosed() throws IllegalStateException {
        if (closed) {
            throw new IllegalStateException("The Consumer is closed");
        }
    }

    /**
     * Process a Message - passing either to the queue or message listener
     * <P>
     * While the consumer is stopped the message is parked and re-processed by {@link #start()}. If the consumer is
     * closed the message is reported to the session as not read. Any <CODE>Throwable</CODE> raised while processing
     * is logged and the message is reported to the session as not read.
     *
     * @param message the message to dispatch
     */
    protected void processMessage(ActiveMQMessage message) {
        // Check-and-park must be atomic with respect to start() draining the parked messages.
        synchronized (stoppedQueue) {
            if (!running.get()) {
                stoppedQueue.addLast(message);
                return;
            }
        }
        message.setConsumerIdentifer(this.consumerIdentifier);
        MessageListener listener = null;
        synchronized (messageListenerGuard) {
            listener = this.messageListener;
        }
        boolean transacted = session.isTransacted();
        try {
            if (!closed) {
                if (message.getJMSActiveMQDestination() == null) {
                    message.setJMSDestination(getDestination());
                }
                if (listener != null) {
                    beforeMessageDelivered(message);
                    boolean expired = message.isExpired();
                    // In a transacted session the delivery is recorded before the listener runs,
                    // otherwise after it has returned.
                    if (transacted) {
                        afterMessageDelivered(message, true, expired, true);
                    }
                    if (!expired) {
                        if (log.isDebugEnabled()) {
                            log.debug("Message delivered to message listener: " + message);
                        }
                        listener.onMessage(message.shallowCopy());
                    }
                    if (!transacted) {
                        afterMessageDelivered(message, true, expired, true);
                    }
                }
                else {
                    this.messageQueue.enqueue(message);
                }
            }
            else {
                messageDelivered(message, false, false);
            }
        }
        catch (Throwable e) {
            log.warn("could not process message: " + message + ". Reason: " + e, e);
            messageDelivered(message, false, false);
        }
    }

    /**
     * @return Returns the consumerId.
     */
    protected String getConsumerIdentifier() {
        return consumerIdentifier;
    }

    /**
     * @return the consumer name - used for durable consumers
     */
    protected String getConsumerName() {
        return this.consumerName;
    }

    /**
     * Set the name of the Consumer - used for durable subscribers
     *
     * @param value the new consumer name
     */
    protected void setConsumerName(String value) {
        this.consumerName = value;
    }

    /**
     * @return the locally unique Consumer Number
     */
    protected int getConsumerNumber() {
        return this.consumerNumber;
    }

    /**
     * Set the locally unique consumer number
     *
     * @param value the new consumer number
     */
    protected void setConsumerNumber(int value) {
        this.consumerNumber = value;
    }

    /**
     * @return true if this consumer does not accept locally produced messages
     */
    protected boolean isNoLocal() {
        return this.noLocal;
    }

    /**
     * Retrieve whether this consumer is a browser
     *
     * @return true if a browser
     */
    protected boolean isBrowser() {
        return this.browser;
    }

    /**
     * Set true if only a Browser
     *
     * @param value true if this consumer is only a browser
     * @see ActiveMQQueueBrowser
     */
    protected void setBrowser(boolean value) {
        this.browser = value;
    }

    /**
     * @return ActiveMQDestination
     */
    protected ActiveMQDestination getDestination() {
        return this.destination;
    }

    /**
     * @return the startTime
     */
    protected long getStartTime() {
        return startTime;
    }

    /**
     * Discards every message held by this consumer: both the internal queue and the messages parked while the
     * consumer was stopped.
     */
    protected void clearMessagesInProgress() {
        messageQueue.clear();
        synchronized (stoppedQueue) {
            stoppedQueue.clear();
        }
    }

    /**
     * Shared tail of the blocking receive calls: records the delivery of a dequeued message and returns a shallow
     * copy of it, or, if it had expired, falls back to {@link #receiveNoWait()} to skip further expired messages.
     *
     * @param message the message just dequeued, may be null
     * @return the message to hand to the client, or null if none is available
     * @throws JMSException if copying the message or the fallback receive fails
     */
    private ActiveMQMessage deliverReceived(ActiveMQMessage message) throws JMSException {
        if (message != null) {
            boolean expired = message.isExpired();
            messageDelivered(message, true, expired);
            if (!expired) {
                message = message.shallowCopy();
            }
            else {
                message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
            }
        }
        if (message != null && log.isDebugEnabled()) {
            log.debug("Message received: " + message);
        }
        return message;
    }

    /**
     * Reports a delivery to the session for which {@link #beforeMessageDelivered(ActiveMQMessage)} was not called.
     */
    private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) {
        afterMessageDelivered(message, messageRead, messageExpired, false);
    }

    /**
     * Marks the message as transient-consumed where applicable and notifies the session that the message is about
     * to be delivered. Does nothing for a null message.
     */
    private void beforeMessageDelivered(ActiveMQMessage message) {
        if (message == null) {
            return;
        }
        boolean topic = destination != null && destination.isTopic();
        message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
        this.session.beforeMessageDelivered(message);
    }

    /**
     * Notifies the session that a message has been handled and, if it was read, updates the consumer statistics.
     * Does nothing for a null message. A browser never reports a message as consumed.
     *
     * @param message        the message that was handled
     * @param messageRead    true if the message was read by this consumer
     * @param messageExpired true if the message had expired
     * @param beforeCalled   true if {@link #beforeMessageDelivered(ActiveMQMessage)} was called for this message
     */
    private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) {
        if (message == null) {
            return;
        }

        boolean consumed = browser ? false : messageRead;
        // The topic test uses the message's own destination; the queue test below uses this consumer's.
        ActiveMQDestination messageDestination = message.getJMSActiveMQDestination();
        boolean topic = messageDestination != null && messageDestination.isTopic();
        message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
        this.session.afterMessageDelivered((isDurableSubscriber() || this.destination.isQueue()), message, consumed, messageExpired, beforeCalled);
        if (messageRead) {
            stats.onMessage(message);
        }
    }

    /**
     * Resumes dispatching and re-processes, in arrival order, the messages that were parked while stopped.
     */
    public void start() {
        running.set(true);
        // Stop draining if stop() is called concurrently; otherwise each message would be
        // re-parked by processMessage() and this loop would never terminate.
        while (running.get()) {
            ActiveMQMessage parked;
            synchronized (stoppedQueue) {
                if (stoppedQueue.isEmpty()) {
                    break;
                }
                parked = (ActiveMQMessage) stoppedQueue.removeFirst();
            }
            // Deliver outside the lock: processMessage() may invoke the client's listener.
            processMessage(parked);
        }
    }

    /**
     * Suspends dispatching; messages arriving from now on are parked until {@link #start()} is called.
     */
    synchronized public void stop() {
        running.set(false);
    }
}