001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.Map;
023    
024    import javax.jms.JMSException;
025    
026    import org.activemq.broker.BrokerClient;
027    import org.activemq.filter.Filter;
028    import org.activemq.message.ConsumerInfo;
029    import org.activemq.message.MessageAck;
030    import org.activemq.service.DeadLetterPolicy;
031    import org.activemq.service.Dispatcher;
032    import org.activemq.service.QueueListEntry;
033    import org.activemq.service.RedeliveryPolicy;
034    import org.activemq.service.TopicMessageContainer;
035    import org.activemq.service.TransactionManager;
036    import org.activemq.service.TransactionTask;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * Represents a durable topic subscription where the consumer has a unique
042     * clientID used to persist the messages across both Broker restarts and
043     * JMS client restarts
044     *
045     * @version $Revision: 1.1.1.1 $
046     */
047    public class DurableTopicSubscription extends SubscriptionImpl {
048    
049        private static final Log log = LogFactory.getLog(DurableTopicSubscription.class);
050    
051        private String persistentKey;
052    
053        public DurableTopicSubscription(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
054            super(dispatcher, client, info, filter, redeliveryPolicy,deadLetterPolicy);
055        }
056    
057        public synchronized void messageConsumed(MessageAck ack) throws JMSException {
058            if (ack.isExpired() || (!ack.isMessageRead() && !isBrowser())) {
059                super.messageConsumed(ack);
060            }
061            else {
062                final Map lastMessagePointersPerContainer = new HashMap();
063    
064                //remove up to this message
065                boolean found = false;
066                QueueListEntry queueEntry = messagePtrs.getFirstEntry();
067                while (queueEntry != null) {
068                    final MessagePointer pointer = (MessagePointer) queueEntry.getElement();
069                    
070                    if( !pointer.isDispatched() ) {
071                                  break;
072                    }
073                            
074                    messagePtrs.remove(queueEntry);
075                    lastMessagePointersPerContainer.put(pointer.getContainer(), pointer);
076                    unconsumedMessagesDispatched.decrement();
077    
078                    TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(){
079                        public void execute() throws Throwable {                        
080                            unconsumedMessagesDispatched.increment();
081                            MessagePointer p = new MessagePointer(pointer);
082                            p.setRedelivered(true);
083                            messagePtrs.add(p);
084                            dispatch.wakeup(DurableTopicSubscription.this);
085                            lastMessageIdentity = pointer.getMessageIdentity();
086                        }
087                    });
088    
089                    TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
090                        public void execute() throws Throwable {                        
091                            // now lets tell each container to update its lastAcknowlegedMessageID
092                            for (Iterator iter = lastMessagePointersPerContainer.entrySet().iterator(); iter.hasNext();) {
093                                Map.Entry entry = (Map.Entry) iter.next();
094                                TopicMessageContainer container = (TopicMessageContainer) entry.getKey();
095                                MessagePointer pointer = (MessagePointer) entry.getValue();
096                                container.setLastAcknowledgedMessageID(DurableTopicSubscription.this, pointer.getMessageIdentity());
097                            }
098                        }
099                    });
100                    
101                    if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
102                        found = true;
103                        break;
104                    }
105                    queueEntry = messagePtrs.getNextEntry(queueEntry);
106                }
107                if (!found) {
108                    log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity());
109                }
110                //System.out.println("Message consumed. Remaining: " + messagePtrs.size() + " unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get());
111                dispatch.wakeup(this);
112            }
113        }
114    
115        public String getPersistentKey() {
116            if (persistentKey == null) {
117                persistentKey = "[" + getClientId() + ":" + getSubscriberName() + "]";
118            }
119            return persistentKey;
120        }
121    }