001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
021    import org.apache.commons.logging.Log;
022    import org.apache.commons.logging.LogFactory;
023    import org.activemq.broker.BrokerClient;
024    import org.activemq.message.ActiveMQMessage;
025    import org.activemq.service.MessageContainerManager;
026    import org.activemq.service.Service;
027    import org.activemq.service.Subscription;
028    
029    import javax.jms.JMSException;
030    import java.util.Map;
031    import java.util.Iterator;
032    
033    /**
034     * A Dispatcher that polls for updates for active Message Consumers
035     *
036     * @version $Revision: 1.1.1.1 $
037     */
038    public class DispatchWorker implements Runnable, Service {
039        private static final Log log = LogFactory.getLog(DispatchWorker.class);
040        private static final int POLL_TIMEOUT = 250;
041    
042        private Map subscriptions = new ConcurrentHashMap(1000, 0.75f);
043        private Object lock = new Object();
044        private boolean active = true;
045        private boolean started = false;
046        private MessageContainerManager containerManager;
047    
048        /**
049         * Register the MessageContainerManager for the Dispatcher
050         *
051         * @param mcm
052         */
053        public void register(MessageContainerManager mcm) {
054            this.containerManager = mcm;
055        }
056    
057        /**
058         * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is
059         * waiting for messages to dispatch
060         */
061        public void wakeup() {
062            synchronized (lock) {
063                active = true;
064                lock.notifyAll();
065            }
066        }
067    
068        /**
069         * Add an active subscription
070         *
071         * @param client
072         * @param sub
073         */
074        public void addActiveSubscription(BrokerClient client, Subscription sub) {
075            if (log.isDebugEnabled()) {
076                log.info("Adding subscription: " + sub + " to client: " + client);
077            }
078            subscriptions.put(sub, client);
079        }
080    
081        /**
082         * remove an active subscription
083         *
084         * @param client
085         * @param sub
086         */
087        public void removeActiveSubscription(BrokerClient client, Subscription sub) {
088            if (log.isDebugEnabled()) {
089                log.info("Removing subscription: " + sub + " from client: " + client);
090            }
091            subscriptions.remove(sub);
092        }
093    
094        /**
095         * dispatch messages to active Consumers
096         *
097         * @see java.lang.Runnable#run()
098         */
099        public void run() {
100            while (started) {
101                doPoll();
102                boolean dispatched = false;
103                try {
104                    // our collection will not throw concurrent modification exception
105                    for (Iterator iter = subscriptions.keySet().iterator(); iter.hasNext();) {
106                        Subscription sub = (Subscription) iter.next();
107                        if (sub != null && sub.isReadyToDispatch()) {
108                            dispatched = dispatchMessages(sub, dispatched);
109                        }
110                    }
111                }
112                catch (JMSException jmsEx) {
113                    log.error("Could not dispatch to Subscription: " + jmsEx, jmsEx);
114                }
115                if (!dispatched) {
116                    synchronized (lock) {
117                        active = false;
118                        if (!active && started) {
119                            try {
120                                lock.wait(POLL_TIMEOUT);
121                            }
122                            catch (InterruptedException e) {
123                            }
124                        }
125                    }
126                }
127            }
128        }
129    
130    
131        /**
132         * start the DispatchWorker
133         *
134         * @see org.activemq.service.Service#start()
135         */
136        public void start() {
137            started = true;
138        }
139    
140        /**
141         * stop the DispatchWorker
142         *
143         * @see org.activemq.service.Service#stop()
144         */
145        public void stop() {
146            started = false;
147        }
148    
149    
150        // Implementation methods
151        //-------------------------------------------------------------------------
152    
153        protected boolean dispatchMessages(Subscription subscription, boolean dispatched) throws JMSException {
154            ActiveMQMessage[] msgs = subscription.getMessagesToDispatch();
155            if (msgs != null && msgs.length > 0) {
156                BrokerClient client = (BrokerClient) subscriptions.get(subscription);
157                if (client == null) {
158                    log.warn("Null client for subscription: " + subscription);
159                }
160                else {
161                    for (int i = 0; i < msgs.length; i++) {
162                        ActiveMQMessage msg = msgs[i].shallowCopy();
163    
164                        if (log.isDebugEnabled()) {
165                            log.debug("Dispatching message: " + msg);
166                        }
167                        int[] consumerNos = new int[1];
168                        consumerNos[0] = subscription.getConsumerNumber();
169                        msg.setConsumerNos(consumerNos);
170                        client.dispatch(msg);
171                        dispatched = true;
172                    }
173                }
174            }
175            return dispatched;
176        }
177    
178        protected void doPoll() {
179            if (containerManager != null && started) {
180                try {
181                    containerManager.poll();
182                }
183                catch (JMSException e) {
184                    log.error("Error polling from the ContainerManager: ", e);
185                }
186            }
187        }
188    }