001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service.impl; 020 import java.util.ArrayList; 021 import java.util.List; 022 import javax.jms.JMSException; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 import org.activemq.broker.BrokerClient; 026 import org.activemq.broker.BrokerConnector; 027 import org.activemq.broker.BrokerContainer; 028 import org.activemq.broker.Broker; 029 import org.activemq.filter.Filter; 030 import org.activemq.message.ActiveMQDestination; 031 import org.activemq.message.ActiveMQMessage; 032 import org.activemq.message.BrokerInfo; 033 import org.activemq.message.ConsumerInfo; 034 import org.activemq.message.MessageAck; 035 import org.activemq.service.DeadLetterPolicy; 036 import org.activemq.service.Dispatcher; 037 import org.activemq.service.MessageContainer; 038 import org.activemq.service.MessageIdentity; 039 import org.activemq.service.QueueList; 040 import org.activemq.service.QueueListEntry; 041 import org.activemq.service.RedeliveryPolicy; 042 import org.activemq.service.SubscriberEntry; 043 import org.activemq.service.Subscription; 044 import org.activemq.service.TransactionManager; 045 import org.activemq.service.TransactionTask; 046 import org.activemq.security.SecurityAdapter; 047 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 048 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 049 050 /** 051 * A Subscription holds messages to be dispatched to a a Client Consumer 052 * 053 * @version $Revision: 1.1.1.1 $ 054 */ 055 public class SubscriptionImpl implements Subscription { 056 private static final Log log = LogFactory.getLog(SubscriptionImpl.class); 057 private String clientId; 058 private String subscriberName; 059 private ActiveMQDestination destination; 060 private String selector; 061 private int prefetchLimit; 062 private boolean noLocal; 063 private int consumerNumber; 064 private String consumerId; 065 private boolean browser; 066 protected Dispatcher dispatch; 067 protected String brokerName; 068 protected String clusterName; 069 protected MessageIdentity lastMessageIdentity; 070 private Filter filter; 071 protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0); 072 protected QueueList messagePtrs = new DefaultQueueList(); 073 private boolean usePrefetch = true; 074 private SubscriberEntry subscriberEntry; 075 private BrokerClient activeClient; 076 private RedeliveryPolicy redeliveryPolicy; 077 private DeadLetterPolicy deadLetterPolicy; 078 private SynchronizedBoolean active = new SynchronizedBoolean(false); 079 private Object lock = new Object(); 080 081 /** 082 * Create a Subscription object that holds messages to be dispatched to a Consumer 083 * 084 * @param dispatcher 085 * @param client 086 * @param info 087 * @param filter 088 * @param redeliveryPolicy 089 * @param deadLetterPolicy 090 */ 091 public SubscriptionImpl(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, 092 RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) { 093 this.dispatch = dispatcher; 094 this.filter = filter; 095 this.redeliveryPolicy = redeliveryPolicy; 096 this.deadLetterPolicy = deadLetterPolicy; 097 setActiveConsumer(client, info); 098 } 099 100 /** 101 * Set the active consumer info 102 * 103 * @param client 104 * @param info 105 */ 106 public void setActiveConsumer(BrokerClient client, ConsumerInfo info) { 107 if (info != null) { 108 this.clientId = info.getClientId(); 109 this.subscriberName = info.getConsumerName(); 110 this.noLocal = info.isNoLocal(); 111 this.destination = info.getDestination(); 112 this.selector = info.getSelector(); 113 this.prefetchLimit = info.getPrefetchNumber(); 114 this.consumerNumber = info.getConsumerNo(); 115 this.consumerId = info.getConsumerId(); 116 this.browser = info.isBrowser(); 117 } 118 this.activeClient = client; 119 if (client != null) { 120 BrokerConnector brokerConnector = client.getBrokerConnector(); 121 if (brokerConnector != null) { 122 BrokerInfo brokerInfo = brokerConnector.getBrokerInfo(); 123 if (brokerInfo != null) { 124 brokerName = brokerInfo.getBrokerName(); 125 clusterName = brokerInfo.getClusterName(); 126 } 127 } 128 } 129 } 130 131 /** 132 * @return pretty print of the Subscription 133 */ 134 public String toString() { 135 String str = "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": " 136 + subscriberName + " : " + destination; 137 return str; 138 } 139 140 /** 141 * Called when the Subscription is discarded 142 * 143 * @throws JMSException 144 */ 145 public void clear() throws JMSException { 146 synchronized (lock) { 147 QueueListEntry entry = messagePtrs.getFirstEntry(); 148 while (entry != null) { 149 MessagePointer pointer = (MessagePointer) entry.getElement(); 150 pointer.clear(); 151 entry = messagePtrs.getNextEntry(entry); 152 } 153 messagePtrs.clear(); 154 } 155 } 156 157 /** 158 * Called when an active subscriber has closed. This resets all MessagePtrs 159 * 160 * @throws JMSException 161 */ 162 public void reset() throws JMSException { 163 synchronized (lock) { 164 QueueListEntry entry = messagePtrs.getFirstEntry(); 165 while (entry != null) { 166 MessagePointer pointer = (MessagePointer) entry.getElement(); 167 if (pointer.isDispatched() && !pointer.isDeleted()) { 168 pointer.reset(); 169 pointer.setRedelivered(true); 170 } 171 else { 172 break; 173 } 174 entry = messagePtrs.getNextEntry(entry); 175 } 176 } 177 } 178 179 public BrokerClient getActiveClient() { 180 return activeClient; 181 } 182 183 /** 184 * @return Returns the clientId. 185 */ 186 public String getClientId() { 187 return clientId; 188 } 189 190 /** 191 * @param clientId The clientId to set. 192 */ 193 public void setClientId(String clientId) { 194 this.clientId = clientId; 195 } 196 197 /** 198 * @return Returns the filter. 199 */ 200 public Filter getFilter() { 201 return filter; 202 } 203 204 /** 205 * @param filter The filter to set. 206 */ 207 public void setFilter(Filter filter) { 208 this.filter = filter; 209 } 210 211 public boolean isWildcard() { 212 return filter.isWildcard(); 213 } 214 215 public String getPersistentKey() { 216 // not required other than for persistent topic subscriptions 217 return null; 218 } 219 220 public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException { 221 if (isDurableTopic()) { 222 return equal(clientId, info.getClientId()) && equal(subscriberName, info.getConsumerName()); 223 } 224 return false; 225 } 226 227 /** 228 * @return Returns the noLocal. 229 */ 230 public boolean isNoLocal() { 231 return noLocal; 232 } 233 234 /** 235 * @param noLocal The noLocal to set. 236 */ 237 public void setNoLocal(boolean noLocal) { 238 this.noLocal = noLocal; 239 } 240 241 /** 242 * @return Returns the subscriberName. 243 */ 244 public String getSubscriberName() { 245 return subscriberName; 246 } 247 248 /** 249 * @param subscriberName The subscriberName to set. 250 */ 251 public void setSubscriberName(String subscriberName) { 252 this.subscriberName = subscriberName; 253 } 254 255 public RedeliveryPolicy getRedeliveryPolicy() { 256 return redeliveryPolicy; 257 } 258 259 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 260 this.redeliveryPolicy = redeliveryPolicy; 261 } 262 263 /** 264 * determines if the Subscription is interested in the message 265 * 266 * @param message 267 * @return true if this Subscription will accept the message 268 * @throws JMSException 269 */ 270 public boolean isTarget(ActiveMQMessage message) throws JMSException { 271 boolean result = false; 272 if (message != null) { 273 if (activeClient == null || brokerName == null || clusterName == null 274 || !activeClient.isClusteredConnection() || !message.isEntryCluster(clusterName) 275 || message.isEntryBroker(brokerName)) { 276 result = message.isDispatchedFromDLQ() || filter.matches(message); 277 // lets check that we don't have no-local enabled 278 if (noLocal && result) { 279 if (clientIDsEqual(message)) { 280 result = false; 281 } 282 } 283 284 if (result && !isAuthorizedForMessage(message)) { 285 result = false; 286 } 287 } 288 } 289 return result; 290 } 291 292 /** 293 * If the Subscription is a target for the message, the subscription will add a reference to the message and 294 * register an interest in the message to the container 295 * 296 * @param container 297 * @param message 298 * @throws JMSException 299 */ 300 public void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException { 301 //log.info("###### Adding to subscription: " + this + " message: " + message); 302 if (log.isDebugEnabled()) { 303 log.debug("Adding to subscription: " + this + " message: " + message); 304 } 305 MessagePointer pointer = new MessagePointer(container, message); 306 synchronized (lock) { 307 messagePtrs.add(pointer); 308 } 309 dispatch.wakeup(this); 310 lastMessageIdentity = message.getJMSMessageIdentity(); 311 } 312 313 /** 314 * Indicates a message has been delivered to a MessageConsumer 315 * 316 * @param ack 317 * @throws JMSException 318 */ 319 public void messageConsumed(final MessageAck ack) throws JMSException { 320 //remove up to this message 321 int count = 0; 322 boolean found = false; 323 synchronized (lock) { 324 QueueListEntry entry = messagePtrs.getFirstEntry(); 325 while (entry != null) { 326 final MessagePointer pointer = (MessagePointer) entry.getElement(); 327 count++; 328 // If in transaction: only consume the message acked. 329 // If not in transaction: consume all previously delivered messages. 330 if (!ack.isPartOfTransaction() || pointer.getMessageIdentity().equals(ack.getMessageIdentity())) { 331 if ((ack.isExpired() || ack.isMessageRead()) && !browser) { 332 pointer.delete(ack);//delete message from the container (if possible) 333 } 334 if (!ack.isMessageRead() && !browser) { 335 // It was a NACK. 336 pointer.reset(); 337 pointer.setRedelivered(true); 338 } 339 else { 340 unconsumedMessagesDispatched.decrement(); 341 // We may have to undo the delivery.. 342 TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() { 343 public void execute() throws Throwable { 344 unconsumedMessagesDispatched.increment(); 345 pointer.reset(); 346 pointer.setRedelivered(true); 347 dispatch.wakeup(SubscriptionImpl.this); 348 } 349 }); 350 final QueueListEntry theEntry = entry; 351 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() { 352 public void execute() throws Throwable { 353 messagePtrs.remove(theEntry); 354 if ((ack.isExpired() || ack.isMessageRead()) && !browser) { 355 if (ack.isExpired() && !pointer.getContainer().isDeadLetterQueue()) { 356 ActiveMQMessage msg = pointer.getContainer().getMessage( 357 pointer.getMessageIdentity()); 358 if (msg != null) { 359 deadLetterPolicy.sendToDeadLetter(msg); 360 } 361 } 362 } 363 } 364 }); 365 } 366 if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) { 367 found = true; 368 break; 369 } 370 } 371 entry = messagePtrs.getNextEntry(entry); 372 } 373 } 374 if (!found && log.isDebugEnabled()) { 375 log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity()); 376 } 377 dispatch.wakeup(this); 378 } 379 380 /** 381 * Retrieve messages to dispatch 382 * 383 * @return the messages to dispatch 384 * @throws JMSException 385 */ 386 public ActiveMQMessage[] getMessagesToDispatch() throws JMSException { 387 if (usePrefetch) { 388 return getMessagesWithPrefetch(); 389 } 390 List tmpList = new ArrayList(); 391 synchronized (lock) { 392 QueueListEntry entry = messagePtrs.getFirstEntry(); 393 while (entry != null) { 394 MessagePointer pointer = (MessagePointer) entry.getElement(); 395 if (!pointer.isDispatched()) { 396 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity()); 397 if (msg != null) { 398 if (pointer.isDispatched() || pointer.isRedelivered()) { 399 //already dispatched - so mark as redelivered 400 msg.setJMSRedelivered(true); 401 if (redeliveryPolicy.isBackOffMode() 402 && msg.getDeliveryCount() < redeliveryPolicy.getMaximumRetryCount()) { 403 long sleepTime = redeliveryPolicy.getInitialRedeliveryTimeout(); 404 sleepTime *= (msg.getDeliveryCount() * redeliveryPolicy.getBackOffIncreaseRate()); 405 try { 406 Thread.sleep(sleepTime); 407 } 408 catch (InterruptedException e) { 409 } 410 } 411 //incremenent delivery count 412 msg.incrementDeliveryCount(); 413 } 414 if (!pointer.getContainer().isDeadLetterQueue() 415 && (msg.isExpired() || msg.getDeliveryCount() >= redeliveryPolicy 416 .getMaximumRetryCount())) { 417 if (msg.isExpired()) { 418 log.warn("Message: " + msg + " has expired"); 419 } 420 else { 421 log.warn("Message: " + msg + " exceeded retry count: " + msg.getDeliveryCount()); 422 } 423 deadLetterPolicy.sendToDeadLetter(msg); 424 QueueListEntry discarded = entry; 425 entry = messagePtrs.getPrevEntry(discarded); 426 messagePtrs.remove(discarded); 427 } 428 else { 429 pointer.setDispatched(true); 430 msg.setDispatchedFromDLQ(pointer.getContainer().isDeadLetterQueue()); 431 tmpList.add(msg); 432 } 433 } 434 else { 435 //the message is probably expired 436 log.info("Message probably expired: " + msg); 437 QueueListEntry discarded = entry; 438 entry = messagePtrs.getPrevEntry(discarded); 439 messagePtrs.remove(discarded); 440 if (msg != null) { 441 deadLetterPolicy.sendToDeadLetter(msg); 442 } 443 } 444 } 445 entry = messagePtrs.getNextEntry(entry); 446 } 447 } 448 ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()]; 449 return (ActiveMQMessage[]) tmpList.toArray(messages); 450 } 451 452 public SubscriberEntry getSubscriptionEntry() { 453 if (subscriberEntry == null) { 454 subscriberEntry = createSubscriptionEntry(); 455 } 456 return subscriberEntry; 457 } 458 459 public boolean isLocalSubscription() { 460 if (activeClient != null) { 461 return !(activeClient.isClusteredConnection() || activeClient.isBrokerConnection()); 462 } 463 return true; 464 } 465 466 // Implementation methods 467 //------------------------------------------------------------------------- 468 protected SubscriberEntry createSubscriptionEntry() { 469 SubscriberEntry answer = new SubscriberEntry(); 470 answer.setClientID(clientId); 471 answer.setConsumerName(subscriberName); 472 answer.setDestination(destination.getPhysicalName()); 473 answer.setSelector(selector); 474 return answer; 475 } 476 477 protected ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException { 478 479 List tmpList = new ArrayList(); 480 synchronized (lock) { 481 QueueListEntry entry = messagePtrs.getFirstEntry(); 482 int count = 0; 483 boolean fragmentedMessages = false; 484 int maxNumberToDispatch = prefetchLimit - unconsumedMessagesDispatched.get(); 485 while (entry != null && (count < maxNumberToDispatch || fragmentedMessages)) { 486 MessagePointer pointer = (MessagePointer) entry.getElement(); 487 if (!pointer.isDispatched()) { 488 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity()); 489 if (msg != null && !msg.isExpired()) { 490 if (pointer.isDispatched() || pointer.isRedelivered()) { 491 //already dispatched - so mark as redelivered 492 msg.setJMSRedelivered(true); 493 } 494 pointer.setDispatched(true); 495 tmpList.add(msg); 496 fragmentedMessages = msg.isMessagePart() && !msg.isLastMessagePart(); 497 unconsumedMessagesDispatched.increment(); 498 count++; 499 } 500 else { 501 //the message is probably expired 502 log.info("Message probably expired: " + msg); 503 QueueListEntry discarded = entry; 504 entry = messagePtrs.getPrevEntry(discarded); 505 messagePtrs.remove(discarded); 506 if (msg != null) { 507 deadLetterPolicy.sendToDeadLetter(msg); 508 } 509 } 510 } 511 entry = messagePtrs.getNextEntry(entry); 512 } 513 } 514 /** 515 * if (tmpList.isEmpty() && ! messagePtrs.isEmpty()) { System.out.println("### Nothing to dispatch but 516 * messagePtrs still has: " + messagePtrs.size() + " to dispatch, prefetchLimit: " + prefetchLimit + " 517 * unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get() + " maxNumberToDispatch: " + 518 * maxNumberToDispatch); MessagePointer first = (MessagePointer) messagePtrs.getFirst(); System.out.println("### 519 * First: " + first + " dispatched: " + first.isDispatched() + " id: " + first.getMessageIdentity()); } else { 520 * if (! tmpList.isEmpty()) { System.out.println("### dispatching: " + tmpList.size() + " items = " + tmpList); } } 521 */ 522 ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()]; 523 return (ActiveMQMessage[]) tmpList.toArray(messages); 524 } 525 526 /** 527 * Indicates the Subscription it's reached it's pre-fetch limit 528 * 529 * @return true/false 530 * @throws JMSException 531 */ 532 public boolean isAtPrefetchLimit() throws JMSException { 533 if (usePrefetch) { 534 int underlivedMessageCount = messagePtrs.size() - unconsumedMessagesDispatched.get(); 535 return underlivedMessageCount >= prefetchLimit; 536 } 537 else { 538 return false; 539 } 540 } 541 542 /** 543 * Indicates if this Subscription has more messages to send to the Consumer 544 * 545 * @return true if more messages available to dispatch 546 */ 547 public boolean isReadyToDispatch() throws JMSException { 548 /** TODO we may have dispatched messags inside messagePtrs */ 549 boolean answer = active.get() && messagePtrs.size() > 0; 550 return answer; 551 } 552 553 /** 554 * @return Returns the destination. 555 */ 556 public ActiveMQDestination getDestination() { 557 return destination; 558 } 559 560 /** 561 * @return Returns the selector. 562 */ 563 public String getSelector() { 564 return selector; 565 } 566 567 /** 568 * @return Returns the active. 569 */ 570 public boolean isActive() { 571 return active.get(); 572 } 573 574 /** 575 * @param newActive The active to set. 576 * @throws JMSException 577 */ 578 public void setActive(boolean newActive) throws JMSException { 579 synchronized (active.getLock()) { 580 active.set(newActive); 581 } 582 if (!newActive) { 583 reset(); 584 } 585 } 586 587 /** 588 * @return Returns the consumerNumber. 589 */ 590 public int getConsumerNumber() { 591 return consumerNumber; 592 } 593 594 /** 595 * @return the consumer Id for the active consumer 596 */ 597 public String getConsumerId() { 598 return consumerId; 599 } 600 601 /** 602 * Indicates the Subscriber is a Durable Subscriber 603 * 604 * @return true if the subscriber is a durable topic 605 * @throws JMSException 606 */ 607 public boolean isDurableTopic() throws JMSException { 608 return destination.isTopic() && subscriberName != null && subscriberName.length() > 0; 609 } 610 611 /** 612 * Indicates the consumer is a browser only 613 * 614 * @return true if a Browser 615 * @throws JMSException 616 */ 617 public boolean isBrowser() throws JMSException { 618 return browser; 619 } 620 621 public MessageIdentity getLastMessageIdentity() throws JMSException { 622 return lastMessageIdentity; 623 } 624 625 public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException { 626 this.lastMessageIdentity = messageIdentity; 627 } 628 629 protected boolean clientIDsEqual(ActiveMQMessage message) { 630 String msgClientID = message.getJMSClientID(); 631 String subClientID = clientId; 632 if (msgClientID == null || subClientID == null) { 633 return false; 634 } 635 else { 636 return msgClientID.equals(subClientID); 637 } 638 } 639 640 protected static final boolean equal(Object left, Object right) { 641 return left == right || (left != null && right != null && left.equals(right)); 642 } 643 644 645 /** 646 * Returns whether or not the consumer can receive the given message 647 */ 648 protected boolean isAuthorizedForMessage(ActiveMQMessage message) { 649 // TODO we could maybe provide direct access to the security adapter 650 BrokerClient client = getActiveClient(); 651 if (client != null) { 652 BrokerConnector connector = client.getBrokerConnector(); 653 if (connector != null) { 654 BrokerContainer container = connector.getBrokerContainer(); 655 if (container != null) { 656 Broker broker = container.getBroker(); 657 if (broker != null) { 658 SecurityAdapter securityAdapter = broker.getSecurityAdapter(); 659 if (securityAdapter != null) { 660 return securityAdapter.authorizeReceive(client, message); 661 } 662 } 663 } 664 } 665 } 666 return true; 667 } 668 }