001    package com.mockrunner.mock.jms;
002    
003    import javax.jms.InvalidSelectorException;
004    import javax.jms.JMSException;
005    import javax.jms.Message;
006    import javax.jms.MessageConsumer;
007    import javax.jms.MessageListener;
008    
009    import org.activemq.filter.mockrunner.Filter;
010    import org.activemq.selector.mockrunner.SelectorParser;
011    
012    /**
013     * Mock implementation of JMS <code>MessageConsumer</code>.
014     */
015    public abstract class MockMessageConsumer implements MessageConsumer
016    {
017        private MockConnection connection;
018        private String messageSelector;
019        private Filter messageSelectorFilter;
020        private boolean closed;
021        private MessageListener messageListener;
022            
023        public MockMessageConsumer(MockConnection connection, String messageSelector)
024        {
025            this.connection = connection;
026            this.messageSelector = messageSelector;
027            parseMessageSelector();
028            closed = false;
029            messageListener = null;
030        }
031    
032        private void parseMessageSelector()
033        {
034            if(null == messageSelector || messageSelector.length() == 0)
035            {
036                this.messageSelectorFilter = null;
037            }
038            else
039            {
040                try
041                {
042                    this.messageSelectorFilter = new SelectorParser().parse(messageSelector);
043                }
044                catch(InvalidSelectorException exc)
045                {
046                    throw new RuntimeException("Error parsing message selector: " + exc.getMessage());
047                }
048            }
049        }
050    
051        /**
052         * Returns if this consumer was closed.
053         * @return <code>true</code> if this consumer is closed
054         */
055        public boolean isClosed()
056        {
057            return closed;
058        }
059        
060        /**
061         * Returns if this consumer can consume an incoming message,
062         * i.e. if a <code>MessageListener</code> is registered,
063         * the receiver isn't closed and has an approriate selector.
064         * @return <code>true</code> if this receiver can consume the message
065         */
066        public boolean canConsume(Message message)
067        {
068            if(messageListener == null) return false;
069            if(isClosed()) return false;
070            return matchesMessageSelector(message);
071        }
072    
073        
074        /**
075         * Adds a message that is immediately propagated to the
076         * message listener. If there's no message listener,
077         * nothing happens.
078         * @param message the message
079         */
080        public void receiveMessage(Message message)
081        {
082            if(null == messageListener) return;
083            messageListener.onMessage(message);
084        }
085    
086        public String getMessageSelector() throws JMSException
087        {
088            connection.throwJMSException();
089            return messageSelector;
090        }
091    
092        public MessageListener getMessageListener() throws JMSException
093        {
094            connection.throwJMSException();
095            return messageListener;
096        }
097    
098        public void setMessageListener(MessageListener messageListener) throws JMSException
099        {
100            connection.throwJMSException();
101            this.messageListener = messageListener;
102        }
103    
104        public Message receive(long timeout) throws JMSException
105        {
106            connection.throwJMSException();
107            return receive();
108        }
109    
110        public Message receiveNoWait() throws JMSException
111        {
112            connection.throwJMSException();
113            return receive();
114        }
115    
116        public void close() throws JMSException
117        {
118            connection.throwJMSException();
119            closed = true;
120        }
121        
122        private boolean matchesMessageSelector(Message message)
123        {
124            if(!connection.getConfigurationManager().getUseMessageSelectors()) return true;
125            if(messageSelectorFilter == null) return true;
126            try
127            {
128                return messageSelectorFilter.matches(message);
129            }
130            catch(JMSException exc)
131            {
132                throw new RuntimeException(exc.getMessage());
133            }
134        }
135        
136        protected Filter getMessageFilter()
137        {
138            return messageSelectorFilter;
139        }
140        
141        protected MockConnection getConnection()
142        {
143            return connection;
144        }
145    }