001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.broker.impl; 020 021 import java.io.File; 022 import java.io.IOException; 023 import java.util.ArrayList; 024 import java.util.Hashtable; 025 import java.util.Iterator; 026 import java.util.Map; 027 import javax.jms.JMSException; 028 import javax.naming.Context; 029 import javax.transaction.xa.XAException; 030 import org.activemq.broker.Broker; 031 import org.activemq.broker.BrokerAdmin; 032 import org.activemq.broker.BrokerClient; 033 import org.activemq.broker.ConsumerInfoListener; 034 import org.activemq.capacity.DelegateCapacityMonitor; 035 import org.activemq.io.util.MemoryBoundedObjectManager; 036 import org.activemq.io.util.MemoryBoundedQueueManager; 037 import org.activemq.jndi.ReadOnlyContext; 038 import org.activemq.message.ActiveMQDestination; 039 import org.activemq.message.ActiveMQMessage; 040 import org.activemq.message.ActiveMQXid; 041 import org.activemq.message.BrokerInfo; 042 import org.activemq.message.ConnectionInfo; 043 import org.activemq.message.ConsumerInfo; 044 import org.activemq.message.MessageAck; 045 import org.activemq.message.ProducerInfo; 046 import org.activemq.security.SecurityAdapter; 047 import org.activemq.service.DeadLetterPolicy; 048 import org.activemq.service.MessageContainerAdmin; 049 import org.activemq.service.MessageContainerManager; 050 import org.activemq.service.RedeliveryPolicy; 051 import org.activemq.service.Transaction; 052 import org.activemq.service.TransactionManager; 053 import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager; 054 import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager; 055 import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager; 056 import org.activemq.service.impl.DurableTopicMessageContainerManager; 057 import org.activemq.store.PersistenceAdapter; 058 import org.activemq.store.PersistenceAdapterFactory; 059 import org.activemq.store.TransactionStore; 060 import org.activemq.store.vm.VMPersistenceAdapter; 061 import org.activemq.store.vm.VMTransactionManager; 062 import org.activemq.util.Callback; 063 import org.activemq.util.ExceptionTemplate; 064 import org.activemq.util.JMSExceptionHelper; 065 import org.apache.commons.logging.Log; 066 import org.apache.commons.logging.LogFactory; 067 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 068 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 069 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 070 071 /** 072 * The default {@link Broker} implementation 073 * 074 * @version $Revision: 1.1.1.1 $ 075 */ 076 public class DefaultBroker extends DelegateCapacityMonitor implements Broker, BrokerAdmin { 077 078 private static final Log log = LogFactory.getLog(DefaultBroker.class); 079 080 protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir"; 081 protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory"; 082 083 protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class}; 084 085 private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb 086 087 private PersistenceAdapter persistenceAdapter; 088 private TransactionManager transactionManager; 089 private MessageContainerManager[] containerManagers; 090 private File tempDir; 091 private MemoryBoundedObjectManager memoryManager; 092 private MemoryBoundedQueueManager queueManager; 093 private TransactionStore preparedTransactionStore; 094 private Map containerManagerMap; 095 private CopyOnWriteArrayList consumerInfoListeners; 096 private MessageContainerManager persistentTopicMCM; 097 private MessageContainerManager transientTopicMCM; 098 private TransientQueueBoundedMessageManager transientQueueMCM; 099 private DurableQueueBoundedMessageManager persistentQueueMCM; 100 private SecurityAdapter securityAdapter; 101 private RedeliveryPolicy redeliveryPolicy; 102 private DeadLetterPolicy deadLetterPolicy; 103 private AdvisorySupport advisory; 104 private Map messageConsumers = new ConcurrentHashMap(); 105 private BrokerInfo brokerInfo; 106 private SynchronizedBoolean started = new SynchronizedBoolean(false); 107 private BrokerContainerImpl brokerContainer; 108 109 110 111 public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager) { 112 this.brokerInfo = new LocalBrokerInfo(this); 113 this.brokerInfo.setBrokerName(brokerName); 114 this.brokerInfo.setClusterName(brokerClusterName); 115 this.memoryManager = memoryManager; 116 queueManager = new MemoryBoundedQueueManager(memoryManager); 117 setDelegate(memoryManager); 118 containerManagerMap = new ConcurrentHashMap(); 119 consumerInfoListeners = new CopyOnWriteArrayList(); 120 this.advisory = new AdvisorySupport(this); 121 } 122 123 public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) { 124 this(brokerName, "default", memoryManager); 125 } 126 127 public DefaultBroker(String brokerName, String cluserName) { 128 this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE)); 129 } 130 131 public DefaultBroker(String brokerName) { 132 this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE)); 133 } 134 135 public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) { 136 this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE)); 137 this.persistenceAdapter = persistenceAdapter; 138 } 139 140 public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) { 141 this(brokerName); 142 this.persistenceAdapter = persistenceAdapter; 143 } 144 145 public boolean isStarted(){ 146 return started.get(); 147 } 148 149 /** 150 * Start this Service 151 * 152 * @throws JMSException 153 */ 154 public void start() throws JMSException{ 155 if(started.commit(false,true)){ 156 if(redeliveryPolicy==null){ 157 redeliveryPolicy = new RedeliveryPolicy(); 158 } 159 if(deadLetterPolicy==null){ 160 deadLetterPolicy = new DeadLetterPolicy(this); 161 } 162 if(persistenceAdapter==null){ 163 persistenceAdapter = createPersistenceAdapter(); 164 } 165 persistenceAdapter.start(); 166 167 if(transactionManager==null){ 168 preparedTransactionStore = persistenceAdapter.createTransactionStore(); 169 transactionManager = new VMTransactionManager(this,preparedTransactionStore); 170 } 171 172 // force containers to be created 173 if(containerManagerMap.isEmpty()){ 174 makeDefaultContainerManagers(); 175 } 176 getContainerManagers(); 177 178 for(int i = 0;i<containerManagers.length;i++){ 179 containerManagers[i].setDeadLetterPolicy(this.deadLetterPolicy); 180 containerManagers[i].start(); 181 } 182 transactionManager.start(); 183 } 184 } 185 186 187 /** 188 * stop this Service 189 * 190 * @throws JMSException 191 */ 192 193 public void stop() throws JMSException{ 194 if(started.commit(true,false)){ 195 ExceptionTemplate template = new ExceptionTemplate(); 196 197 if(containerManagers!=null){ 198 for(int i = 0;i<containerManagers.length;i++){ 199 final MessageContainerManager containerManager = containerManagers[i]; 200 template.run(new Callback(){ 201 202 public void execute() throws Throwable{ 203 containerManager.stop(); 204 } 205 }); 206 } 207 } 208 if(transactionManager!=null){ 209 template.run(new Callback(){ 210 211 public void execute() throws Throwable{ 212 transactionManager.stop(); 213 } 214 }); 215 } 216 217 template.run(new Callback(){ 218 219 public void execute() throws Throwable{ 220 persistenceAdapter.stop(); 221 } 222 }); 223 224 template.throwJMSException(); 225 } 226 } 227 228 // Broker interface 229 //------------------------------------------------------------------------- 230 231 public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException { 232 if (securityAdapter != null) { 233 securityAdapter.authorizeConnection(client, info); 234 } 235 advisory.addConnection(client,info); 236 } 237 238 public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException { 239 if (transactionManager != null) { 240 transactionManager.cleanUpClient(client); 241 } 242 advisory.removeConnection(client,info); 243 } 244 245 public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException { 246 if (securityAdapter != null) { 247 securityAdapter.authorizeProducer(client, info); 248 } 249 advisory.addProducer(client,info); 250 } 251 252 public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException { 253 advisory.removeProducer(client,info); 254 } 255 256 /** 257 * Add an active message consumer 258 */ 259 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 260 validateConsumer(info); 261 if (securityAdapter != null) { 262 securityAdapter.authorizeConsumer(client, info); 263 } 264 advisory.addAdvisory(client, info); 265 MessageContainerManager[] array = getContainerManagers(); 266 for (int i = 0;i < array.length;i++) { 267 array[i].addMessageConsumer(client, info); 268 } 269 fireConsumerInfo(client, info); 270 messageConsumers.put(info,client); 271 } 272 273 /** 274 * remove an active message consumer 275 */ 276 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 277 validateConsumer(info); 278 advisory.removeAdvisory(client, info); 279 for (int i = 0;i < containerManagers.length;i++) { 280 containerManagers[i].removeMessageConsumer(client, info); 281 } 282 fireConsumerInfo(client, info); 283 messageConsumers.remove(info); 284 } 285 286 287 /** 288 * send a message to the broker 289 */ 290 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException { 291 checkValid(); 292 ActiveMQDestination destination = message.getJMSActiveMQDestination(); 293 if (destination == null) { 294 throw new JMSException("No destination specified for the Message"); 295 } 296 if (message.getJMSMessageID() == null && !destination.isAdvisory()) { 297 throw new JMSException("No messageID specified for the Message"); 298 } 299 associateTransaction(message); 300 try { 301 if (destination.isComposite()) { 302 boolean first = true; 303 for (Iterator iter = destination.getChildDestinations().iterator();iter.hasNext();) { 304 ActiveMQDestination childDestination = (ActiveMQDestination) iter.next(); 305 // lets shallow copy just in case 306 if (first) { 307 first = false; 308 } 309 else { 310 message = message.shallowCopy(); 311 } 312 message.setJMSDestination(childDestination); 313 doMessageSend(client, message); 314 } 315 } 316 else { 317 if (destination.isTempDestinationAdvisory() && !client.isBrokerConnection()) { 318 advisory.processTempDestinationAdvisory(client,message); 319 } 320 doMessageSend(client, message); 321 } 322 } 323 finally { 324 disAssociateTransaction(); 325 } 326 } 327 328 /** 329 * Acknowledge consumption of a message by the Message Consumer 330 */ 331 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException { 332 333 associateTransaction(ack); 334 try { 335 for (int i = 0; i < containerManagers.length; i++) { 336 containerManagers[i].acknowledgeMessage(client, ack); 337 } 338 } finally { 339 disAssociateTransaction(); 340 } 341 342 } 343 344 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 345 for (int i = 0; i < containerManagers.length; i++) { 346 containerManagers[i].deleteSubscription(clientId, subscriberName); 347 } 348 } 349 350 351 /** 352 * Start a transaction. 353 * 354 * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, java.lang.String) 355 */ 356 public void startTransaction(BrokerClient client, String transactionId) throws JMSException { 357 transactionManager.createLocalTransaction(client, transactionId); 358 } 359 360 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException { 361 try { 362 Transaction transaction = transactionManager.getLocalTransaction(transactionId); 363 transaction.commit(true); 364 } 365 catch (XAException e) { 366 // TODO: I think the XAException should propagate all the way to the client. 367 throw (JMSException) new JMSException(e.getMessage()).initCause(e); 368 } 369 } 370 371 /** 372 * rollback a transaction 373 */ 374 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException { 375 try { 376 Transaction transaction = transactionManager.getLocalTransaction(transactionId); 377 transaction.rollback(); 378 } 379 catch (XAException e) { 380 // TODO: I think the XAException should propagate all the way to the client. 381 throw (JMSException) new JMSException(e.getMessage()).initCause(e); 382 } 383 } 384 385 /** 386 * Starts an XA Transaction. 387 * 388 * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 389 */ 390 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 391 transactionManager.createXATransaction(client, xid); 392 } 393 394 /** 395 * Prepares an XA Transaciton. 396 * 397 * @see org.activemq.broker.Broker#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 398 */ 399 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 400 Transaction transaction = transactionManager.getXATransaction(xid); 401 return transaction.prepare(); 402 } 403 404 /** 405 * Rollback an XA Transaction. 406 * 407 * @see org.activemq.broker.Broker#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 408 */ 409 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 410 Transaction transaction = transactionManager.getXATransaction(xid); 411 transaction.rollback(); 412 } 413 414 /** 415 * Commit an XA Transaction. 416 * 417 * @see org.activemq.broker.Broker#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean) 418 */ 419 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException { 420 Transaction transaction = transactionManager.getXATransaction(xid); 421 transaction.commit(onePhase); 422 } 423 424 /** 425 * Gets the prepared XA transactions. 426 * 427 * @see org.activemq.broker.Broker#getPreparedTransactions(org.activemq.broker.BrokerClient) 428 */ 429 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException { 430 return transactionManager.getPreparedXATransactions(); 431 } 432 433 434 435 436 // Properties 437 //------------------------------------------------------------------------- 438 439 /** 440 * Get a temp directory - used for spooling 441 * 442 * @return a File ptr to the directory 443 */ 444 public File getTempDir() { 445 if (tempDir == null) { 446 String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp"); 447 tempDir = new File(dirName); 448 } 449 return tempDir; 450 } 451 452 public String getBrokerName() { 453 return brokerInfo.getBrokerName(); 454 } 455 456 /** 457 * @return Returns the brokerClusterName. 458 */ 459 public String getBrokerClusterName() { 460 return brokerInfo.getClusterName(); 461 } 462 463 464 public void setTempDir(File tempDir) { 465 this.tempDir = tempDir; 466 } 467 468 public MessageContainerManager[] getContainerManagers() { 469 if (containerManagers == null) { 470 containerManagers = createContainerManagers(); 471 } 472 return containerManagers; 473 } 474 475 public Map getContainerManagerMap() { 476 return containerManagerMap; 477 } 478 479 public void setContainerManagerMap(Map containerManagerMap) { 480 this.containerManagerMap = containerManagerMap; 481 this.containerManagers = null; 482 } 483 484 public PersistenceAdapter getPersistenceAdapter() { 485 return persistenceAdapter; 486 } 487 488 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) { 489 this.persistenceAdapter = persistenceAdapter; 490 } 491 492 public TransactionManager getTransactionManager() { 493 return transactionManager; 494 } 495 496 public void setTransactionManager(TransactionManager transactionManager) { 497 this.transactionManager = transactionManager; 498 } 499 500 public SecurityAdapter getSecurityAdapter() { 501 return securityAdapter; 502 } 503 504 public void setSecurityAdapter(SecurityAdapter securityAdapter) { 505 this.securityAdapter = securityAdapter; 506 } 507 508 public RedeliveryPolicy getRedeliveryPolicy() { 509 return redeliveryPolicy; 510 } 511 512 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 513 this.redeliveryPolicy = redeliveryPolicy; 514 } 515 516 public TransactionStore getPreparedTransactionStore() { 517 return preparedTransactionStore; 518 } 519 520 public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) { 521 this.preparedTransactionStore = preparedTransactionStore; 522 } 523 524 /** 525 * @return the DeadLetterPolicy 526 */ 527 public DeadLetterPolicy getDeadLetterPolicy(){ 528 return deadLetterPolicy; 529 } 530 531 /** 532 * set the dead letter policy 533 * @param deadLetterPolicy 534 */ 535 public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy){ 536 this.deadLetterPolicy = deadLetterPolicy; 537 } 538 539 /** 540 * @return Returns the maximumMemoryUsage. 541 */ 542 public long getMaximumMemoryUsage() { 543 return memoryManager.getValueLimit(); 544 } 545 546 /** 547 * @param maximumMemoryUsage The maximumMemoryUsage to set. 548 */ 549 public void setMaximumMemoryUsage(long maximumMemoryUsage) { 550 this.memoryManager.setValueLimit(maximumMemoryUsage); 551 } 552 553 554 public Context getDestinationContext(Hashtable environment) { 555 Map data = new ConcurrentHashMap(); 556 for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) { 557 Map.Entry entry = (Map.Entry) iter.next(); 558 String name = entry.getKey().toString(); 559 MessageContainerManager manager = (MessageContainerManager) entry.getValue(); 560 Context context = new ReadOnlyContext(environment, manager.getDestinations()); 561 data.put(name, context); 562 } 563 return new ReadOnlyContext(environment, data); 564 } 565 566 // Implementation methods 567 //------------------------------------------------------------------------- 568 569 570 protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException { 571 if (securityAdapter != null) { 572 securityAdapter.authorizeSendMessage(client, message); 573 } 574 ActiveMQDestination dest = message.getJMSActiveMQDestination(); 575 if (dest.isTopic()){ 576 if (message.isPersistent() && !dest.isTemporary()){ 577 persistentTopicMCM.sendMessage(client,message); 578 } 579 transientTopicMCM.sendMessage(client, message); 580 }else { 581 transientQueueMCM.sendMessage(client, message); 582 persistentQueueMCM.sendMessage(client, message); 583 } 584 } 585 586 /** 587 * Factory method to create a default persistence adapter 588 * 589 * @return 590 */ 591 protected PersistenceAdapter createPersistenceAdapter() throws JMSException { 592 File directory = new File(getStoreDirectory()); 593 594 // lets use reflection to avoid runtime dependency on persistence libraries 595 PersistenceAdapter answer = null; 596 String property = System.getProperty(PERSISTENCE_ADAPTER_FACTORY); 597 if (property != null) { 598 answer = tryCreatePersistenceAdapter(property, directory, false); 599 } 600 if (answer == null) { 601 answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true); 602 } 603 if (answer != null) { 604 return answer; 605 } 606 else { 607 log.warn("Default message store (journal+derby) could not be found in the classpath or property '" + PERSISTENCE_ADAPTER_FACTORY 608 + "' not specified so defaulting to use RAM based message persistence"); 609 return new VMPersistenceAdapter(); 610 } 611 } 612 613 protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException { 614 Class adapterClass = loadClass(className, ignoreErrors); 615 if (adapterClass != null) { 616 try { 617 PersistenceAdapterFactory factory = (PersistenceAdapterFactory) adapterClass.newInstance(); 618 PersistenceAdapter answer = factory.createPersistenceAdapter(directory, memoryManager); 619 log.info("Persistence adapter created using: " + className); 620 return answer; 621 } 622 catch (IOException cause) { 623 throw createInstantiateAdapterException(className, (Exception) cause); 624 } 625 catch (Throwable e) { 626 if (!ignoreErrors) { 627 throw createInstantiateAdapterException(className, e); 628 } 629 } 630 } 631 return null; 632 } 633 634 protected JMSException createInstantiateAdapterException(String className, Throwable e) { 635 return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: " 636 + className + ". Reason: " + e, e); 637 } 638 639 /** 640 * Tries to load the given class from the current context class loader or 641 * class loader which loaded us or return null if the class could not be found 642 */ 643 protected Class loadClass(String name, boolean ignoreErrors) throws JMSException { 644 try { 645 return Thread.currentThread().getContextClassLoader().loadClass(name); 646 } 647 catch (ClassNotFoundException e) { 648 try { 649 return getClass().getClassLoader().loadClass(name); 650 } 651 catch (ClassNotFoundException e2) { 652 if (ignoreErrors) { 653 log.trace("Could not find class: " + name + " on the classpath"); 654 return null; 655 } 656 else { 657 throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e); 658 } 659 } 660 } 661 } 662 663 protected String getStoreDirectory() { 664 String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(getBrokerInfo().getBrokerName()); 665 return System.getProperty(PROPERTY_STORE_DIRECTORY, defaultDirectory); 666 } 667 668 /** 669 * Factory method to create the default container managers 670 * 671 * @return 672 */ 673 protected MessageContainerManager[] createContainerManagers() { 674 int size = containerManagerMap.size(); 675 MessageContainerManager[] answer = new MessageContainerManager[size]; 676 containerManagerMap.values().toArray(answer); 677 return answer; 678 } 679 680 protected void makeDefaultContainerManagers() { 681 transientTopicMCM = new TransientTopicBoundedMessageManager(queueManager); 682 containerManagerMap.put("transientTopicContainer", transientTopicMCM); 683 persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy, deadLetterPolicy); 684 containerManagerMap.put("persistentTopicContainer", persistentTopicMCM); 685 persistentQueueMCM = new DurableQueueBoundedMessageManager(persistenceAdapter, queueManager, redeliveryPolicy, deadLetterPolicy); 686 containerManagerMap.put("persistentQueueContainer", persistentQueueMCM); 687 transientQueueMCM = new TransientQueueBoundedMessageManager(queueManager,redeliveryPolicy, deadLetterPolicy); 688 containerManagerMap.put("transientQueueContainer", transientQueueMCM); 689 } 690 691 /** 692 * Ensures the consumer is valid, throwing a meaningful exception if not 693 * 694 * @param info 695 * @throws JMSException 696 */ 697 protected void validateConsumer(ConsumerInfo info) throws JMSException { 698 if (info.getConsumerId() == null) { 699 throw new JMSException("No consumerId specified for the ConsumerInfo"); 700 } 701 } 702 703 protected void checkValid() throws JMSException { 704 if (containerManagers == null) { 705 throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods"); 706 } 707 } 708 709 /** 710 * Add a ConsumerInfoListener to the Broker 711 * 712 * @param l 713 */ 714 public void addConsumerInfoListener(ConsumerInfoListener l) { 715 if (l != null){ 716 consumerInfoListeners.add(l); 717 //fire any existing infos to the listener 718 for (Iterator i = messageConsumers.entrySet().iterator(); i.hasNext();){ 719 Map.Entry entry = (Map.Entry)i.next(); 720 ConsumerInfo info = (ConsumerInfo) entry.getKey(); 721 BrokerClient client = (BrokerClient) entry.getValue(); 722 l.onConsumerInfo(client, info); 723 } 724 } 725 } 726 727 /** 728 * Remove a ConsumerInfoListener from the Broker 729 * 730 * @param l 731 */ 732 public void removeConsumerInfoListener(ConsumerInfoListener l) { 733 consumerInfoListeners.remove(l); 734 } 735 736 protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) { 737 for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) { 738 ConsumerInfoListener l = (ConsumerInfoListener) i.next(); 739 l.onConsumerInfo(client, info); 740 } 741 } 742 743 /** 744 * @return the MessageContainerManager for durable topics 745 */ 746 public MessageContainerManager getPersistentTopicContainerManager() { 747 return persistentTopicMCM; 748 } 749 750 /** 751 * @return the MessageContainerManager for transient topics 752 */ 753 public MessageContainerManager getTransientTopicContainerManager() { 754 return transientTopicMCM; 755 } 756 757 /** 758 * @return the MessageContainerManager for persistent queues 759 */ 760 public MessageContainerManager getPersistentQueueContainerManager() { 761 return persistentQueueMCM; 762 } 763 764 /** 765 * @return the MessageContainerManager for transient queues 766 */ 767 public MessageContainerManager getTransientQueueContainerManager() { 768 return transientQueueMCM; 769 } 770 771 /** 772 * @see org.activemq.broker.Broker#getBrokerAdmin() 773 */ 774 public BrokerAdmin getBrokerAdmin() { 775 return this; 776 } 777 778 public void createMessageContainer(ActiveMQDestination dest) throws JMSException { 779 for (int i = 0; i < containerManagers.length; i++) { 780 containerManagers[i].createMessageContainer(dest); 781 } 782 } 783 784 public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException { 785 for (int i = 0; i < containerManagers.length; i++) { 786 containerManagers[i].destroyMessageContainer(dest); 787 } 788 } 789 790 public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException { 791 for (int i = 0; i < containerManagers.length; i++) { 792 Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins(); 793 MessageContainerAdmin mca = (MessageContainerAdmin) messageContainerAdmins.get(dest); 794 if( mca != null ) { 795 return mca; 796 } 797 } 798 return null; 799 } 800 801 /** 802 * @throws JMSException 803 * @see org.activemq.broker.BrokerAdmin#listDestinations() 804 */ 805 public MessageContainerAdmin[] listMessageContainerAdmin() throws JMSException { 806 807 ArrayList l = new ArrayList(); 808 for (int i = 0; i < containerManagers.length; i++) { 809 Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins(); 810 for (Iterator iter = messageContainerAdmins.values().iterator(); iter.hasNext();) { 811 MessageContainerAdmin mca = (MessageContainerAdmin) iter.next(); 812 l.add(mca); 813 } 814 } 815 816 MessageContainerAdmin answer[] = new MessageContainerAdmin[l.size()]; 817 l.toArray(answer); 818 return answer; 819 } 820 821 822 /** 823 * Add a message to a dead letter queue 824 * @param deadLetterName 825 * @param message 826 * @throws JMSException 827 */ 828 public void sendToDeadLetterQueue(String deadLetterName,ActiveMQMessage expiredMessage) throws JMSException { 829 if (persistentQueueMCM != null) { 830 Transaction original = TransactionManager.getContexTransaction(); 831 try { 832 TransactionManager.setContexTransaction(null); 833 persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, expiredMessage); 834 log.debug(expiredMessage + " sent to DLQ: " + deadLetterName); 835 } finally { 836 TransactionManager.setContexTransaction(original); 837 } 838 } 839 } 840 841 /** 842 * send a message to the broker within a transaction public void 843 * sendTransactedMessage(final BrokerClient client, final String 844 * transactionId, final ActiveMQMessage message) throws JMSException { 845 * getTransactionFor(message).addPostCommitTask(new 846 * SendMessageTransactionTask(client, message)); } 847 */ 848 849 /** 850 * Acknowledge consumption of a message within a transaction 851 public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException { 852 Transaction transaction; 853 if (ack.isXaTransacted()) { 854 try { 855 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId)); 856 } 857 catch (XAException e) { 858 throw (JMSException) new JMSException(e.getMessage()).initCause(e); 859 } 860 } 861 else { 862 transaction = transactionManager.getLocalTransaction(transactionId); 863 } 864 transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack)); 865 transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack)); 866 867 // we need to tell the dispatcher that we can now accept another message 868 // even though we don't really ack the message until the commit 869 // this is because if we have a prefetch value of 1, we can never consume 2 messages 870 // in a transaction, since the ack for the first message never arrives until the commit 871 for (int i = 0; i < containerManagers.length; i++) { 872 containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack); 873 } 874 } 875 */ 876 877 878 /** 879 * @param message 880 * @return 881 * @throws JMSException 882 private Transaction getTransactionFor(ActiveMQMessage message) throws JMSException { 883 String transactionId = message.getTransactionId(); 884 if (message.isXaTransacted()) { 885 try { 886 return transactionManager.getXATransaction(new ActiveMQXid(transactionId)); 887 } 888 catch (XAException e) { 889 throw (JMSException) new JMSException(e.getMessage()).initCause(e); 890 } 891 } 892 return transactionManager.getLocalTransaction(transactionId); 893 } 894 895 896 public void acknowledgeMessageRecover(MessageAck ack) { 897 } 898 public void sendMessageRecover(ActiveMQMessage message) throws JMSException { 899 } 900 */ 901 902 /** 903 * Associates a Transaction with the current thread. Once this call is finished, 904 * the Transactio ncan be obtained via TransactionManager.getContexTransaction(). 905 * @param message 906 * @throws JMSException 907 */ 908 private final void associateTransaction(ActiveMQMessage message) throws JMSException { 909 Transaction transaction; 910 if( message.isPartOfTransaction() ) { 911 if (message.isXaTransacted()) { 912 try { 913 transaction = transactionManager.getXATransaction((ActiveMQXid) message.getTransactionId()); 914 } 915 catch (XAException e) { 916 throw (JMSException) new JMSException(e.getMessage()).initCause(e); 917 } 918 } else { 919 transaction = transactionManager.getLocalTransaction((String) message.getTransactionId()); 920 } 921 922 } else { 923 transaction = null; 924 } 925 TransactionManager.setContexTransaction(transaction); 926 } 927 928 private void disAssociateTransaction() { 929 TransactionManager.setContexTransaction(null); 930 } 931 932 /** 933 * Associates a Transaction with the current thread. Once this call is finished, 934 * the Transactio ncan be obtained via TransactionManager.getContexTransaction(). 935 * @param ack 936 * @throws JMSException 937 */ 938 private void associateTransaction(MessageAck ack) throws JMSException { 939 Transaction transaction; 940 if( ack.isPartOfTransaction() ) { 941 if (ack.isXaTransacted()) { 942 try { 943 transaction = transactionManager.getXATransaction((ActiveMQXid) ack.getTransactionId()); 944 } 945 catch (XAException e) { 946 throw (JMSException) new JMSException(e.getMessage()).initCause(e); 947 } 948 } else { 949 transaction = transactionManager.getLocalTransaction((String) ack.getTransactionId()); 950 } 951 952 } else { 953 transaction = null; 954 } 955 TransactionManager.setContexTransaction(transaction); 956 } 957 958 private String sanitizeString(String in) { 959 String result = null; 960 if (in != null) { 961 result = in.replace(':', '_'); 962 result = result.replace('/', '_'); 963 result = result.replace('\\', '_'); 964 } 965 return result; 966 } 967 968 /** 969 * @return Returns the memoryManager. 970 */ 971 public MemoryBoundedObjectManager getMemoryManager() { 972 return memoryManager; 973 } 974 975 976 /** 977 * @return Returns the queueManager. 978 */ 979 public MemoryBoundedQueueManager getQueueManager() { 980 return queueManager; 981 } 982 983 984 public String getName() { 985 return getBrokerName(); 986 } 987 988 989 public String toString (){ 990 return "broker: " + getName(); 991 } 992 993 /** 994 * @see org.activemq.broker.Broker#getBrokerInfo() 995 */ 996 public BrokerInfo getBrokerInfo(){ 997 return brokerInfo; 998 } 999 1000 protected void setBrokercontainer(BrokerContainerImpl container){ 1001 this.brokerContainer = container; 1002 } 1003 1004 protected BrokerContainerImpl getBrokerContainer(){ 1005 return brokerContainer; 1006 } 1007 1008 1009 1010 1011 }