001    /**
002     * Copyright 2004 Protique Ltd
003     * 
004     * Licensed under the Apache License, Version 2.0 (the "License"); 
005     * you may not use this file except in compliance with the License. 
006     * You may obtain a copy of the License at 
007     * 
008     * http://www.apache.org/licenses/LICENSE-2.0
009     * 
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS, 
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
013     * See the License for the specific language governing permissions and 
014     * limitations under the License. 
015     * 
016     **/
017    
018    package org.activemq.service.impl;
019    import java.util.Collections;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.Map;
023    import java.util.Set;
024    
025    import javax.jms.DeliveryMode;
026    import javax.jms.Destination;
027    import javax.jms.InvalidDestinationException;
028    import javax.jms.JMSException;
029    import javax.jms.Topic;
030    
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    import org.activemq.DuplicateDurableSubscriptionException;
034    import org.activemq.broker.BrokerClient;
035    import org.activemq.filter.AndFilter;
036    import org.activemq.filter.DestinationMap;
037    import org.activemq.filter.Filter;
038    import org.activemq.filter.FilterFactory;
039    import org.activemq.filter.FilterFactoryImpl;
040    import org.activemq.filter.NoLocalFilter;
041    import org.activemq.message.ActiveMQDestination;
042    import org.activemq.message.ActiveMQMessage;
043    import org.activemq.message.ActiveMQTopic;
044    import org.activemq.message.ConsumerInfo;
045    import org.activemq.message.MessageAck;
046    import org.activemq.service.DeadLetterPolicy;
047    import org.activemq.service.Dispatcher;
048    import org.activemq.service.MessageContainer;
049    import org.activemq.service.RedeliveryPolicy;
050    import org.activemq.service.Subscription;
051    import org.activemq.service.SubscriptionContainer;
052    import org.activemq.service.TopicMessageContainer;
053    import org.activemq.service.TransactionManager;
054    import org.activemq.service.TransactionTask;
055    import org.activemq.store.PersistenceAdapter;
056    
057    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
058    
059    /**
060     * A default Broker used for Topic messages for durable consumers
061     * 
062     * @version $Revision: 1.1.1.1 $
063     */
064    public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
065        private static final Log log = LogFactory.getLog(DurableTopicMessageContainerManager.class);
066        private PersistenceAdapter persistenceAdapter;
067        protected SubscriptionContainer subscriptionContainer;
068        protected FilterFactory filterFactory;
069        protected Map activeSubscriptions = new ConcurrentHashMap();
070        private DestinationMap destinationMap = new DestinationMap();
071        private ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
072        
073        public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
074            this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(redeliveryPolicy,deadLetterPolicy), new FilterFactoryImpl(),
075                    new DispatcherImpl());
076        }
077    
078        public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter,
079                SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
080            super(dispatcher);
081            this.persistenceAdapter = persistenceAdapter;
082            this.subscriptionContainer = subscriptionContainer;
083            this.filterFactory = filterFactory;
084            try {
085                loadAllMessageContainers();
086            }
087            catch (JMSException e) {
088                log.error("Failed to load initial Topic Containers",e);
089            }
090        }
091    
092        public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
093            if (info.isDurableTopic()) {
094                if (log.isDebugEnabled()) {
095                    log.debug("Adding consumer: " + info);
096                }
097    
098                doAddMessageConsumer(client, info);
099            }
100        }
101    
102        public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
103            // we should not remove a durable topic subscription from the subscriptionContainer
104            // unless via the deleteSubscription() method
105            
106    //        subscriptionContainer.removeSubscription(info.getConsumerId());
107    //        subscription.clear();
108            
109            Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
110            if (sub != null) {
111                sub.setActive(false);
112                dispatcher.removeActiveSubscription(client, sub);
113            }
114        }
115    
116        /**
117         * Delete a durable subscriber
118         * 
119         * @param clientId
120         * @param subscriberName
121         * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
122         */
123        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
124            
125            String consumerKey = ConsumerInfo.generateConsumerKey(clientId, subscriberName);        
126            Subscription sub = (Subscription) durableSubscriptions.remove(consumerKey);
127            if( sub!=null ) {
128                //only delete if not active
129                if (sub.isActive()) {
130                    throw new JMSException("The Consummer " + subscriberName + " is still active");
131                }
132                else {
133                    subscriptionContainer.removeSubscription(sub.getConsumerId());
134                    sub.clear();
135                    
136                    Set containers = destinationMap.get(sub.getDestination());
137                    for (Iterator iter = containers.iterator();iter.hasNext();) {
138                        TopicMessageContainer container = (TopicMessageContainer) iter.next();
139                        if (container instanceof DurableTopicMessageContainer) {
140                            ((DurableTopicMessageContainer) container).deleteSubscription(sub.getPersistentKey());
141                        }
142                    }
143                    
144                }
145            } else {
146                throw new InvalidDestinationException("The Consumer " + subscriberName + " does not exist for client: "
147                        + clientId);
148            }
149        }
150    
151        /**
152         * @param client
153         * @param message
154         * @throws javax.jms.JMSException
155         */
156        public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
157            ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
158            if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
159                // note that we still need to persist the message even if there are no matching
160                // subscribers as they may come along later
161                // plus we don't pre-load subscription information                
162                final MessageContainer container = getContainer(dest.getPhysicalName());
163                container.addMessage(message);
164                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
165                    public void execute() throws Throwable {
166                        doSendMessage(client, message, container);
167                    }
168                });
169            }
170        }
171    
172        /**
173         * @param client
174         * @param message
175         * @throws JMSException
176         */
177        private void doSendMessage(BrokerClient client, ActiveMQMessage message, MessageContainer container) throws JMSException {
178            Set matchingSubscriptions = subscriptionContainer.getSubscriptions(message.getJMSActiveMQDestination());
179            if (!matchingSubscriptions.isEmpty()) {
180                for (Iterator i = matchingSubscriptions.iterator();i.hasNext();) {
181                    Subscription sub = (Subscription) i.next();
182                    if (sub.isTarget(message)) {
183                        sub.addMessage(container, message);
184                    }
185                }
186                updateSendStats(client, message);
187            }        
188        }
189    
190        /**
191         * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
192         * objects used by non-broker consumers directly connected to this container
193         *
194         * @return
195         */
196        public Map getLocalDestinations() {
197            Map localDestinations = new HashMap();
198            for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
199                Subscription sub = (Subscription) iter.next();
200                if (sub.isLocalSubscription()) {
201                    final ActiveMQDestination dest = sub.getDestination();
202                    localDestinations.put(dest.getPhysicalName(), dest);
203                }
204            }
205            return Collections.unmodifiableMap(localDestinations);
206        }
207    
208        /**
209         * Acknowledge a message as being read and consumed byh the Consumer
210         * 
211         * @param client
212         * @param ack
213         * @throws javax.jms.JMSException
214         */
215        public void acknowledgeMessage(BrokerClient client, final MessageAck ack) throws JMSException {
216            if ( !ack.getDestination().isTopic() || !ack.isPersistent())
217                return;
218                
219            Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
220            if (sub == null) {
221                return;
222            }
223            
224            sub.messageConsumed(ack);
225        }
226    
227        /**
228         * poll or messages
229         * 
230         * @throws javax.jms.JMSException
231         */
232        public void poll() throws JMSException {
233            //do nothing
234        }
235    
236        // Implementation methods
237        //-------------------------------------------------------------------------
238        protected MessageContainer createContainer(String destinationName) throws JMSException {
239            TopicMessageContainer topicMessageContainer = 
240               new DurableTopicMessageContainer(this, persistenceAdapter.createTopicMessageStore(destinationName), destinationName);
241            destinationMap.put(new ActiveMQTopic(destinationName), topicMessageContainer);
242            return topicMessageContainer;
243        }
244    
245        protected Destination createDestination(String destinationName) {
246            return new ActiveMQTopic(destinationName);
247        }
248    
249        public boolean isConsumerActiveOnDestination(ActiveMQDestination dest) {
250            for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
251                Subscription subscription = (Subscription) iter.next();
252                if( subscription.getDestination().equals(dest) ) {
253                    return true;
254                }
255            }
256            return false;
257        }
258        
259        protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
260            boolean shouldRecover = false;
261            if (info.getConsumerName() != null && info.getClientId() != null) {
262                for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
263                    Subscription subscription = (Subscription) iter.next();
264                    if (subscription.isSameDurableSubscription(info)) {
265                        throw new DuplicateDurableSubscriptionException(info);
266                    }
267                }
268            }        
269            Subscription subscription = (Subscription) durableSubscriptions.get(info.getConsumerKey());  
270            //subscriptionContainer.getSubscription(info.getConsumerId());
271            if (subscription != null) {
272                //check the subscription hasn't changed
273                if (!equal(subscription.getDestination(), info.getDestination())
274                        || !equal(subscription.getSelector(), info.getSelector())) {
275                    subscriptionContainer.removeSubscription(info.getConsumerId());
276                    subscription.clear();
277                    subscription = subscriptionContainer.makeSubscription(dispatcher, client, info, createFilter(info));
278                    durableSubscriptions.put(info.getConsumerKey(), subscription);
279                }
280            }
281            else {
282                subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
283                shouldRecover = true;
284                durableSubscriptions.put(info.getConsumerKey(), subscription);
285            }
286            subscription.setActiveConsumer(client,info);
287            activeSubscriptions.put(info.getConsumerId(), subscription);
288            dispatcher.addActiveSubscription(client, subscription);
289            
290            // load the container
291            getContainer(subscription.getDestination().getPhysicalName());
292            
293            Set containers = destinationMap.get(subscription.getDestination());
294            for (Iterator iter = containers.iterator();iter.hasNext();) {
295                TopicMessageContainer container = (TopicMessageContainer) iter.next();
296                if (container instanceof DurableTopicMessageContainer) {
297                    ((DurableTopicMessageContainer) container).storeSubscription(info, subscription);
298                }
299            }
300            if (shouldRecover) {
301                recoverSubscriptions(subscription);
302            }
303            // lets not make the subscription active until later
304            // as we can't start dispatching until we've sent back the receipt
305            // TODO we might wish to register a post-receipt action here
306            // to perform the wakeup
307            subscription.setActive(true);
308            //dispatcher.wakeup(subscription);
309        }
310    
311        /**
312         * Returns true if the two objects are null or are equal
313         */
314        protected final boolean equal(Object o1, Object o2) {
315            return o1 == o2 || (o1 != null && o1.equals(o2));
316        }
317    
318        /**
319         * This method is called when a new durable subscription is started and so we need to go through each matching
320         * message container and dispatch any matching messages that may be outstanding
321         * 
322         * @param subscription
323         */
324        protected void recoverSubscriptions(Subscription subscription) throws JMSException {
325            // we should load all of the message containers from disk if we're a wildcard
326            
327            getContainer(subscription.getDestination().getPhysicalName());
328            Set containers = destinationMap.get(subscription.getDestination());
329            for (Iterator iter = containers.iterator();iter.hasNext();) {
330                TopicMessageContainer container = (TopicMessageContainer) iter.next();
331                container.recoverSubscription(subscription);
332            }
333        }
334    
335        /**
336         * Called when recovering a wildcard subscription where we need to load all the durable message containers (for
337         * which we have any outstanding messages to deliver) into RAM
338         */
339        protected void loadAllMessageContainers() throws JMSException {
340            Map destinations = persistenceAdapter.getInitialDestinations();
341            if (destinations != null) {
342                for (Iterator iter = destinations.entrySet().iterator();iter.hasNext();) {
343                    Map.Entry entry = (Map.Entry) iter.next();
344                    String name = (String) entry.getKey();
345                    Destination destination = (Destination) entry.getValue();
346                    if ( destination instanceof Topic ) {
347                        loadContainer(name, destination);
348                    }
349                }
350            }
351        }
352    
353        /**
354         * Create filter for a Consumer
355         * 
356         * @param info
357         * @return the Fitler
358         * @throws javax.jms.JMSException
359         */
360        protected Filter createFilter(ConsumerInfo info) throws JMSException {
361            Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
362            if (info.isNoLocal()) {
363                filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
364            }
365            return filter;
366        }
367        
368        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
369            // This container only does topics.
370            if(!dest.isTopic()) 
371                return;
372            super.createMessageContainer(dest);
373        }
374        
375        public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
376            // This container only does topics.
377            if(!dest.isTopic()) 
378                return;
379            super.destroyMessageContainer(dest);
380            destinationMap.removeAll(dest);
381        }
382    }