001 package com.mockrunner.mock.jms; 002 003 import javax.jms.InvalidSelectorException; 004 import javax.jms.JMSException; 005 import javax.jms.Message; 006 import javax.jms.MessageConsumer; 007 import javax.jms.MessageListener; 008 009 import org.activemq.filter.mockrunner.Filter; 010 import org.activemq.selector.mockrunner.SelectorParser; 011 012 /** 013 * Mock implementation of JMS <code>MessageConsumer</code>. 014 */ 015 public abstract class MockMessageConsumer implements MessageConsumer 016 { 017 private MockConnection connection; 018 private String messageSelector; 019 private Filter messageSelectorFilter; 020 private boolean closed; 021 private MessageListener messageListener; 022 023 public MockMessageConsumer(MockConnection connection, String messageSelector) 024 { 025 this.connection = connection; 026 this.messageSelector = messageSelector; 027 parseMessageSelector(); 028 closed = false; 029 messageListener = null; 030 } 031 032 private void parseMessageSelector() 033 { 034 if(null == messageSelector || messageSelector.length() == 0) 035 { 036 this.messageSelectorFilter = null; 037 } 038 else 039 { 040 try 041 { 042 this.messageSelectorFilter = new SelectorParser().parse(messageSelector); 043 } 044 catch(InvalidSelectorException exc) 045 { 046 throw new RuntimeException("Error parsing message selector: " + exc.getMessage()); 047 } 048 } 049 } 050 051 /** 052 * Returns if this consumer was closed. 053 * @return <code>true</code> if this consumer is closed 054 */ 055 public boolean isClosed() 056 { 057 return closed; 058 } 059 060 /** 061 * Returns if this consumer can consume an incoming message, 062 * i.e. if a <code>MessageListener</code> is registered, 063 * the receiver isn't closed and has an approriate selector. 064 * @return <code>true</code> if this receiver can consume the message 065 */ 066 public boolean canConsume(Message message) 067 { 068 if(messageListener == null) return false; 069 if(isClosed()) return false; 070 return matchesMessageSelector(message); 071 } 072 073 074 /** 075 * Adds a message that is immediately propagated to the 076 * message listener. If there's no message listener, 077 * nothing happens. 078 * @param message the message 079 */ 080 public void receiveMessage(Message message) 081 { 082 if(null == messageListener) return; 083 messageListener.onMessage(message); 084 } 085 086 public String getMessageSelector() throws JMSException 087 { 088 connection.throwJMSException(); 089 return messageSelector; 090 } 091 092 public MessageListener getMessageListener() throws JMSException 093 { 094 connection.throwJMSException(); 095 return messageListener; 096 } 097 098 public void setMessageListener(MessageListener messageListener) throws JMSException 099 { 100 connection.throwJMSException(); 101 this.messageListener = messageListener; 102 } 103 104 public Message receive(long timeout) throws JMSException 105 { 106 connection.throwJMSException(); 107 return receive(); 108 } 109 110 public Message receiveNoWait() throws JMSException 111 { 112 connection.throwJMSException(); 113 return receive(); 114 } 115 116 public void close() throws JMSException 117 { 118 connection.throwJMSException(); 119 closed = true; 120 } 121 122 private boolean matchesMessageSelector(Message message) 123 { 124 if(!connection.getConfigurationManager().getUseMessageSelectors()) return true; 125 if(messageSelectorFilter == null) return true; 126 try 127 { 128 return messageSelectorFilter.matches(message); 129 } 130 catch(JMSException exc) 131 { 132 throw new RuntimeException(exc.getMessage()); 133 } 134 } 135 136 protected Filter getMessageFilter() 137 { 138 return messageSelectorFilter; 139 } 140 141 protected MockConnection getConnection() 142 { 143 return connection; 144 } 145 }