001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.boundedvm;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Set;
024    
025    import javax.jms.JMSException;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.activemq.broker.BrokerClient;
030    import org.activemq.filter.DestinationMap;
031    import org.activemq.filter.Filter;
032    import org.activemq.io.util.MemoryBoundedQueue;
033    import org.activemq.message.ActiveMQDestination;
034    import org.activemq.message.ActiveMQMessage;
035    import org.activemq.message.ConsumerInfo;
036    import org.activemq.message.MessageAck;
037    import org.activemq.service.MessageContainer;
038    import org.activemq.service.MessageContainerAdmin;
039    import org.activemq.service.MessageIdentity;
040    import org.activemq.service.Service;
041    
042    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
043    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
044    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045    
046    /**
047     * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
048     * messages
049     *
050     * @version $Revision: 1.1.1.1 $
051     */
052    public class TransientTopicBoundedMessageContainer
053            implements
054                MessageContainer,
055                Service,
056                Runnable,
057                MessageContainerAdmin {
058        private SynchronizedBoolean started;
059        private TransientTopicBoundedMessageManager manager;
060        private BrokerClient client;
061        private MemoryBoundedQueue queue;
062        private Thread worker;
063        private CopyOnWriteArrayList subscriptions;
064        private DestinationMap accel;
065        private ConcurrentHashMap subMap;
066        private Log log;
067    
068        /**
069         * Construct this beast
070         *
071         * @param manager
072         * @param client
073         * @param queue
074         */
075        public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client,
076                MemoryBoundedQueue queue) {
077            this.manager = manager;
078            this.client = client;
079            this.queue = queue;
080            this.started = new SynchronizedBoolean(false);
081            this.subscriptions = new CopyOnWriteArrayList();
082            this.accel = new DestinationMap();
083            this.subMap = new ConcurrentHashMap(100,0.25f);
084            this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
085        }
086    
087        /**
088         * @return true if this Container has no active subscriptions
089         */
090        public boolean isInactive() {
091            return subscriptions.isEmpty();
092        }
093    
094        /**
095         * @return the BrokerClient this Container is dispatching to
096         */
097        public BrokerClient getBrokerClient() {
098            return client;
099        }
100    
101        /**
102         * Add a consumer to dispatch messages to
103         *
104         * @param filter
105         * @param info
106         */
107        public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) {
108            TransientTopicSubscription ts = findMatch(info);
109            if (ts == null) {
110                ts = new TransientTopicSubscription(filter, info, client);
111                subscriptions.add(ts);
112                accel.put(info.getDestination(),ts);
113                subMap.put(info,ts);
114            }
115            return ts;
116        }
117    
118        /**
119         * Remove a consumer
120         *
121         * @param info
122         */
123        public void removeConsumer(ConsumerInfo info) {
124            TransientTopicSubscription ts = findMatch(info);
125            if (ts != null) {
126                subscriptions.remove(ts);
127                accel.remove(info.getDestination(),ts);
128                subMap.remove(info);
129            }
130        }
131    
132        /**
133         * start working
134         */
135        public void start() {
136            if (started.commit(false, true)) {
137                if (manager.isDecoupledDispatch()) {
138                    worker = new Thread(this, "TransientTopicDispatcher");
139                    worker.setPriority(Thread.NORM_PRIORITY + 2);
140                    worker.start();
141                }
142            }
143        }
144    
145        /**
146         * See if this container should get this message and dispatch it
147         *
148         * @param sender the BrokerClient the message came from
149         * @param message
150         * @return true if it is a valid container
151         * @throws JMSException
152         */
153        public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
154            boolean result = false;
155            if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
156                List tmpList = null;
157    
158                Set set = accel.get(message.getJMSActiveMQDestination());
159                if (!set.isEmpty()) {
160                    for (Iterator i = set.iterator(); i.hasNext();) {
161                        TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
162                        if (ts.isTarget(message)) {
163                            if (tmpList == null) {
164                                tmpList = new ArrayList();
165                            }
166                            tmpList.add(ts);
167                        }
168                    }
169                }
170                dispatchToQueue(message, tmpList);
171                result = tmpList != null;
172            }
173            return result;
174        }
175    
176        /**
177         * stop working
178         */
179        public void stop() {
180            started.set(false);
181            queue.clear();
182        }
183    
184        /**
185         * close down this container
186         */
187        public void close() {
188            if (started.get()) {
189                stop();
190            }
191            queue.close();
192        }
193    
194    
195        /**
196         * do some dispatching
197         */
198        public void run() {
199            int count = 0;
200            ActiveMQMessage message = null;
201            while (started.get()) {
202                try {
203                    message = (ActiveMQMessage) queue.dequeue(2000);
204                    if (message != null) {
205                        if (!message.isExpired()) {
206                            client.dispatch(message);
207                            if (++count == 250) {
208                                count = 0;
209                                Thread.yield();
210                            }
211                        }else {
212                            if (log.isDebugEnabled()){
213                                log.debug("Message: " + message + " has expired");
214                            }
215                        }
216                    }
217                }
218                catch (Exception e) {
219                    stop();
220                    log.warn("stop dispatching", e);
221                }
222            }
223        }
224    
225        private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
226            if (list != null && !list.isEmpty()) {
227                int[] ids = new int[list.size()];
228                for (int i = 0;i < list.size();i++) {
229                    TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
230                    ids[i] = ts.getConsumerInfo().getConsumerNo();
231                }
232                message = message.shallowCopy();
233                message.setConsumerNos(ids);
234                if (manager.isDecoupledDispatch()) {
235                    queue.enqueue(message);
236                }
237                else {
238                    client.dispatch(message);
239                }
240            }
241        }
242    
243        private TransientTopicSubscription findMatch(ConsumerInfo info) {
244            return (TransientTopicSubscription) subMap.get(info);
245        }
246    
247        /**
248         * @param destination
249         * @return true if a
250         */
251        public boolean hasConsumerFor(ActiveMQDestination destination) {
252            for (Iterator i = subscriptions.iterator();i.hasNext();) {
253                TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
254                ConsumerInfo info = ts.getConsumerInfo();
255                if (info.getDestination().matches(destination)) {
256                    return true;
257                }
258            }
259            return false;
260        }
261    
262        /**
263         * @return the destination name
264         */
265        public String getDestinationName() {
266            return "";
267        }
268    
269        /**
270         * @param msg
271         * @return @throws JMSException
272         */
273        public void addMessage(ActiveMQMessage msg) throws JMSException {
274        }
275    
276        /**
277         * @param messageIdentity
278         * @param ack
279         * @throws JMSException
280         */
281        public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
282        }
283    
284        /**
285         * @param messageIdentity
286         * @return @throws JMSException
287         */
288        public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
289            return null;
290        }
291    
292        /**
293         * @param messageIdentity
294         * @throws JMSException
295         */
296        public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
297        }
298    
299        /**
300         * @param messageIdentity
301         * @param ack
302         * @throws JMSException
303         */
304        public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
305        }
306    
307        /**
308         * @param messageIdentity
309         * @return @throws JMSException
310         */
311        public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
312            return false;
313        }
314    
315        /**
316         * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
317         */
318        public MessageContainerAdmin getMessageContainerAdmin() {
319            return this;
320        }
321    
322        /**
323         * @see org.activemq.service.MessageContainerAdmin#empty()
324         */
325        public void empty() throws JMSException {
326            // TODO implement me
327        }
328    
329        /**
330         * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
331         */
332        public boolean isDeadLetterQueue() {
333            return false;
334        }
335    }