001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.service.boundedvm;
020    import java.util.HashMap;
021    import java.util.List;
022    import java.util.Map;
023    
024    import javax.jms.JMSException;
025    
026    import org.activemq.broker.BrokerClient;
027    import org.activemq.filter.Filter;
028    import org.activemq.io.util.MemoryBoundedQueue;
029    import org.activemq.io.util.MemoryBoundedQueueManager;
030    import org.activemq.io.util.MemoryManageable;
031    import org.activemq.message.ActiveMQDestination;
032    import org.activemq.message.ActiveMQMessage;
033    import org.activemq.message.ConsumerInfo;
034    import org.activemq.message.MessageAck;
035    import org.activemq.service.DeadLetterPolicy;
036    import org.activemq.service.MessageContainer;
037    import org.activemq.service.MessageContainerAdmin;
038    import org.activemq.service.MessageIdentity;
039    import org.activemq.service.QueueListEntry;
040    import org.activemq.service.RedeliveryPolicy;
041    import org.activemq.service.Service;
042    import org.activemq.service.impl.DefaultQueueList;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    import EDU.oswego.cs.dl.util.concurrent.Executor;
047    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
048    
049    /**
050     * A MessageContainer for transient queues
051     * 
052     * @version $Revision: 1.1.1.1 $
053     */
054    public class TransientQueueBoundedMessageContainer implements MessageContainer, Service, Runnable, MessageContainerAdmin {
055        private MemoryBoundedQueueManager queueManager;
056        private ActiveMQDestination destination;
057        private SynchronizedBoolean started;
058        private SynchronizedBoolean running;
059        private MemoryBoundedQueue queue;
060        private DefaultQueueList subscriptions;
061        private Executor threadPool;
062        private Log log;
063        private long idleTimestamp; //length of time (ms) there have been no active subscribers
064        private DeadLetterPolicy deadLetterPolicy;
065            private final Object dispatchMutex = new Object();
066        private final Object subscriptionMutex = new Object();
067            
068        /**
069         * Construct this beast
070         * 
071         * @param threadPool
072         * @param queueManager
073         * @param destination
074         * @param redeliveryPolicy
075         * @param deadLetterPolicy
076         */
077        public TransientQueueBoundedMessageContainer(Executor threadPool, MemoryBoundedQueueManager queueManager,
078                ActiveMQDestination destination,RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
079            this.threadPool = threadPool;
080            this.queueManager = queueManager;
081            this.destination = destination;
082            this.deadLetterPolicy = deadLetterPolicy;
083            this.queue = queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE:-" + destination.getPhysicalName());
084            this.started = new SynchronizedBoolean(false);
085            this.running = new SynchronizedBoolean(false);
086            this.subscriptions = new DefaultQueueList();
087            this.log = LogFactory.getLog("TransientQueueBoundedMessageContainer:- " + destination);
088        }
089    
090        
091        /**
092         * @return true if there are subscribers waiting for messages
093         */
094        public boolean isActive(){
095            return !subscriptions.isEmpty();
096        }
097        
098        /**
099         * @return true if no messages are enqueued
100         */
101        public boolean isEmpty(){
102            return queue.isEmpty();
103        }
104        
105        /**
106         * @return the timestamp (ms) from the when the last active subscriber stopped
107         */
108        public long getIdleTimestamp(){
109            return idleTimestamp;
110        }
111        
112        
113    
114        /**
115         * Add a consumer to dispatch messages to
116         * 
117         * @param filter
118         * @param info
119         * @param client
120         * @return TransientQueueSubscription
121         * @throws JMSException
122         */
123        public TransientQueueSubscription addConsumer(Filter filter,ConsumerInfo info, BrokerClient client) throws JMSException {
124            synchronized (subscriptionMutex) {
125                TransientQueueSubscription ts = findMatch(info);
126                if (ts == null) {
127                    MemoryBoundedQueue queue = queueManager
128                            .getMemoryBoundedQueue("TRANSIENT_QUEUE_SUB:-"
129                                    + info.getConsumerId());
130                    MemoryBoundedQueue ackQueue = queueManager
131                            .getMemoryBoundedQueue("TRANSIENT_QUEUE_SUB_ACKED:-"
132                                    + info.getConsumerId());
133                    ts = new TransientQueueSubscription(client, queue, ackQueue,
134                            filter, info);
135    
136                    idleTimestamp = 0;
137                    subscriptions.add(ts);
138                    if (started.get()) {
139                        synchronized (running) {
140                            if (running.commit(false, true)) {
141                                try {
142                                    threadPool.execute(this);
143                                } catch (InterruptedException e) {
144                                    JMSException jmsEx = new JMSException(
145                                            toString()
146                                                    + " Failed to start running dispatch thread");
147                                    jmsEx.setLinkedException(e);
148                                    throw jmsEx;
149                                }
150                            }
151                        }
152                    }
153    
154                }
155                return ts;
156            }
157        }
158    
159        /**
160         * Remove a consumer
161         * 
162         * @param info
163         * @throws JMSException
164         */
165        public void removeConsumer(ConsumerInfo info) throws JMSException {
166            synchronized (subscriptionMutex) {
167                TransientQueueSubscription ts = findMatch(info);
168                if (ts != null) {
169    
170                    subscriptions.remove(ts);
171                    if (subscriptions.isEmpty()) {
172                        running.commit(true, false);
173                        idleTimestamp = System.currentTimeMillis();
174                    }
175    
176                    //get unacknowledged messages and re-enqueue them
177                    List list = ts.getUndeliveredMessages();
178                    for (int i = list.size() - 1; i >= 0; i--) {
179                        queue.enqueueFirstNoBlock((MemoryManageable) list.get(i));
180                    }
181    
182                    // If it is a queue browser, then re-enqueue the browsed
183                    // messages.
184                    if (ts.isBrowser()) {
185                        list = ts.listAckedMessages();
186                        for (int i = list.size() - 1; i >= 0; i--) {
187                            queue.enqueueFirstNoBlock((MemoryManageable) list
188                                    .get(i));
189                        }
190                        ts.removeAllAckedMessages();
191                    }
192    
193                    ts.close();
194                }
195            }
196        }
197        
198        
199        /**
200         * start working
201         * 
202         * @throws JMSException
203         */
204        public void start() throws JMSException {
205            if (started.commit(false, true)) {
206                if (!subscriptions.isEmpty()) {
207                    synchronized (running) {
208                        if (running.commit(false, true)) {
209                            try {
210                                threadPool.execute(this);
211                            }
212                            catch (InterruptedException e) {
213                                JMSException jmsEx = new JMSException(toString() + " Failed to start");
214                                jmsEx.setLinkedException(e);
215                                throw jmsEx;
216                            }
217                        }
218                    }
219                }
220            }
221        }
222    
223        /**
224         * enqueue a message for dispatching
225         * 
226         * @param message
227         */
228        public void enqueue(ActiveMQMessage message) {
229            if (message.isAdvisory()) {
230                doAdvisoryDispatchMessage(message);
231            }
232            else {
233                queue.enqueue(message);
234                startRunning();
235            }
236        }
237    
238        /**
239         * re-enqueue a message for dispatching
240         * 
241         * @param message
242         */
243        public void redeliver(ActiveMQMessage message) {
244            queue.enqueueFirstNoBlock(message);
245            startRunning();
246        }
247    
248        public void redeliver(List messages) {
249            queue.enqueueAllFirstNoBlock(messages);
250            startRunning();
251        }
252    
253        /**
254         * stop working
255         */
256        public void stop() {
257            started.set(false);
258            running.set(false);
259            queue.clear();
260        }
261    
262        /**
263         * close down this container
264         * 
265         * @throws JMSException
266         */
267        public void close() throws JMSException {
268            if (started.get()) {
269                stop();
270            }
271            queue.close();
272            synchronized (subscriptionMutex) {
273                QueueListEntry entry = subscriptions.getFirstEntry();
274                while (entry != null) {
275                    TransientQueueSubscription ts = (TransientQueueSubscription) entry
276                            .getElement();
277                    ts.close();
278                    entry = subscriptions.getNextEntry(entry);
279                }
280                subscriptions.clear();
281            }
282        }
283    
284        /**
285         * do some dispatching
286         */
287        public void run() {
288            // Only allow one thread at a time to dispatch.
289            synchronized (dispatchMutex) {
290                boolean dispatched = false;
291                boolean targeted = false;
292                ActiveMQMessage message = null;
293                int notDispatchedCount = 0;
294                int sleepTime = 250;
295                int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10
296                                                                                    // seconds
297                Map messageParts = new HashMap();
298                try {
299                    while (started.get() && running.get()) {
300                        dispatched = false;
301                        targeted = false;
302                        synchronized (subscriptionMutex) {
303                            if (!subscriptions.isEmpty()) {
304                                message = (ActiveMQMessage) queue
305                                        .dequeue(sleepTime);
306                                if (message != null) {
307                                    if (!message.isExpired()) {
308                                        QueueListEntry entry = subscriptions.getFirstEntry();
309                                        while (entry != null) {
310                                            TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
311                                            if (ts.isTarget(message)) {
312                                                targeted = true;
313                                                if (message.isMessagePart()) {
314                                                    TransientQueueSubscription sameTarget = (TransientQueueSubscription) messageParts
315                                                            .get(message.getParentMessageID());
316                                                    if (sameTarget == null) {
317                                                        sameTarget = ts;
318                                                        messageParts.put(message.getParentMessageID(),sameTarget);
319                                                    }
320                                                    sameTarget.doDispatch(message);
321                                                    if (message.isLastMessagePart()) {
322                                                        messageParts.remove(message.getParentMessageID());
323                                                    }
324                                                    message = null;
325                                                    dispatched = true;
326                                                    notDispatchedCount = 0;
327                                                    break;
328                                                } else if (ts.canAcceptMessages()) {
329                                                    ts.doDispatch(message);
330                                                    message = null;
331                                                    dispatched = true;
332                                                    notDispatchedCount = 0;
333                                                    subscriptions.rotate();
334                                                    break;
335                                                }
336                                            }
337                                            entry = subscriptions
338                                                    .getNextEntry(entry);
339                                        }
340                                    } else {
341                                        // expire message
342                                        if (log.isDebugEnabled()) {
343                                            log.debug("expired message: "+ message);
344                                        }
345                                        deadLetterPolicy.sendToDeadLetter(message);
346                                        message = null;
347                                    }
348                                }
349                            }
350                        }
351                        if (!dispatched) {
352                            if (message != null) {
353                                if (targeted) {
354                                    queue.enqueueFirstNoBlock(message);
355                                } else {
356                                    //no matching subscribers - dump to end and hope one shows up ...
357                                    queue.enqueueNoBlock(message);
358    
359                                }
360                            }
361                            if (running.get()) {
362                                if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping
363                                        && queue.isEmpty()) {
364                                    synchronized (running) {
365                                        running.commit(true, false);
366                                    }
367                                } else {
368                                    Thread.sleep(sleepTime);
369                                }
370                            }
371                        }
372                    }
373                } catch (InterruptedException ie) {
374                    //someone is stopping us from another thread
375                } catch (Throwable e) {
376                    log.warn("stop dispatching", e);
377                    stop();
378                }
379            }
380        }
381    
382        private TransientQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
383            TransientQueueSubscription result = null;
384            synchronized(subscriptionMutex){
385                QueueListEntry entry = subscriptions.getFirstEntry();
386                while (entry != null) {
387                    TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
388                    if (ts.getConsumerInfo().equals(info)) {
389                        result = ts;
390                        break;
391                    }
392                    entry = subscriptions.getNextEntry(entry);
393                }
394            }
395            return result;
396        }
397    
398        /**
399         * @return the destination associated with this container
400         */
401        public ActiveMQDestination getDestination() {
402            return destination;
403        }
404    
405        /**
406         * @return the destination name
407         */
408        public String getDestinationName() {
409            return destination.getPhysicalName();
410        }
411    
412        /**
413         * @param msg
414         * @return @throws JMSException
415         */
416        public void addMessage(ActiveMQMessage msg) throws JMSException {
417        }
418    
419        /**
420         * @param messageIdentity
421         * @param ack
422         * @throws JMSException
423         */
424        public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
425        }
426    
427        /**
428         * @param messageIdentity
429         * @return @throws JMSException
430         */
431        public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
432            return null;
433        }
434    
435        /**
436         * @param messageIdentity
437         * @throws JMSException
438         */
439        public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
440        }
441    
442        /**
443         * @param messageIdentity
444         * @param ack
445         * @throws JMSException
446         */
447        public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
448        }
449    
450        /**
451         * @param messageIdentity
452         * @return @throws JMSException
453         */
454        public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
455            return false;
456        }
457    
458    
459        protected void clear() {
460            queue.clear();
461        }
462    
463        protected void removeExpiredMessages() {
464            long currentTime = System.currentTimeMillis();
465            List list = queue.getContents();
466            for (int i = 0;i < list.size();i++) {
467                ActiveMQMessage msg = (ActiveMQMessage) list.get(i);
468                if (msg.isExpired(currentTime)) {
469                    queue.remove(msg);
470                    if (log.isDebugEnabled()) {
471                        log.debug("expired message: " + msg);
472                    }
473                }
474            }
475        }
476        
477        protected void startRunning(){
478            if (!running.get() && started.get() && !subscriptions.isEmpty()) {
479                synchronized (running) {
480                    if (running.commit(false, true)) {
481                        try {
482                            threadPool.execute(this);
483                        }
484                        catch (InterruptedException e) {
485                           log.error(this + " Couldn't start executing ",e);
486                        }
487                    }
488                }
489            }
490        }
491    
492    
493        /**
494         * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
495         */
496        public MessageContainerAdmin getMessageContainerAdmin() {
497            return this;
498        }
499    
500        /**
501         * @see org.activemq.service.MessageContainerAdmin#empty()
502         */
503        public void empty() throws JMSException {
504            // TODO implement me.
505        }
506    
507    
508        /**
509         * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
510         */
511        public boolean isDeadLetterQueue() {
512            return false;
513        }
514        
515        /**
516         * Dispatch an Advisory Message
517         * @param message
518         */
519        private synchronized void doAdvisoryDispatchMessage(ActiveMQMessage message) {
520            try {
521                if (message != null && message.isAdvisory() && !message.isExpired()) {
522                    synchronized (subscriptionMutex) {
523                        QueueListEntry entry = subscriptions.getFirstEntry();
524                        while (entry != null) {
525                            TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement();
526                            if (ts.isTarget(message)) {
527                                ts.doDispatch(message);
528                                break;
529                            }
530                            entry = subscriptions.getNextEntry(entry);
531                        }
532                    }
533                }
534            } catch (JMSException jmsEx) {
535                log.warn("Failed to dispatch advisory", jmsEx);
536            }
537        }
538    
539    }