/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.service.boundedvm;
import java.util.List;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;

import org.activemq.broker.BrokerClient;
import org.activemq.filter.Filter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;

/**
 * A holder for Transient Queue consumer info and message routing.
 * <p>
 * Two queues are tracked per subscription: one holding messages that have been
 * dispatched to the consumer but not yet acknowledged, and one holding messages
 * that were acknowledged inside a transaction.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class TransientQueueSubscription extends TransientSubscription {

    /** Messages handed to the consumer that have not been acknowledged yet. */
    private final MemoryBoundedQueue dispatchedQueue;

    /** Where messages go that are acked in a transaction. */
    private final MemoryBoundedQueue ackedQueue;

    /**
     * Construct the TransientQueueSubscription
     *
     * @param client the broker client the messages are dispatched to
     * @param dispatchedQueue queue used to track dispatched, unacknowledged messages
     * @param ackQueue queue used to hold messages acknowledged in a transaction
     * @param filter the filter passed on to the superclass and used by {@link #isTarget(ActiveMQMessage)}
     * @param info the consumer info passed on to the superclass
     */
    public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue,
            MemoryBoundedQueue ackQueue, Filter filter, ConsumerInfo info) {
        super(filter, info, client);
        this.dispatchedQueue = dispatchedQueue;
        this.ackedQueue = ackQueue;
    }

    /**
     * Determines if the Subscription is interested in the message.
     * <p>
     * A message is accepted only when the filter matches it and it is either
     * non-persistent or the consumer's destination is temporary.
     *
     * @param message the candidate message; {@code null} is never a target
     * @return true if this Subscription will accept the message
     * @throws JMSException if thrown while inspecting the message
     */
    public boolean isTarget(ActiveMQMessage message) throws JMSException {
        if (message == null) {
            return false;
        }
        // make sure we don't loop messages around the cluster
        if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
                || message.isEntryBroker(brokerName)) {
            return filter.matches(message)
                    && (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT
                            || consumerInfo.getDestination().isTemporary());
        }
        return false;
    }

    /**
     * @return true if the consumer has capacity for more messages, i.e. the number
     *         of dispatched-but-unacknowledged messages does not exceed the
     *         consumer's prefetch number
     */
    public boolean canAcceptMessages() {
        // NOTE(review): '<=' still reports capacity when the queue already holds
        // exactly getPrefetchNumber() messages - confirm this is the intended limit.
        return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
    }

    /**
     * Dispatch a message to the Consumer.
     * <p>
     * The message itself is recorded in the dispatched queue; a shallow copy
     * tagged with this consumer's number is what is handed to the client.
     *
     * @param message the message to dispatch
     * @throws JMSException if thrown while copying or dispatching the message
     */
    public void doDispatch(ActiveMQMessage message) throws JMSException {
        dispatchedQueue.enqueueNoBlock(message);
        // Tag a copy rather than the queued instance - presumably so the consumer
        // numbers set here do not leak onto the tracked message (TODO confirm).
        ActiveMQMessage copy = message.shallowCopy();
        copy.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
        client.dispatch(copy);
    }

    /**
     * Acknowledge the receipt of a message by a consumer
     *
     * @param id the id of the message being acknowledged
     * @return the ActiveMQMessage removed from the dispatched queue for the given id
     */
    public ActiveMQMessage acknowledgeMessage(String id) {
        return (ActiveMQMessage) dispatchedQueue.remove(id);
    }

    /**
     * @return all the unacknowledged messages, i.e. the contents of the dispatched queue
     */
    public List getUndeliveredMessages() {
        return dispatchedQueue.getContents();
    }

    /**
     * Close the subscription, then close both the dispatched and the acked queue.
     */
    public void close() {
        super.close();
        dispatchedQueue.close();
        ackedQueue.close();
    }

    /**
     * @return true if at least one acked message is currently held
     */
    public boolean hasAckedMessage() {
        return !ackedQueue.isEmpty();
    }

    /**
     * Add an acked message.
     *
     * @param message the message acknowledged within a transaction
     */
    public void addAckedMessage(ActiveMQMessage message) {
        ackedQueue.enqueueNoBlock(message);
    }

    /**
     * Get a list of all the acked messages
     *
     * @return the contents of the acked queue
     */
    public List listAckedMessages() {
        return ackedQueue.getContents();
    }

    /**
     * Remove all the acked messages.
     */
    public void removeAllAckedMessages() {
        ackedQueue.clear();
    }

    /**
     * @return true if the consumer info marks this consumer as a browser
     */
    public boolean isBrowser() {
        return consumerInfo.isBrowser();
    }
}