001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
021    import org.activemq.broker.BrokerClient;
022    import org.activemq.filter.DestinationMap;
023    import org.activemq.filter.Filter;
024    import org.activemq.message.ActiveMQDestination;
025    import org.activemq.message.ConsumerInfo;
026    import org.activemq.service.DeadLetterPolicy;
027    import org.activemq.service.Dispatcher;
028    import org.activemq.service.Subscription;
029    import org.activemq.service.SubscriptionContainer;
030    import org.activemq.service.RedeliveryPolicy;
031    
032    import java.util.HashSet;
033    import java.util.Iterator;
034    import java.util.Map;
035    import java.util.Set;
036    
037    /**
038     * A default RAM only implementation of the {@link SubscriptionContainer}
039     *
040     * @version $Revision: 1.1.1.1 $
041     */
042    public class SubscriptionContainerImpl implements SubscriptionContainer {
043        private Map subscriptions;
044        private DestinationMap destinationIndex = new DestinationMap();
045        private RedeliveryPolicy redeliveryPolicy;
046        private DeadLetterPolicy deadLetterPolicy;
047    
048        public SubscriptionContainerImpl(RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
049            this(new ConcurrentHashMap(), redeliveryPolicy,deadLetterPolicy);
050        }
051    
052        public SubscriptionContainerImpl(Map subscriptions, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
053            this.subscriptions = subscriptions;
054            this.redeliveryPolicy = redeliveryPolicy;
055            this.deadLetterPolicy = deadLetterPolicy;
056        }
057    
058        public String toString() {
059            return super.toString() + "[size:" + subscriptions.size() + "]";
060        }
061    
062        public RedeliveryPolicy getRedeliveryPolicy() {
063            return redeliveryPolicy;
064        }
065    
066        public DeadLetterPolicy getDeadLetterPolicy(){
067            return deadLetterPolicy;
068        }
069        public Subscription getSubscription(String consumerId) {
070            return (Subscription) subscriptions.get(consumerId);
071        }
072    
073        public Subscription removeSubscription(String consumerId) {
074            Subscription subscription = (Subscription) subscriptions.remove(consumerId);
075            if (subscription != null) {
076                destinationIndex.remove(subscription.getDestination(), subscription);
077            }
078            return subscription;
079        }
080    
081        public Set getSubscriptions(ActiveMQDestination destination) {
082            Object answer = destinationIndex.get(destination);
083            if (answer instanceof Set) {
084                return (Set) answer;
085            }
086            else {
087                Set set = new HashSet(1);
088                set.add(answer);
089                return set;
090            }
091        }
092    
093        public Iterator subscriptionIterator() {
094            return subscriptions.values().iterator();
095        }
096    
097        public Subscription makeSubscription(Dispatcher dispatcher,BrokerClient client, ConsumerInfo info, Filter filter) {
098            Subscription subscription = createSubscription(dispatcher, client,info, filter);
099            subscriptions.put(info.getConsumerId(), subscription);
100            destinationIndex.put(subscription.getDestination(), subscription);
101            return subscription;
102        }
103    
104        protected Subscription createSubscription(Dispatcher dispatcher, BrokerClient client,ConsumerInfo info, Filter filter) {
105            return new SubscriptionImpl(dispatcher, client,info, filter, getRedeliveryPolicy(),getDeadLetterPolicy());
106        }
107    }