001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
021    import org.activemq.broker.BrokerClient;
022    import org.activemq.message.ActiveMQDestination;
023    import org.activemq.message.ActiveMQMessage;
024    import org.activemq.service.DeadLetterPolicy;
025    import org.activemq.service.Dispatcher;
026    import org.activemq.service.MessageContainer;
027    import org.activemq.service.MessageContainerManager;
028    import org.activemq.service.Subscription;
029    
030    import javax.jms.Destination;
031    import javax.jms.JMSException;
032    import java.util.Collections;
033    import java.util.HashMap;
034    import java.util.Iterator;
035    import java.util.Map;
036    
037    /**
038     * @version $Revision: 1.1.1.1 $
039     */
040    public abstract class MessageContainerManagerSupport implements MessageContainerManager {
041        protected Dispatcher dispatcher;
042        protected Map messageContainers = new ConcurrentHashMap();
043        private Map destinations = new ConcurrentHashMap();
044        private boolean maintainDestinationStats = true;
045        private DeadLetterPolicy  deadLetterPolicy;
046    
047        public MessageContainerManagerSupport(Dispatcher dispatcher) {
048            this.dispatcher = dispatcher;
049            dispatcher.register(this);
050        }
051    
052        public Map getDestinations() {
053            return Collections.unmodifiableMap(destinations);
054        }
055    
056        public void start() throws JMSException {
057            dispatcher.start();
058        }
059    
060        public void stop() throws JMSException {
061            dispatcher.stop();
062            JMSException firstException = null;
063            try {
064                dispatcher.stop();
065            }
066            catch (JMSException e) {
067                firstException = e;
068            }
069    
070            // lets stop all the containers
071            for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
072                MessageContainer container = (MessageContainer) iter.next();
073                try {
074                    container.stop();
075                }
076                catch (JMSException e) {
077                    if (firstException == null) {
078                        firstException = e;
079                    }
080                }
081            }
082            if (firstException != null) {
083                throw firstException;
084            }
085    
086        }
087    
088        public synchronized MessageContainer getContainer(String destinationName) throws JMSException {
089            MessageContainer container = (MessageContainer) messageContainers.get(destinationName);
090            if (container == null) {
091                container = createContainer(destinationName);
092                container.start();
093                messageContainers.put(destinationName, container);
094    
095                destinations.put(destinationName, createDestination(destinationName));
096            }
097            return container;
098        }
099    
100        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
101            // since get creates the container if it does not exist.
102            getContainer(dest.getPhysicalName());
103        }
104        
105        synchronized public Map getMessageContainerAdmins() {
106            HashMap map = new HashMap(messageContainers.size());
107            for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
108                MessageContainer mc = (MessageContainer) iter.next();
109                map.put(mc.getDestinationName(), mc.getMessageContainerAdmin());            
110            }
111            return Collections.unmodifiableMap(map);
112        }
113        
114    
115        public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
116            MessageContainer container = (MessageContainer) messageContainers.get(dest.getPhysicalName());
117            if (container != null) {
118                container.getMessageContainerAdmin().empty();
119                container.stop();
120                messageContainers.remove(dest.getPhysicalName());
121                destinations.remove(dest.getPhysicalName());
122            }
123        }
124    
125        // Properties
126        //-------------------------------------------------------------------------
127        public boolean isMaintainDestinationStats() {
128            return maintainDestinationStats;
129        }
130    
131        public void setMaintainDestinationStats(boolean maintainDestinationStats) {
132            this.maintainDestinationStats = maintainDestinationStats;
133        }
134        
135        /**
136         * @return the DeadLetterPolicy for this Container Manager
137         */
138        public DeadLetterPolicy getDeadLetterPolicy(){
139            return deadLetterPolicy;
140        }
141        
142        /**
143         * Set the DeadLetterPolicy for this Container Manager
144         * @param policy
145         */
146        public void setDeadLetterPolicy(DeadLetterPolicy policy){
147            this.deadLetterPolicy = policy;
148        }
149    
150        // Implementation methods
151        //-------------------------------------------------------------------------
152    
153        /**
154         * Factory method to create a new {@link Destination}
155         */
156        protected abstract Destination createDestination(String destinationName);
157    
158        /**
159         * Factory method to create a new {@link MessageContainer}
160         */
161        protected abstract MessageContainer createContainer(String destinationName) throws JMSException;
162    
163        /**
164         * Loads the container for the given name and destination on startup
165         */
166        protected void loadContainer(String destinationName, Destination destination) throws JMSException {
167            destinations.put(destinationName, destination);
168    
169            MessageContainer container = createContainer(destinationName);
170            container.start();
171            messageContainers.put(destinationName, container);
172        }
173    
174        /**
175         * Updates the message acknowledgement stats
176         *
177         * @param client
178         * @param subscription
179         */
180        protected void updateAcknowledgeStats(BrokerClient client, Subscription subscription) {
181            if (isMaintainDestinationStats()) {
182                // lets lookup the destination which has the stats hanging off it
183                String name = subscription.getDestination().getPhysicalName();
184                ActiveMQDestination destination = (ActiveMQDestination) destinations.get(name);
185                destination.getStats().onMessageAck();
186            }
187        }
188    
189        /**
190         * Updates the message sending stats
191         *
192         * @param client
193         * @param message
194         * @throws JMSException
195         */
196        protected void updateSendStats(BrokerClient client, ActiveMQMessage message) throws JMSException {
197            if (isMaintainDestinationStats()) {
198                // lets lookup the destination which has the stats hanging off it
199                String name = message.getJMSActiveMQDestination().getPhysicalName();
200                ActiveMQDestination destination = (ActiveMQDestination) destinations.get(name);
201                if (destination != null){
202                    destination.getStats().onMessageSend(message);
203                }
204            }
205        }
206        
207    }