001 /** 002 * Copyright 2004 Protique Ltd 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 **/ 017 018 package org.activemq.service.impl; 019 import java.util.Collections; 020 import java.util.HashMap; 021 import java.util.Iterator; 022 import java.util.Map; 023 import java.util.Set; 024 025 import javax.jms.DeliveryMode; 026 import javax.jms.Destination; 027 import javax.jms.InvalidDestinationException; 028 import javax.jms.JMSException; 029 import javax.jms.Topic; 030 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 import org.activemq.DuplicateDurableSubscriptionException; 034 import org.activemq.broker.BrokerClient; 035 import org.activemq.filter.AndFilter; 036 import org.activemq.filter.DestinationMap; 037 import org.activemq.filter.Filter; 038 import org.activemq.filter.FilterFactory; 039 import org.activemq.filter.FilterFactoryImpl; 040 import org.activemq.filter.NoLocalFilter; 041 import org.activemq.message.ActiveMQDestination; 042 import org.activemq.message.ActiveMQMessage; 043 import org.activemq.message.ActiveMQTopic; 044 import org.activemq.message.ConsumerInfo; 045 import org.activemq.message.MessageAck; 046 import org.activemq.service.DeadLetterPolicy; 047 import org.activemq.service.Dispatcher; 048 import org.activemq.service.MessageContainer; 049 import org.activemq.service.RedeliveryPolicy; 050 import org.activemq.service.Subscription; 051 import org.activemq.service.SubscriptionContainer; 052 import org.activemq.service.TopicMessageContainer; 053 import org.activemq.service.TransactionManager; 054 import org.activemq.service.TransactionTask; 055 import org.activemq.store.PersistenceAdapter; 056 057 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 058 059 /** 060 * A default Broker used for Topic messages for durable consumers 061 * 062 * @version $Revision: 1.1.1.1 $ 063 */ 064 public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport { 065 private static final Log log = LogFactory.getLog(DurableTopicMessageContainerManager.class); 066 private PersistenceAdapter persistenceAdapter; 067 protected SubscriptionContainer subscriptionContainer; 068 protected FilterFactory filterFactory; 069 protected Map activeSubscriptions = new ConcurrentHashMap(); 070 private DestinationMap destinationMap = new DestinationMap(); 071 private ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap(); 072 073 public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) { 074 this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(redeliveryPolicy,deadLetterPolicy), new FilterFactoryImpl(), 075 new DispatcherImpl()); 076 } 077 078 public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, 079 SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) { 080 super(dispatcher); 081 this.persistenceAdapter = persistenceAdapter; 082 this.subscriptionContainer = subscriptionContainer; 083 this.filterFactory = filterFactory; 084 try { 085 loadAllMessageContainers(); 086 } 087 catch (JMSException e) { 088 log.error("Failed to load initial Topic Containers",e); 089 } 090 } 091 092 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 093 if (info.isDurableTopic()) { 094 if (log.isDebugEnabled()) { 095 log.debug("Adding consumer: " + info); 096 } 097 098 doAddMessageConsumer(client, info); 099 } 100 } 101 102 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 103 // we should not remove a durable topic subscription from the subscriptionContainer 104 // unless via the deleteSubscription() method 105 106 // subscriptionContainer.removeSubscription(info.getConsumerId()); 107 // subscription.clear(); 108 109 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId()); 110 if (sub != null) { 111 sub.setActive(false); 112 dispatcher.removeActiveSubscription(client, sub); 113 } 114 } 115 116 /** 117 * Delete a durable subscriber 118 * 119 * @param clientId 120 * @param subscriberName 121 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active 122 */ 123 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 124 125 String consumerKey = ConsumerInfo.generateConsumerKey(clientId, subscriberName); 126 Subscription sub = (Subscription) durableSubscriptions.remove(consumerKey); 127 if( sub!=null ) { 128 //only delete if not active 129 if (sub.isActive()) { 130 throw new JMSException("The Consummer " + subscriberName + " is still active"); 131 } 132 else { 133 subscriptionContainer.removeSubscription(sub.getConsumerId()); 134 sub.clear(); 135 136 Set containers = destinationMap.get(sub.getDestination()); 137 for (Iterator iter = containers.iterator();iter.hasNext();) { 138 TopicMessageContainer container = (TopicMessageContainer) iter.next(); 139 if (container instanceof DurableTopicMessageContainer) { 140 ((DurableTopicMessageContainer) container).deleteSubscription(sub.getPersistentKey()); 141 } 142 } 143 144 } 145 } else { 146 throw new InvalidDestinationException("The Consumer " + subscriberName + " does not exist for client: " 147 + clientId); 148 } 149 } 150 151 /** 152 * @param client 153 * @param message 154 * @throws javax.jms.JMSException 155 */ 156 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException { 157 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination(); 158 if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) { 159 // note that we still need to persist the message even if there are no matching 160 // subscribers as they may come along later 161 // plus we don't pre-load subscription information 162 final MessageContainer container = getContainer(dest.getPhysicalName()); 163 container.addMessage(message); 164 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() { 165 public void execute() throws Throwable { 166 doSendMessage(client, message, container); 167 } 168 }); 169 } 170 } 171 172 /** 173 * @param client 174 * @param message 175 * @throws JMSException 176 */ 177 private void doSendMessage(BrokerClient client, ActiveMQMessage message, MessageContainer container) throws JMSException { 178 Set matchingSubscriptions = subscriptionContainer.getSubscriptions(message.getJMSActiveMQDestination()); 179 if (!matchingSubscriptions.isEmpty()) { 180 for (Iterator i = matchingSubscriptions.iterator();i.hasNext();) { 181 Subscription sub = (Subscription) i.next(); 182 if (sub.isTarget(message)) { 183 sub.addMessage(container, message); 184 } 185 } 186 updateSendStats(client, message); 187 } 188 } 189 190 /** 191 * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination} 192 * objects used by non-broker consumers directly connected to this container 193 * 194 * @return 195 */ 196 public Map getLocalDestinations() { 197 Map localDestinations = new HashMap(); 198 for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) { 199 Subscription sub = (Subscription) iter.next(); 200 if (sub.isLocalSubscription()) { 201 final ActiveMQDestination dest = sub.getDestination(); 202 localDestinations.put(dest.getPhysicalName(), dest); 203 } 204 } 205 return Collections.unmodifiableMap(localDestinations); 206 } 207 208 /** 209 * Acknowledge a message as being read and consumed byh the Consumer 210 * 211 * @param client 212 * @param ack 213 * @throws javax.jms.JMSException 214 */ 215 public void acknowledgeMessage(BrokerClient client, final MessageAck ack) throws JMSException { 216 if ( !ack.getDestination().isTopic() || !ack.isPersistent()) 217 return; 218 219 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId()); 220 if (sub == null) { 221 return; 222 } 223 224 sub.messageConsumed(ack); 225 } 226 227 /** 228 * poll or messages 229 * 230 * @throws javax.jms.JMSException 231 */ 232 public void poll() throws JMSException { 233 //do nothing 234 } 235 236 // Implementation methods 237 //------------------------------------------------------------------------- 238 protected MessageContainer createContainer(String destinationName) throws JMSException { 239 TopicMessageContainer topicMessageContainer = 240 new DurableTopicMessageContainer(this, persistenceAdapter.createTopicMessageStore(destinationName), destinationName); 241 destinationMap.put(new ActiveMQTopic(destinationName), topicMessageContainer); 242 return topicMessageContainer; 243 } 244 245 protected Destination createDestination(String destinationName) { 246 return new ActiveMQTopic(destinationName); 247 } 248 249 public boolean isConsumerActiveOnDestination(ActiveMQDestination dest) { 250 for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) { 251 Subscription subscription = (Subscription) iter.next(); 252 if( subscription.getDestination().equals(dest) ) { 253 return true; 254 } 255 } 256 return false; 257 } 258 259 protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 260 boolean shouldRecover = false; 261 if (info.getConsumerName() != null && info.getClientId() != null) { 262 for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) { 263 Subscription subscription = (Subscription) iter.next(); 264 if (subscription.isSameDurableSubscription(info)) { 265 throw new DuplicateDurableSubscriptionException(info); 266 } 267 } 268 } 269 Subscription subscription = (Subscription) durableSubscriptions.get(info.getConsumerKey()); 270 //subscriptionContainer.getSubscription(info.getConsumerId()); 271 if (subscription != null) { 272 //check the subscription hasn't changed 273 if (!equal(subscription.getDestination(), info.getDestination()) 274 || !equal(subscription.getSelector(), info.getSelector())) { 275 subscriptionContainer.removeSubscription(info.getConsumerId()); 276 subscription.clear(); 277 subscription = subscriptionContainer.makeSubscription(dispatcher, client, info, createFilter(info)); 278 durableSubscriptions.put(info.getConsumerKey(), subscription); 279 } 280 } 281 else { 282 subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info)); 283 shouldRecover = true; 284 durableSubscriptions.put(info.getConsumerKey(), subscription); 285 } 286 subscription.setActiveConsumer(client,info); 287 activeSubscriptions.put(info.getConsumerId(), subscription); 288 dispatcher.addActiveSubscription(client, subscription); 289 290 // load the container 291 getContainer(subscription.getDestination().getPhysicalName()); 292 293 Set containers = destinationMap.get(subscription.getDestination()); 294 for (Iterator iter = containers.iterator();iter.hasNext();) { 295 TopicMessageContainer container = (TopicMessageContainer) iter.next(); 296 if (container instanceof DurableTopicMessageContainer) { 297 ((DurableTopicMessageContainer) container).storeSubscription(info, subscription); 298 } 299 } 300 if (shouldRecover) { 301 recoverSubscriptions(subscription); 302 } 303 // lets not make the subscription active until later 304 // as we can't start dispatching until we've sent back the receipt 305 // TODO we might wish to register a post-receipt action here 306 // to perform the wakeup 307 subscription.setActive(true); 308 //dispatcher.wakeup(subscription); 309 } 310 311 /** 312 * Returns true if the two objects are null or are equal 313 */ 314 protected final boolean equal(Object o1, Object o2) { 315 return o1 == o2 || (o1 != null && o1.equals(o2)); 316 } 317 318 /** 319 * This method is called when a new durable subscription is started and so we need to go through each matching 320 * message container and dispatch any matching messages that may be outstanding 321 * 322 * @param subscription 323 */ 324 protected void recoverSubscriptions(Subscription subscription) throws JMSException { 325 // we should load all of the message containers from disk if we're a wildcard 326 327 getContainer(subscription.getDestination().getPhysicalName()); 328 Set containers = destinationMap.get(subscription.getDestination()); 329 for (Iterator iter = containers.iterator();iter.hasNext();) { 330 TopicMessageContainer container = (TopicMessageContainer) iter.next(); 331 container.recoverSubscription(subscription); 332 } 333 } 334 335 /** 336 * Called when recovering a wildcard subscription where we need to load all the durable message containers (for 337 * which we have any outstanding messages to deliver) into RAM 338 */ 339 protected void loadAllMessageContainers() throws JMSException { 340 Map destinations = persistenceAdapter.getInitialDestinations(); 341 if (destinations != null) { 342 for (Iterator iter = destinations.entrySet().iterator();iter.hasNext();) { 343 Map.Entry entry = (Map.Entry) iter.next(); 344 String name = (String) entry.getKey(); 345 Destination destination = (Destination) entry.getValue(); 346 if ( destination instanceof Topic ) { 347 loadContainer(name, destination); 348 } 349 } 350 } 351 } 352 353 /** 354 * Create filter for a Consumer 355 * 356 * @param info 357 * @return the Fitler 358 * @throws javax.jms.JMSException 359 */ 360 protected Filter createFilter(ConsumerInfo info) throws JMSException { 361 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector()); 362 if (info.isNoLocal()) { 363 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId())); 364 } 365 return filter; 366 } 367 368 public void createMessageContainer(ActiveMQDestination dest) throws JMSException { 369 // This container only does topics. 370 if(!dest.isTopic()) 371 return; 372 super.createMessageContainer(dest); 373 } 374 375 public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException { 376 // This container only does topics. 377 if(!dest.isTopic()) 378 return; 379 super.destroyMessageContainer(dest); 380 destinationMap.removeAll(dest); 381 } 382 }