001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.boundedvm;
020    
021    import java.util.Collections;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.HashMap;
026    import javax.jms.JMSException;
027    import org.activemq.broker.BrokerClient;
028    import org.activemq.filter.AndFilter;
029    import org.activemq.filter.DestinationMap;
030    import org.activemq.filter.Filter;
031    import org.activemq.filter.FilterFactory;
032    import org.activemq.filter.FilterFactoryImpl;
033    import org.activemq.filter.NoLocalFilter;
034    import org.activemq.io.util.MemoryBoundedQueue;
035    import org.activemq.io.util.MemoryBoundedQueueManager;
036    import org.activemq.message.ActiveMQDestination;
037    import org.activemq.message.ActiveMQMessage;
038    import org.activemq.message.ConsumerInfo;
039    import org.activemq.message.MessageAck;
040    import org.activemq.service.DeadLetterPolicy;
041    import org.activemq.service.MessageContainer;
042    import org.activemq.service.MessageContainerManager;
043    import org.activemq.service.TransactionManager;
044    import org.activemq.service.TransactionTask;
045    import org.activemq.service.impl.AutoCommitTransaction;
046    
047    import org.apache.commons.logging.Log;
048    import org.apache.commons.logging.LogFactory;
049    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
050    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
051    
052    /**
053     * A MessageContainerManager for transient topics
054     * 
055     * @version $Revision: 1.1.1.1 $
056     */
057    
058    /**
059     * A manager of MessageContainer instances
060     */
061    public class TransientTopicBoundedMessageManager implements MessageContainerManager {
062        private static final Log log = LogFactory.getLog(TransientTopicBoundedMessageManager.class);
063        private MemoryBoundedQueueManager queueManager;
064        private ConcurrentHashMap containers;
065        private ConcurrentHashMap subscriptions;
066        private DestinationMap destinationMap;
067        private FilterFactory filterFactory;
068        private SynchronizedBoolean started;
069        private Map destinations;
070        private DeadLetterPolicy deadLetterPolicy;
071        private boolean decoupledDispatch = false;
072    
073        /**
074         * Constructor for TransientTopicBoundedMessageManager
075         *
076         * @param mgr
077         */
078        public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
079            this.queueManager = mgr;
080            this.containers = new ConcurrentHashMap();
081            this.subscriptions = new ConcurrentHashMap();
082            this.destinationMap = new DestinationMap();
083            this.destinations = new ConcurrentHashMap();
084            this.filterFactory = new FilterFactoryImpl();
085            this.started = new SynchronizedBoolean(false);
086        }
087    
088        /**
089         * start the manager
090         *
091         * @throws JMSException
092         */
093        public void start() throws JMSException {
094            if (started.commit(false, true)) {
095                for (Iterator i = containers.values().iterator(); i.hasNext();) {
096                    TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
097                    container.start();
098                }
099            }
100        }
101    
102        /**
103         * stop the manager
104         *
105         * @throws JMSException
106         */
107        public void stop() throws JMSException {
108            if (started.commit(true, false)) {
109                for (Iterator i = containers.values().iterator(); i.hasNext();) {
110                    TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
111                    container.stop();
112                }
113            }
114        }
115    
116        /**
117         * Add a consumer if appropiate
118         *
119         * @param client
120         * @param info
121         * @throws JMSException
122         */
123        public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
124            ActiveMQDestination destination = info.getDestination();
125            if (destination.isTopic()) {
126                TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
127                        .get(client);
128                if (container == null) {
129                    MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
130                    container = new TransientTopicBoundedMessageContainer(this, client, queue);
131                    containers.put(client, container);
132                    if (started.get()) {
133                        container.start();
134                    }
135                }
136                if (log.isDebugEnabled()) {
137                    log.debug("Adding consumer: " + info);
138                }
139    
140                TransientTopicSubscription ts = container.addConsumer(createFilter(info), info);
141                if (ts != null) {
142                    subscriptions.put(info.getConsumerId(), ts);
143                }
144    
145                destinationMap.put(destination,container);
146                String name = destination.getPhysicalName();
147                //As the destinations are used for generating
148                //subscriptions for NetworkConnectors etc.,
149                //we should not generate duplicates by adding in 
150                //durable topic subscribers
151                if (!info.isDurableTopic() && !destinations.containsKey(name)) {
152                    destinations.put(name, destination);
153                }
154            }
155        }
156    
157        /**
158         * @param client
159         * @param info
160         * @throws JMSException
161         */
162        public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
163            ActiveMQDestination destination = info.getDestination();
164            if (destination.isTopic()) {
165                TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
166                        .get(client);
167                if (container != null) {
168                    container.removeConsumer(info);
169                    if (container.isInactive()) {
170                        containers.remove(client);
171                        container.close();
172                        destinationMap.remove(destination, container);
173                    }
174    
175                    // lets check if we've no more consumers for this destination
176                    //As the destinations are used for generating
177                    //subscriptions for NetworkConnectors etc.,
178                    //we should not count durable topic subscribers
179                    if (!info.isDurableTopic() && !hasConsumerFor(destination)) {
180                        destinations.remove(destination.getPhysicalName());
181                    }
182                }
183                subscriptions.remove(info.getConsumerId());
184            }
185        }
186    
187        /**
188         * Delete a durable subscriber
189         *
190         * @param clientId
191         * @param subscriberName
192         * @throws JMSException if the subscriber doesn't exist or is still active
193         */
194        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
195        }
196    
197        /**
198         * @param client
199         * @param message
200         * @throws JMSException
201         */
202        public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
203            if (TransactionManager.getContexTransaction()==AutoCommitTransaction.AUTO_COMMIT_TRANSACTION){
204                doSendMessage(client, message);
205            }else {
206                // If there is no transaction.. then this executes directly.
207                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
208                    public void execute() throws Throwable {
209                        doSendMessage(client, message);
210                    }
211                });
212            }
213        }
214    
215        /**
216         * @param client
217         * @param message
218         * @throws JMSException
219         */
220        private void doSendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
221            Set set = destinationMap.get(message.getJMSActiveMQDestination());
222            if (!set.isEmpty()){
223                for (Iterator i = set.iterator(); i.hasNext();) {
224                    TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
225                    container.targetAndDispatch(client,message);
226                }       
227            }
228        }
229    
230        /**
231         * @param client
232         * @param ack
233         * @throws JMSException
234         * 
235         */
236        public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
237        }
238        
239        /**
240         * @throws JMSException
241         * 
242         */
243    
244        public void poll() throws JMSException {
245        }
246    
247        /**
248         * For Transient topics - a MessageContainer maps on to the messages
249         * to be dispatched through a BrokerClient, not a destination
250         * @param physicalName
251         * @return the MessageContainer used for dispatching - always returns null
252         * @throws JMSException
253         */
254        public MessageContainer getContainer(String physicalName) throws JMSException {
255            return null; 
256        }
257    
258        /**
259         * @return a map of all the destinations
260         */
261        public Map getDestinations() {
262            return Collections.unmodifiableMap(destinations);
263        }
264    
265        /**
266         * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
267         * objects used by non-broker consumers directly connected to this container
268         *
269         * @return
270         */
271        public Map getLocalDestinations() {
272            Map localDestinations = new HashMap();
273            for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
274                TransientTopicSubscription sub = (TransientTopicSubscription) iter.next();
275                if (sub.isLocalSubscription() && !sub.isDurableTopic()) {
276                    final ActiveMQDestination dest = sub.getDestination();
277                    localDestinations.put(dest.getPhysicalName(), dest);
278                }
279            }
280            return Collections.unmodifiableMap(localDestinations);
281        }
282        
283        /**
284         * @return the DeadLetterPolicy for this Container Manager
285         */
286        public DeadLetterPolicy getDeadLetterPolicy(){
287            return deadLetterPolicy;
288        }
289        
290        /**
291         * Set the DeadLetterPolicy for this Container Manager
292         * @param policy
293         */
294        public void setDeadLetterPolicy(DeadLetterPolicy policy){
295            this.deadLetterPolicy = policy;
296        }
297    
298        /**
299         * @return Returns the decoupledDispatch.
300         */
301        public boolean isDecoupledDispatch() {
302            return decoupledDispatch;
303        }
304        /**
305         * @param decoupledDispatch The decoupledDispatch to set.
306         */
307        public void setDecoupledDispatch(boolean decoupledDispatch) {
308            this.decoupledDispatch = decoupledDispatch;
309        }
310        /**
311         * Create filter for a Consumer
312         *
313         * @param info
314         * @return the Fitler
315         * @throws javax.jms.JMSException
316         */
317        protected Filter createFilter(ConsumerInfo info) throws JMSException {
318            Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
319            if (info.isNoLocal()) {
320                filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
321            }
322            return filter;
323        }
324    
325        protected boolean hasConsumerFor(ActiveMQDestination destination) {
326            for (Iterator i = containers.values().iterator(); i.hasNext();) {
327                TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
328                if (container.hasConsumerFor(destination)) {
329                    return true;
330                }
331            }
332            return false;
333        }
334    
335        /**
336         * @see org.activemq.service.MessageContainerManager#createMessageContainer(org.activemq.message.ActiveMQDestination)
337         */
338        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
339        }
340        
341        /**
342         * @see org.activemq.service.MessageContainerManager#destroyMessageContainer(org.activemq.message.ActiveMQDestination)
343         */
344        public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
345            containers.remove(dest);
346            destinationMap.removeAll(dest);
347        }
348        
349        /**
350         * @see org.activemq.service.MessageContainerManager#getMessageContainerAdmins()
351         */
352        public Map getMessageContainerAdmins() throws JMSException {
353            return Collections.EMPTY_MAP;
354        }
355        
356    }