001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq;
019    
020    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
021    
022    import java.util.LinkedList;
023    
024    import javax.jms.IllegalStateException;
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.MessageListener;
030    
031    import org.activemq.io.util.MemoryBoundedQueue;
032    import org.activemq.management.JMSConsumerStatsImpl;
033    import org.activemq.management.StatsCapable;
034    import org.activemq.management.StatsImpl;
035    import org.activemq.message.ActiveMQDestination;
036    import org.activemq.message.ActiveMQMessage;
037    import org.activemq.selector.SelectorParser;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    
041    /**
042     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
043     * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
044     * creation method supplied by a session.
045     * <P>
046     * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
047     * <P>
048     * A message consumer can be created with a message selector. A message selector allows the client to restrict the
049     * messages delivered to the message consumer to those that match the selector.
050     * <P>
051     * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
052     * them as they arrive.
053     * <P>
054     * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
055     * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
056     * for the next message.
057     * <P>
058     * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
059     * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
060     * onMessage</CODE> method.
061     * <P>
062     * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
063     *
064     * @version $Revision: 1.1.1.1 $
065     * @see javax.jms.MessageConsumer
066     * @see javax.jms.QueueReceiver
067     * @see javax.jms.TopicSubscriber
068     * @see javax.jms.Session
069     */
070    public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, Closeable {
071        private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
072        protected ActiveMQSession session;
073        protected String consumerIdentifier;
074        protected MemoryBoundedQueue messageQueue;
075        protected String messageSelector;
076        private MessageListener messageListener;
077        protected String consumerName;
078        protected ActiveMQDestination destination;
079        private boolean closed;
080        protected int consumerNumber;
081        protected int prefetchNumber;
082        protected long startTime;
083        protected boolean noLocal;
084        protected boolean browser;
085        private Thread accessThread;
086        private Object messageListenerGuard;
087        private JMSConsumerStatsImpl stats;
088        
089        private SynchronizedBoolean running = new SynchronizedBoolean(true);
090        private LinkedList stoppedQueue=new LinkedList(); 
091        /**
092         * Create a MessageConsumer
093         *
094         * @param theSession
095         * @param dest
096         * @param name
097         * @param selector
098         * @param cnum
099         * @param prefetch
100         * @param noLocalValue
101         * @param browserValue
102         * @throws JMSException
103         */
104        protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
105                                          String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
106            if (dest == null) {
107                throw new InvalidDestinationException("Do not understand a null destination");
108            }
109            if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) {
110                //validate that the destination comes from this Connection
111                String physicalName = dest.getPhysicalName();
112                if (physicalName == null) {
113                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
114                }
115                String clientID = theSession.connection.getInitializedClientID();
116                if (physicalName.indexOf(clientID) < 0) {
117                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
118                }
119                if (dest.isDeleted()) {
120                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
121                }
122            }
123            dest.incrementConsumerCounter();
124            if (selector != null) {
125                selector = selector.trim();
126                if (selector.length() > 0) {
127                    // Validate that the selector
128                    new SelectorParser().parse(selector);
129                }
130            }
131            this.session = theSession;
132            this.destination = dest;
133            this.consumerName = name;
134            this.messageSelector = selector;
135    
136            this.consumerNumber = cnum;
137            this.prefetchNumber = prefetch;
138            this.noLocal = noLocalValue;
139            this.browser = browserValue;
140            this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "." + this.consumerNumber;
141            this.startTime = System.currentTimeMillis();
142            this.messageListenerGuard = new Object();
143            this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier);
144            this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
145            this.session.addConsumer(this);
146        }
147    
148        /**
149         * @return the memory used by the internal queue for this MessageConsumer
150         */
151        public long getLocalMemoryUsage() {
152            return this.messageQueue.getLocalMemoryUsedByThisQueue();
153        }
154    
155        /**
156         * @return the number of messages enqueued by this consumer awaiting dispatch
157         */
158        public int size() {
159            return this.messageQueue.size();
160        }
161    
162    
163        /**
164         * @return Stats for this MessageConsumer
165         */
166        public StatsImpl getStats() {
167            return stats;
168        }
169    
170        /**
171         * @return Stats for this MessageConsumer
172         */
173        public JMSConsumerStatsImpl getConsumerStats() {
174            return stats;
175        }
176    
177        /**
178         * @return pretty print of this consumer
179         */
180        public String toString() {
181            return "MessageConsumer: " + consumerIdentifier + "[" + consumerNumber + "]";
182        }
183    
184        /**
185         * @return Returns the prefetchNumber.
186         */
187        public int getPrefetchNumber() {
188            return prefetchNumber;
189        }
190    
191        /**
192         * @param prefetchNumber The prefetchNumber to set.
193         */
194        public void setPrefetchNumber(int prefetchNumber) {
195            this.prefetchNumber = prefetchNumber;
196        }
197    
198        /**
199         * Gets this message consumer's message selector expression.
200         *
201         * @return this message consumer's message selector, or null if no message selector exists for the message consumer
202         *         (that is, if the message selector was not set or was set to null or the empty string)
203         * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
204         */
205        public String getMessageSelector() throws JMSException {
206            checkClosed();
207            return this.messageSelector;
208        }
209    
210        /**
211         * Gets the message consumer's <CODE>MessageListener</CODE>.
212         *
213         * @return the listener for the message consumer, or null if no listener is set
214         * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
215         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
216         */
217        public MessageListener getMessageListener() throws JMSException {
218            checkClosed();
219            return this.messageListener;
220        }
221    
222        /**
223         * Sets the message consumer's <CODE>MessageListener</CODE>.
224         * <P>
225         * Setting the message listener to null is the equivalent of unsetting the message listener for the message
226         * consumer.
227         * <P>
228         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
229         * existing listener or the consumer is being used to consume messages synchronously is undefined.
230         *
231         * @param listener the listener to which the messages are to be delivered
232         * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
233         * @see javax.jms.MessageConsumer#getMessageListener()
234         */
235        public void setMessageListener(MessageListener listener) throws JMSException {
236            checkClosed();
237            synchronized (messageListenerGuard) {
238                this.messageListener = listener;
239            }
240            if (listener != null) {
241                session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
242                //messages may already be enqueued
243                ActiveMQMessage msg = null;
244                try {
245                    while ((msg = (ActiveMQMessage)messageQueue.dequeueNoWait()) != null) {
246                        processMessage(msg);
247                    }
248                }
249                catch (InterruptedException ex) {
250                    JMSException jmsEx = new JMSException("Interrupted setting message listener");
251                    jmsEx.setLinkedException(ex);
252                    throw jmsEx;
253                }
254            }
255        }
256    
257        /**
258         * Receives the next message produced for this message consumer.
259         * <P>
260         * This call blocks indefinitely until a message is produced or until this message consumer is closed.
261         * <P>
262         * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
263         * transaction commits.
264         *
265         * @return the next message produced for this message consumer, or null if this message consumer is concurrently
266         *         closed
267         * @throws JMSException
268         */
269        public Message receive() throws JMSException {
270            checkClosed();
271            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
272            try {
273                this.accessThread = Thread.currentThread();
274                ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
275                this.accessThread = null;
276                if (message != null) {
277                    boolean expired = message.isExpired();
278                    messageDelivered(message, true, expired);
279                    if (!expired) {
280                        message = message.shallowCopy();
281                    }
282                    else {
283                        message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
284                    }
285                }
286                if( message!=null && log.isDebugEnabled() ) {
287                    log.debug("Message received: "+message);
288                }            
289                return message;
290            }
291            catch (InterruptedException ioe) {
292                return null;
293            }
294        }
295    
296        /**
297         * Receives the next message that arrives within the specified timeout interval.
298         * <P>
299         * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
300         * timeout</CODE> of zero never expires, and the call blocks indefinitely.
301         *
302         * @param timeout the timeout value (in milliseconds)
303         * @return the next message produced for this message consumer, or null if the timeout expires or this message
304         *         consumer is concurrently closed
305         * @throws JMSException
306         */
307        public Message receive(long timeout) throws JMSException {
308            checkClosed();
309            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
310            try {
311                if (timeout == 0) {
312                    return this.receive();
313                }
314                this.accessThread = Thread.currentThread();
315                ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
316                this.accessThread = null;
317                if (message != null) {
318                    boolean expired = message.isExpired();
319                    messageDelivered(message, true, expired);
320                    if (!expired) {
321                        message = message.shallowCopy();
322                    }
323                    else {
324                        message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
325                    }
326                }
327                if( message!=null && log.isDebugEnabled() ) {
328                    log.debug("Message received: "+message);
329                }            
330                return message;
331            }
332            catch (InterruptedException ioe) {
333                return null;
334            }
335        }
336    
337        /**
338         * Receives the next message if one is immediately available.
339         *
340         * @return the next message produced for this message consumer, or null if one is not available
341         * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
342         */
343        public Message receiveNoWait() throws JMSException {
344            checkClosed();
345            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
346            try {
347                ActiveMQMessage message = null;
348                //iterate through an scrub delivered but expired messages
349                while ((message = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) {
350                    boolean expired = message.isExpired();
351                    messageDelivered(message, true, expired);
352                    if (!expired) {
353                        if( message!=null && log.isDebugEnabled() ) {
354                            log.debug("Message received: "+message);
355                        }            
356                        return message.shallowCopy();
357                    }
358                }
359            }
360            catch (InterruptedException ioe) {
361                throw new JMSException("Queue is interrupted: " + ioe.getMessage());
362            }
363            return null;
364        }
365    
366        /**
367         * Closes the message consumer.
368         * <P>
369         * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
370         * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
371         * reclaim these resources may not be timely enough.
372         * <P>
373         * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
374         * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
375         *
376         * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
377         */
378        public void close() throws JMSException {
379            try {
380                this.accessThread.interrupt();
381            }
382            catch (NullPointerException npe) {
383            }
384            catch (SecurityException se) {
385            }
386            if (destination != null) {
387                destination.decrementConsumerCounter();
388            }
389    
390            this.session.removeConsumer(this);
391            messageQueue.close();
392            closed = true;
393        }
394    
395        /**
396         * @return true if this is a durable topic subscriber
397         */
398        public boolean isDurableSubscriber() {
399            return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
400        }
401        
402        /**
403         * @return true if this is a Transient Topic subscriber
404         */
405        public boolean isTransientSubscriber(){
406            return this.destination != null && destination.isTopic() && (consumerName == null || consumerName.length() ==0);
407        }
408    
409        /**
410         * @throws IllegalStateException
411         */
412        protected void checkClosed() throws IllegalStateException {
413            if (closed) {
414                throw new IllegalStateException("The Consumer is closed");
415            }
416        }
417    
418        /**
419         * Process a Message - passing either to the queue or message listener
420         *
421         * @param message
422         */
423        protected void processMessage(ActiveMQMessage message) {
424            if( !running.get() ) {
425                stoppedQueue.addLast(message);
426                return;
427            }
428            message.setConsumerIdentifer(this.consumerIdentifier);
429            MessageListener listener = null;
430            synchronized (messageListenerGuard) {
431                listener = this.messageListener;
432            }
433            boolean transacted = session.isTransacted();
434            try {
435                if (!closed) {
436                    if (message.getJMSActiveMQDestination() == null) {
437                        message.setJMSDestination(getDestination());
438                    }
439                    if (listener != null) {
440                        beforeMessageDelivered(message);
441                        boolean expired = message.isExpired();
442                        if (transacted) {
443                            afterMessageDelivered(message, true, expired, true);
444                        }
445                        if (!expired) {
446                            if( log.isDebugEnabled() ) {
447                                log.debug("Message delivered to message listener: "+message);
448                            }
449                            listener.onMessage(message.shallowCopy());
450                        }
451                        if (!transacted) {
452                            afterMessageDelivered(message, true, expired, true);
453                        }
454                    }
455                    else {
456                        this.messageQueue.enqueue(message);
457                    }
458                }
459                else {
460                    messageDelivered(message, false, false);
461                }
462            }
463            catch (Throwable e) {
464                log.warn("could not process message: " + message + ". Reason: " + e, e);
465                messageDelivered(message, false, false);
466            }
467        }
468    
469        /**
470         * @return Returns the consumerId.
471         */
472        protected String getConsumerIdentifier() {
473            return consumerIdentifier;
474        }
475    
476        /**
477         * @return the consumer name - used for durable consumers
478         */
479        protected String getConsumerName() {
480            return this.consumerName;
481        }
482    
483        /**
484         * Set the name of the Consumer - used for durable subscribers
485         *
486         * @param value
487         */
488        protected void setConsumerName(String value) {
489            this.consumerName = value;
490        }
491    
492        /**
493         * @return the locally unique Consumer Number
494         */
495        protected int getConsumerNumber() {
496            return this.consumerNumber;
497        }
498    
499        /**
500         * Set the locally unique consumer number
501         *
502         * @param value
503         */
504        protected void setConsumerNumber(int value) {
505            this.consumerNumber = value;
506        }
507    
508        /**
509         * @return true if this consumer does not accept locally produced messages
510         */
511        protected boolean isNoLocal() {
512            return this.noLocal;
513        }
514    
515        /**
516         * Retrive is a browser
517         *
518         * @return true if a browser
519         */
520        protected boolean isBrowser() {
521            return this.browser;
522        }
523    
524        /**
525         * Set true if only a Browser
526         *
527         * @param value
528         * @see ActiveMQQueueBrowser
529         */
530        protected void setBrowser(boolean value) {
531            this.browser = value;
532        }
533    
534        /**
535         * @return ActiveMQDestination
536         */
537        protected ActiveMQDestination getDestination() {
538            return this.destination;
539        }
540    
541        /**
542         * @return the startTime
543         */
544        protected long getStartTime() {
545            return startTime;
546        }
547    
548        protected void clearMessagesInProgress() {
549            messageQueue.clear();
550            stoppedQueue.clear();
551        }
552    
553        private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) {
554            afterMessageDelivered(message, messageRead, messageExpired, false);
555        }
556    
557        private void beforeMessageDelivered(ActiveMQMessage message) {
558            if (message == null) {
559                return;
560            }
561            boolean topic = destination != null && destination.isTopic();
562            message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
563            this.session.beforeMessageDelivered(message);
564        }
565    
566        private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) {
567            if (message == null) {
568                return;
569            }
570    
571            boolean consumed = browser ? false : messageRead;
572            ActiveMQDestination destination = message.getJMSActiveMQDestination();
573            boolean topic = destination != null && destination.isTopic();
574            message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
575            this.session.afterMessageDelivered((isDurableSubscriber() || this.destination.isQueue()), message, consumed, messageExpired, beforeCalled);
576            if (messageRead) {
577                stats.onMessage(message);
578            }
579    
580        }
581    
582        public void start() {
583            running.set(true);
584            while( !stoppedQueue.isEmpty() ) {
585                ActiveMQMessage m = (ActiveMQMessage)stoppedQueue.removeFirst();
586                processMessage(m);
587            }
588        }
589    
590        synchronized public void stop() {
591            running.set(false);
592        }
593    }