/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq.service.impl;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.AndFilter;
import org.activemq.filter.DestinationMap;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.Dispatcher;
import org.activemq.service.MessageContainer;
import org.activemq.service.QueueList;
import org.activemq.service.QueueListEntry;
import org.activemq.service.QueueMessageContainer;
import org.activemq.service.QueueMessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Subscription;
import org.activemq.service.SubscriptionContainer;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.PersistenceAdapter;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;

/**
 * A default Broker used for Queue messages.
 * <p>
 * Manages {@link DurableQueueMessageContainer}s for non-temporary queues and dispatches
 * persistent messages from those containers to the consumers and browsers registered on them.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport implements QueueMessageContainerManager {
    private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);

    /** Upper bound on the number of messages handled per container (or per browser) in one poll pass. */
    private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;

    private final PersistenceAdapter persistenceAdapter;
    protected SubscriptionContainer subscriptionContainer;
    protected FilterFactory filterFactory;

    /** Maps a {@link QueueMessageContainer} to the {@link QueueList} of its consuming subscriptions. */
    protected Map activeSubscriptions = new ConcurrentHashMap();

    /** Maps a {@link QueueMessageContainer} to the {@link QueueList} of its browser subscriptions. */
    protected Map browsers = new ConcurrentHashMap();

    /** Maps a parent message id to the subscription receiving the parts of that multi-part message. */
    protected Map messagePartSubscribers = new ConcurrentHashMap();

    /** Maps destinations to the containers created by {@link #createContainer(String)}. */
    protected DestinationMap destinationMap = new DestinationMap();

    /** Guards changes to the subscription lists and the poll loop that reads them. */
    private final Object subscriptionMutex = new Object();

    /**
     * Creates a manager that uses a default {@link SubscriptionContainerImpl}, {@link FilterFactoryImpl}
     * and {@link DispatcherImpl}.
     *
     * @param persistenceAdapter used to create the message store backing each queue container
     * @param redeliveryPolicy   passed to the default subscription container
     * @param deadLetterPolicy   passed to the default subscription container
     */
    public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy, deadLetterPolicy), new FilterFactoryImpl(), new DispatcherImpl());
    }

    /**
     * Creates a manager from explicitly supplied collaborators.
     *
     * @param persistenceAdapter    used to create the message store backing each queue container
     * @param subscriptionContainer holds the subscriptions created for consumers
     * @param filterFactory         creates the destination/selector filters for consumers
     * @param dispatcher            delivers messages to subscriptions
     */
    public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(dispatcher);
        this.persistenceAdapter = persistenceAdapter;
        this.subscriptionContainer = subscriptionContainer;
        this.filterFactory = filterFactory;
    }

    /**
     * Answers true if this ContainerManager is interested in managing the destination,
     * i.e. the destination is a non-null, non-temporary queue.
     *
     * @param destination the destination to test (may be null)
     * @return true if the destination is managed by this manager
     */
    private boolean isManagerFor(ActiveMQDestination destination) {
        return destination != null && destination.isQueue() && !destination.isTemporary();
    }

    /**
     * Answers true if this ContainerManager is interested in handling an operation
     * on the provided destination.
     *
     * @param destination  the destination the operation applies to (may be null)
     * @param persistentOp true when the operation is persistent
     * @return true if the destination is managed here and the operation is persistent
     */
    private boolean isManagerFor(ActiveMQDestination destination, boolean persistentOp) {
        // This manager only handles persistent operations on the destinations it manages.
        return isManagerFor(destination) && persistentOp;
    }

    /**
     * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
     * objects used by non-broker consumers directly connected to this container
     *
     * @return an unmodifiable snapshot map of physical name to destination
     */
    public Map getLocalDestinations() {
        Map localDestinations = new HashMap();
        for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
            Subscription sub = (Subscription) iter.next();
            if (sub.isLocalSubscription()) {
                final ActiveMQDestination dest = sub.getDestination();
                localDestinations.put(dest.getPhysicalName(), dest);
            }
        }
        return Collections.unmodifiableMap(localDestinations);
    }

    /**
     * Registers a consumer (or browser) on a queue managed by this manager. Consumers on
     * destinations this manager does not handle are ignored.
     *
     * @param client the client owning the consumer
     * @param info   describes the consumer and its destination
     * @throws javax.jms.JMSException
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        // Are we not interested in handling that destination?
        if (!isManagerFor(info.getDestination())) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Adding consumer: " + info);
        }

        // ensure a matching container exists for the destination
        getContainer(info.getDestination().getPhysicalName());

        Subscription sub = subscriptionContainer.makeSubscription(dispatcher, client, info, createFilter(info));
        dispatcher.addActiveSubscription(client, sub);
        updateActiveSubscriptions(sub);

        // set active last in case we end up dispatching some messages
        // while recovering
        sub.setActive(true);
    }

    /**
     * Unregisters a consumer (or browser) and detaches its subscription from the matching
     * container's consumer and browser lists.
     *
     * @param client the client owning the consumer
     * @param info   describes the consumer being removed
     * @throws javax.jms.JMSException
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("Removing consumer: " + info);
        }
        if (info.getDestination() == null || !info.getDestination().isQueue()) {
            return;
        }
        synchronized (subscriptionMutex) {
            Subscription sub = (Subscription) subscriptionContainer.removeSubscription(info.getConsumerId());
            if (sub == null) {
                return;
            }
            sub.setActive(false);
            sub.clear();//resets entries in the QueueMessageContainer
            dispatcher.removeActiveSubscription(client, sub);

            //need to do wildcards for this - but for now use exact matches
            String physicalName = sub.getDestination().getPhysicalName();
            for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
                QueueMessageContainer container = (QueueMessageContainer) iter.next();
                //should change this for wild cards ...
                if (container.getDestinationName().equals(physicalName)) {
                    removeFromList(activeSubscriptions, container, sub);
                    removeFromList(browsers, container, sub);
                }
            }
        }
    }

    /**
     * Removes a subscription from the list registered for a container and drops the list
     * from the map once it is empty. The maps are keyed by container, so the container
     * (not the destination name) must be used as the key when pruning.
     *
     * @param lists     either {@link #activeSubscriptions} or {@link #browsers}
     * @param container the container whose list should be updated
     * @param sub       the subscription to remove
     */
    private static void removeFromList(Map lists, QueueMessageContainer container, Subscription sub) {
        QueueList list = (QueueList) lists.get(container);
        if (list != null) {
            list.remove(sub);
            if (list.isEmpty()) {
                lists.remove(container);
            }
        }
    }

    /**
     * Delete a durable subscriber. This implementation does nothing.
     *
     * @param clientId
     * @param subscriberName
     * @throws javax.jms.JMSException declared by the interface; never thrown here
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
    }

    /**
     * Adds a persistent message to every container matching its destination and schedules a
     * dispatcher wake-up for when the current transaction commits. Messages that are not
     * persistent, or whose destination is not managed here, are ignored.
     *
     * @param client  the client that sent the message
     * @param message the message to store and dispatch
     * @throws javax.jms.JMSException
     */
    public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
        ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
        // Are we not interested in handling that destination?
        if (!isManagerFor(dest, message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT)) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Dispaching message: " + message);
        }
        // ensure a matching container exists for the destination
        getContainer(dest.getPhysicalName());
        Set set = destinationMap.get(message.getJMSActiveMQDestination());
        for (Iterator i = set.iterator(); i.hasNext();) {
            QueueMessageContainer container = (QueueMessageContainer) i.next();
            container.addMessage(message);
            // Once transaction has completed.. dispatch the message.
            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
                public void execute() throws Throwable {
                    dispatcher.wakeup();
                    updateSendStats(client, message);
                }
            });
        }
    }

    /**
     * Acknowledge a message as being read and consumed by the Consumer. Acknowledgements that
     * are not persistent, that target a destination not managed here, or that refer to an
     * unknown consumer are ignored.
     *
     * @param client the client sending the acknowledgement
     * @param ack    the acknowledgement
     * @throws javax.jms.JMSException
     */
    public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException {
        // Are we not interested in handling that destination?
        if (!isManagerFor(ack.getDestination(), ack.isPersistent())) {
            return;
        }
        final Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
        if (sub == null) {
            return;
        }

        sub.messageConsumed(ack);
        if (ack.isMessageRead()) {
            updateAcknowledgeStats(client, sub);
        }
    }

    /**
     * Poll for messages: for every container with registered consumers, first feeds its
     * browsers and then dispatches to its consumers. Containers that only have browsers
     * registered are peeked as well, so that browsing a queue without consumers still works.
     *
     * @throws javax.jms.JMSException
     */
    public void poll() throws JMSException {
        synchronized (subscriptionMutex) {
            for (Iterator iter = activeSubscriptions.keySet().iterator(); iter.hasNext();) {
                QueueMessageContainer container = (QueueMessageContainer) iter.next();

                QueueList browserList = (QueueList) browsers.get(container);
                doPeek(container, browserList);
                QueueList list = (QueueList) activeSubscriptions.get(container);
                doPoll(container, list);
            }
            // Containers with browsers but no consumer list were not visited above.
            for (Iterator iter = browsers.keySet().iterator(); iter.hasNext();) {
                QueueMessageContainer container = (QueueMessageContainer) iter.next();
                if (!activeSubscriptions.containsKey(container)) {
                    doPeek(container, (QueueList) browsers.get(container));
                }
            }
        }
    }

    /**
     * Returns the container for the given destination name. If it is not already present in
     * {@code messageContainers}, the lookup is delegated to the superclass while holding the
     * subscription mutex.
     *
     * @param destinationName the physical name of the queue
     * @return the container for the destination
     * @throws JMSException
     */
    public MessageContainer getContainer(String destinationName) throws JMSException {
        MessageContainer container = (MessageContainer) messageContainers.get(destinationName);
        if (container == null) {
            synchronized (subscriptionMutex) {
                container = super.getContainer(destinationName);
            }
        }
        return container;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Creates a durable queue container backed by a queue message store from the persistence
     * adapter, attaches any existing subscriptions for that destination to it and registers
     * it in the destination map.
     *
     * @param destinationName the physical name of the queue
     * @return the newly created container
     * @throws JMSException
     */
    protected MessageContainer createContainer(String destinationName) throws JMSException {
        QueueMessageContainer container = new DurableQueueMessageContainer(persistenceAdapter, persistenceAdapter.createQueueMessageStore(destinationName), destinationName);

        // Add any interested Subscriptions to the new Container
        for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
            Subscription sub = (Subscription) iter.next();
            if (sub.isBrowser()) {
                updateBrowsers(container, sub);
            }
            else {
                updateActiveSubscriptions(container, sub);
            }
        }

        ActiveMQDestination key = new ActiveMQQueue(destinationName);
        destinationMap.put(key, container);
        return container;
    }

    /**
     * Creates the queue destination object for the given name.
     *
     * @param destinationName the physical name of the queue
     * @return a new {@link ActiveMQQueue}
     */
    protected Destination createDestination(String destinationName) {
        return new ActiveMQQueue(destinationName);
    }

    /**
     * Feeds each browser with messages peeked (not removed) from the container, starting after
     * the last message identity the browser has seen. For each browser the loop stops when the
     * container has nothing further, the browser reaches its prefetch limit, or the per-pass
     * bound {@link #MAX_MESSAGES_DISPATCHED_FROM_POLL} is exceeded.
     *
     * @param container   the container to peek into
     * @param browserList the browsers registered on the container (may be null)
     * @throws JMSException
     */
    private void doPeek(QueueMessageContainer container, QueueList browserList) throws JMSException {
        if (browserList != null && browserList.size() > 0) {
            for (int i = 0; i < browserList.size(); i++) {
                SubscriptionImpl sub = (SubscriptionImpl) browserList.get(i);
                int count = 0;
                ActiveMQMessage msg = null;
                do {
                    msg = container.peekNext(sub.getLastMessageIdentity());
                    if (msg != null) {
                        if (sub.isTarget(msg)) {
                            if (log.isDebugEnabled()) {
                                log.debug("browser dispatch: " + msg.getJMSMessageID());
                            }
                            sub.addMessage(container, msg);
                            dispatcher.wakeup(sub);
                        }
                        else {
                            // not selected by this browser: just advance its position past the message
                            sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
                        }
                    }
                }
                while (msg != null && !sub.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
            }
        }
    }

    /**
     * Takes messages from the container and hands each one to the first subscription in the
     * list that targets it and is below its prefetch limit, rotating the list after such a
     * dispatch. All parts of a multi-part message go to the subscription that was chosen for
     * the first part seen (tracked in {@link #messagePartSubscribers}); the prefetch limit is
     * not checked for message parts.
     * <p>
     * The loop stops when the container returns no message, when a message could not be
     * dispatched, or when the per-pass bound {@link #MAX_MESSAGES_DISPATCHED_FROM_POLL} is exceeded.
     *
     * @param container the container to poll
     * @param subList   the consuming subscriptions registered on the container (may be null)
     * @throws JMSException
     */
    private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
        int count = 0;
        ActiveMQMessage msg = null;
        if (subList != null && subList.size() > 0) {
            do {
                boolean dispatched = false;
                msg = container.poll();
                if (msg != null) {
                    QueueListEntry entry = subList.getFirstEntry();
                    boolean targeted = false;
                    while (entry != null) {
                        SubscriptionImpl sub = (SubscriptionImpl) entry.getElement();
                        if (sub.isTarget(msg)) {
                            targeted = true;
                            if (msg.isMessagePart()) {
                                // keep all parts of one parent message on the same subscription
                                SubscriptionImpl sameTarget = (SubscriptionImpl) messagePartSubscribers.get(msg.getParentMessageID());
                                if (sameTarget == null) {
                                    sameTarget = sub;
                                    messagePartSubscribers.put(msg.getParentMessageID(), sameTarget);
                                }
                                sameTarget.addMessage(container, msg);
                                if (msg.isLastMessagePart()) {
                                    messagePartSubscribers.remove(msg.getParentMessageID());
                                }
                                dispatched = true;
                                dispatcher.wakeup(sameTarget);
                                break;
                            }
                            else if (!sub.isAtPrefetchLimit()) {
                                if (log.isDebugEnabled()) {
                                    log.debug("dispatching: " + msg.getJMSMessageID());
                                }
                                sub.addMessage(container, msg);
                                dispatched = true;
                                dispatcher.wakeup(sub);
                                subList.rotate(); //round-robin the list
                                break;
                            }
                        }
                        entry = subList.getNextEntry(entry);
                    }
                    if (!dispatched) {
                        if (targeted) {
                            // ie. it can be selected by current active consumers - but they are
                            // at pre-fetch limit, so hand the message back to the container
                            container.returnMessage(msg.getJMSMessageIdentity());
                        }
                        break;
                    }
                }
            }
            while (msg != null && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
        }
    }

    /**
     * Offers the subscription to every known container and makes sure the container for the
     * subscription's own destination exists and has been offered the subscription too.
     *
     * @param subscription the subscription to attach
     * @throws JMSException
     */
    private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
        //need to do wildcards for this - but for now use exact matches
        synchronized (subscriptionMutex) {
            boolean processedSubscriptionContainer = false;

            String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
            for (Iterator iter = messageContainers.entrySet().iterator(); iter.hasNext();) {
                Map.Entry entry = (Map.Entry) iter.next();
                String destinationName = (String) entry.getKey();
                QueueMessageContainer container = (QueueMessageContainer) entry.getValue();

                if (destinationName.equals(subscriptionPhysicalName)) {
                    processedSubscriptionContainer = true;
                }
                processSubscription(subscription, container);
            }
            if (!processedSubscriptionContainer) {
                processSubscription(subscription, (QueueMessageContainer) getContainer(subscriptionPhysicalName));
            }
        }
    }

    /**
     * Attaches the subscription to the container as a browser or as a consumer, depending on
     * {@link Subscription#isBrowser()}.
     *
     * @param subscription the subscription to attach
     * @param container    the candidate container
     * @throws JMSException
     */
    protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
        // TODO should change this for wild cards ...
        if (subscription.isBrowser()) {
            updateBrowsers(container, subscription);
        }
        else {
            updateActiveSubscriptions(container, subscription);
        }
    }

    /**
     * Adds the subscription to the container's consumer list when the container's destination
     * name equals the subscription's physical destination name; otherwise does nothing.
     *
     * @param container the candidate container
     * @param sub       the consuming subscription
     * @throws JMSException
     */
    private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
        //need to do wildcards for this - but for now use exact matches
        //should change this for wild cards ...
        if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
            container.reset();//reset container - flushing all filter out messages to new consumer
            QueueList list = getSubscriptionList(container);
            if (!list.contains(sub)) {
                list.add(sub);
            }
        }
    }

    /**
     * Returns the consumer list for the container, creating and registering an empty one if
     * none exists yet.
     *
     * @param container the container used as the map key
     * @return the (possibly new) consumer list
     */
    private QueueList getSubscriptionList(QueueMessageContainer container) {
        QueueList list = (QueueList) activeSubscriptions.get(container);
        if (list == null) {
            list = new DefaultQueueList();
            activeSubscriptions.put(container, list);
        }
        return list;
    }

    /**
     * Adds the subscription to the container's browser list when the container's destination
     * name equals the subscription's physical destination name; otherwise does nothing.
     *
     * @param container the candidate container
     * @param sub       the browser subscription
     * @throws JMSException
     */
    private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
        //need to do wildcards for this - but for now use exact matches
        //should change this for wild cards ...
        if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
            container.reset();//reset container - flushing all filter out messages to new consumer
            QueueList list = getBrowserList(container);
            if (!list.contains(sub)) {
                list.add(sub);
            }
        }
    }

    /**
     * Returns the browser list for the container, creating and registering an empty one if
     * none exists yet.
     *
     * @param container the container used as the map key
     * @return the (possibly new) browser list
     */
    private QueueList getBrowserList(QueueMessageContainer container) {
        QueueList list = (QueueList) browsers.get(container);
        if (list == null) {
            list = new DefaultQueueList();
            browsers.put(container, list);
        }
        return list;
    }

    /**
     * Create filter for a Consumer: the destination/selector filter from the filter factory,
     * combined with a {@link NoLocalFilter} when the consumer asked for noLocal.
     *
     * @param info describes the consumer
     * @return the Filter
     * @throws javax.jms.JMSException
     */
    protected Filter createFilter(ConsumerInfo info) throws JMSException {
        Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
        if (info.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
        }
        return filter;
    }

    /**
     * Creates the message container for a queue destination; non-queue destinations are ignored.
     *
     * @param dest the destination to create a container for
     * @throws JMSException
     */
    public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
        // This container only does queues.
        if (!dest.isQueue()) {
            return;
        }
        super.createMessageContainer(dest);
    }

    /**
     * Destroys the message container for a queue destination and removes it from the
     * destination map; non-queue destinations are ignored.
     *
     * @param dest the destination whose container should be destroyed
     * @throws JMSException
     */
    public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
        // This container only does queues.
        if (!dest.isQueue()) {
            return;
        }
        super.destroyMessageContainer(dest);
        destinationMap.removeAll(dest);
    }

    /**
     * Add a message to a dead letter queue
     *
     * @param deadLetterName the name of the dead letter queue
     * @param message        the message to add
     * @throws JMSException
     */
    public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException {
        QueueMessageContainer container = (QueueMessageContainer) getContainer(deadLetterName);
        container.setDeadLetterQueue(true);
        container.addMessage(message);
        dispatcher.wakeup();
    }
}