001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * Copyright 2005 Hiram Chirino
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    
020    package org.activemq.service.boundedvm;
021    import java.util.List;
022    
023    import javax.jms.JMSException;
024    
025    import org.activemq.broker.BrokerClient;
026    import org.activemq.filter.Filter;
027    import org.activemq.io.util.MemoryBoundedQueue;
028    import org.activemq.message.ActiveMQMessage;
029    import org.activemq.message.ConsumerInfo;
030    
031    /**
032     * A holder for Durable Queue consumer info and message routing
033     * 
034     * @version $Revision: 1.1.1.1 $
035     */
036    public class DurableQueueSubscription extends DurableSubscription {
037    
038        private MemoryBoundedQueue dispatchedQueue;
039        private MemoryBoundedQueue ackedQueue; // Where messages go that are acked in a transaction
040    
041        /**
042         * Construct the DurableQueueSubscription
043         * 
044         * @param client
045         * @param dispatchedQueue
046         * @param ackQueue 
047         * @param filter
048         * @param info
049         */
050        public DurableQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, MemoryBoundedQueue ackQueue, Filter filter,
051                ConsumerInfo info) {
052            super(filter, info, client);
053            this.dispatchedQueue = dispatchedQueue;
054                    this.ackedQueue = ackQueue;
055        }
056    
057        /**
058         * determines if the Subscription is interested in the message
059         * 
060         * @param message
061         * @return true if this Subscription will accept the message
062         * @throws JMSException
063         */
064        public boolean isTarget(ActiveMQMessage message) throws JMSException {
065            boolean result = false;
066            if (message != null) {
067                //make sure we don't loop messages around the cluster
068                if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
069                        || message.isEntryBroker(brokerName)) {
070                    result = filter.matches(message);
071                }
072            }
073            return result;
074        }
075    
076        /**
077         * @return true if the consumer has capacity for more messages
078         */
079        public boolean canAcceptMessages() {
080            return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
081        }
082    
083        /**
084         * Dispatch a message to the Consumer
085         * 
086         * @param message
087         * @throws JMSException
088         */
089        public void doDispatch(DurableMessagePointer message) throws JMSException {
090            dispatchedQueue.enqueue(message);
091            ActiveMQMessage msg = message.getMessage().shallowCopy();
092            msg.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
093            client.dispatch(msg);
094        }
095            
096        
097        /**
098         * Acknowledge the receipt of a message by a consumer
099         * 
100         * @param id
101         * @return the removed ActiveMQMessage with the associated id
102         */
103        public DurableMessagePointer acknowledgeMessage(String id) {
104            return (DurableMessagePointer) dispatchedQueue.remove(id);
105        }
106    
107        /**
108         * @return all the unacknowledge messages
109         */
110        public List getUndeliveredMessages() {
111            return dispatchedQueue.getContents();
112        }
113    
114        /**
115         * close the subscription
116         */
117        public void close() {
118            super.close();
119            dispatchedQueue.close();
120            ackedQueue.close();
121        }
122            
123            /**
124         * @return true if acked a message
125         */
126        public boolean hasAckedMessage() {
127            return !ackedQueue.isEmpty();
128        }
129    
130        /**
131         * Add an acked message.
132         * @param message 
133         */
134        public void addAckedMessage(DurableMessagePointer message) {
135            ackedQueue.enqueueNoBlock(message);
136        }
137    
138        /**
139         * @return a list of all the acked messages
140         */
141        public List listAckedMessages() {
142            return ackedQueue.getContents();
143        }
144    
145        /**
146         * Add an acked message.
147         */
148        public void removeAllAckedMessages() {
149            ackedQueue.clear();
150        }
151    
152            public boolean isBrowser() {
153                    return consumerInfo.isBrowser();
154            }
155    }