001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.broker.impl;
020    
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.Hashtable;
025    import java.util.Iterator;
026    import java.util.Map;
027    import javax.jms.JMSException;
028    import javax.naming.Context;
029    import javax.transaction.xa.XAException;
030    import org.activemq.broker.Broker;
031    import org.activemq.broker.BrokerAdmin;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.broker.ConsumerInfoListener;
034    import org.activemq.capacity.DelegateCapacityMonitor;
035    import org.activemq.io.util.MemoryBoundedObjectManager;
036    import org.activemq.io.util.MemoryBoundedQueueManager;
037    import org.activemq.jndi.ReadOnlyContext;
038    import org.activemq.message.ActiveMQDestination;
039    import org.activemq.message.ActiveMQMessage;
040    import org.activemq.message.ActiveMQXid;
041    import org.activemq.message.BrokerInfo;
042    import org.activemq.message.ConnectionInfo;
043    import org.activemq.message.ConsumerInfo;
044    import org.activemq.message.MessageAck;
045    import org.activemq.message.ProducerInfo;
046    import org.activemq.security.SecurityAdapter;
047    import org.activemq.service.DeadLetterPolicy;
048    import org.activemq.service.MessageContainerAdmin;
049    import org.activemq.service.MessageContainerManager;
050    import org.activemq.service.RedeliveryPolicy;
051    import org.activemq.service.Transaction;
052    import org.activemq.service.TransactionManager;
053    import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager;
054    import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
055    import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
056    import org.activemq.service.impl.DurableTopicMessageContainerManager;
057    import org.activemq.store.PersistenceAdapter;
058    import org.activemq.store.PersistenceAdapterFactory;
059    import org.activemq.store.TransactionStore;
060    import org.activemq.store.vm.VMPersistenceAdapter;
061    import org.activemq.store.vm.VMTransactionManager;
062    import org.activemq.util.Callback;
063    import org.activemq.util.ExceptionTemplate;
064    import org.activemq.util.JMSExceptionHelper;
065    import org.apache.commons.logging.Log;
066    import org.apache.commons.logging.LogFactory;
067    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
068    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
069    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
070    
071    /**
072     * The default {@link Broker} implementation
073     *
074     * @version $Revision: 1.1.1.1 $
075     */
076    public class DefaultBroker extends DelegateCapacityMonitor implements Broker, BrokerAdmin {
077    
078        private static final Log log = LogFactory.getLog(DefaultBroker.class);
079    
080        protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
081        protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory";
082    
083        protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};
084    
085        private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb
086    
087        private PersistenceAdapter persistenceAdapter;
088        private TransactionManager transactionManager;
089        private MessageContainerManager[] containerManagers;
090        private File tempDir;
091        private MemoryBoundedObjectManager memoryManager;
092        private MemoryBoundedQueueManager queueManager;
093        private TransactionStore preparedTransactionStore;
094        private Map containerManagerMap;
095        private CopyOnWriteArrayList consumerInfoListeners;
096        private MessageContainerManager persistentTopicMCM;
097        private MessageContainerManager transientTopicMCM;
098        private TransientQueueBoundedMessageManager transientQueueMCM;
099        private DurableQueueBoundedMessageManager persistentQueueMCM;
100        private SecurityAdapter securityAdapter;
101        private RedeliveryPolicy redeliveryPolicy;
102        private DeadLetterPolicy deadLetterPolicy;
103        private AdvisorySupport  advisory;
104        private Map messageConsumers = new ConcurrentHashMap();
105        private BrokerInfo brokerInfo;
106        private SynchronizedBoolean started = new SynchronizedBoolean(false);
107        private BrokerContainerImpl brokerContainer;
108    
109    
110    
111        public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager) {
112            this.brokerInfo = new LocalBrokerInfo(this);
113            this.brokerInfo.setBrokerName(brokerName);
114            this.brokerInfo.setClusterName(brokerClusterName);
115            this.memoryManager = memoryManager;
116            queueManager = new MemoryBoundedQueueManager(memoryManager);
117            setDelegate(memoryManager);
118            containerManagerMap = new ConcurrentHashMap();
119            consumerInfoListeners = new CopyOnWriteArrayList();
120            this.advisory = new AdvisorySupport(this);
121        }
122    
123        public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) {
124            this(brokerName, "default", memoryManager);
125        }
126    
127        public DefaultBroker(String brokerName, String cluserName) {
128            this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
129        }
130    
131        public DefaultBroker(String brokerName) {
132            this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
133        }
134    
135        public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
136            this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
137            this.persistenceAdapter = persistenceAdapter;
138        }
139    
140        public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
141            this(brokerName);
142            this.persistenceAdapter = persistenceAdapter;
143        }
144        
145        public boolean isStarted(){
146            return started.get();
147        }
148    
149        /**
150         * Start this Service
151         *
152         * @throws JMSException
153         */
154        public void start() throws JMSException{
155            if(started.commit(false,true)){
156                if(redeliveryPolicy==null){
157                    redeliveryPolicy = new RedeliveryPolicy();
158                }
159                if(deadLetterPolicy==null){
160                    deadLetterPolicy = new DeadLetterPolicy(this);
161                }
162                if(persistenceAdapter==null){
163                    persistenceAdapter = createPersistenceAdapter();
164                }
165                persistenceAdapter.start();
166    
167                if(transactionManager==null){
168                    preparedTransactionStore = persistenceAdapter.createTransactionStore();
169                    transactionManager = new VMTransactionManager(this,preparedTransactionStore);
170                }
171    
172                // force containers to be created
173                if(containerManagerMap.isEmpty()){
174                    makeDefaultContainerManagers();
175                }
176                getContainerManagers();
177    
178                for(int i = 0;i<containerManagers.length;i++){
179                    containerManagers[i].setDeadLetterPolicy(this.deadLetterPolicy);
180                    containerManagers[i].start();
181                }
182                transactionManager.start();
183            }
184        }
185    
186    
187        /**
188         * stop this Service
189         *
190         * @throws JMSException
191         */
192    
193        public void stop() throws JMSException{
194            if(started.commit(true,false)){
195                ExceptionTemplate template = new ExceptionTemplate();
196    
197                if(containerManagers!=null){
198                    for(int i = 0;i<containerManagers.length;i++){
199                        final MessageContainerManager containerManager = containerManagers[i];
200                        template.run(new Callback(){
201    
202                            public void execute() throws Throwable{
203                                containerManager.stop();
204                            }
205                        });
206                    }
207                }
208                if(transactionManager!=null){
209                    template.run(new Callback(){
210    
211                        public void execute() throws Throwable{
212                            transactionManager.stop();
213                        }
214                    });
215                }
216    
217                template.run(new Callback(){
218    
219                    public void execute() throws Throwable{
220                        persistenceAdapter.stop();
221                    }
222                });
223    
224                template.throwJMSException();
225            }
226        }
227    
228        // Broker interface
229        //-------------------------------------------------------------------------
230    
231        public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
232            if (securityAdapter != null) {
233                securityAdapter.authorizeConnection(client, info);
234            }
235            advisory.addConnection(client,info);
236        }
237    
238        public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
239            if (transactionManager != null) {
240                transactionManager.cleanUpClient(client);
241            }
242            advisory.removeConnection(client,info);
243        }
244    
245        public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
246            if (securityAdapter != null) {
247                securityAdapter.authorizeProducer(client, info);
248            }
249            advisory.addProducer(client,info);
250        }
251    
252        public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
253            advisory.removeProducer(client,info);
254        }
255    
256        /**
257         * Add an active message consumer
258         */
259        public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
260            validateConsumer(info);
261            if (securityAdapter != null) {
262                securityAdapter.authorizeConsumer(client, info);
263            }
264            advisory.addAdvisory(client, info);
265            MessageContainerManager[] array = getContainerManagers();
266            for (int i = 0;i < array.length;i++) {
267                array[i].addMessageConsumer(client, info);
268            }
269            fireConsumerInfo(client, info);
270            messageConsumers.put(info,client);
271        }
272    
273        /**
274         * remove an active message consumer
275         */
276        public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
277            validateConsumer(info);
278            advisory.removeAdvisory(client, info);
279            for (int i = 0;i < containerManagers.length;i++) {
280                containerManagers[i].removeMessageConsumer(client, info);
281            }
282            fireConsumerInfo(client, info);
283            messageConsumers.remove(info);
284        }
285    
286    
287        /**
288         * send a message to the broker
289         */
290        public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
291            checkValid();
292            ActiveMQDestination destination = message.getJMSActiveMQDestination();
293            if (destination == null) {
294                throw new JMSException("No destination specified for the Message");
295            }
296            if (message.getJMSMessageID() == null && !destination.isAdvisory()) {
297                throw new JMSException("No messageID specified for the Message");
298            }
299            associateTransaction(message);
300            try {
301                if (destination.isComposite()) {
302                    boolean first = true;
303                    for (Iterator iter = destination.getChildDestinations().iterator();iter.hasNext();) {
304                        ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
305                        // lets shallow copy just in case
306                        if (first) {
307                            first = false;
308                        }
309                        else {
310                            message = message.shallowCopy();
311                        }
312                        message.setJMSDestination(childDestination);
313                        doMessageSend(client, message);
314                    }
315                }
316                else {
317                    if (destination.isTempDestinationAdvisory() && !client.isBrokerConnection()) {
318                        advisory.processTempDestinationAdvisory(client,message);
319                    }
320                    doMessageSend(client, message);
321                }
322            }
323            finally {
324                disAssociateTransaction();
325            }
326        }
327    
328        /**
329         * Acknowledge consumption of a message by the Message Consumer
330         */
331        public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
332            
333            associateTransaction(ack);
334            try {
335                    for (int i = 0; i < containerManagers.length; i++) {
336                        containerManagers[i].acknowledgeMessage(client, ack);
337                    }
338            } finally {
339                disAssociateTransaction();
340            }
341            
342        }
343    
344        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
345            for (int i = 0; i < containerManagers.length; i++) {
346                containerManagers[i].deleteSubscription(clientId, subscriberName);
347            }
348        }
349    
350    
351        /**
352         * Start a transaction.
353         *
354         * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, java.lang.String)
355         */
356        public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
357            transactionManager.createLocalTransaction(client, transactionId);
358        }
359    
360        public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
361            try {
362                Transaction transaction = transactionManager.getLocalTransaction(transactionId);
363                transaction.commit(true);
364            }
365            catch (XAException e) {
366                // TODO: I think the XAException should propagate all the way to the client.
367                throw (JMSException) new JMSException(e.getMessage()).initCause(e);
368            }
369        }
370    
371        /**
372         * rollback a transaction
373         */
374        public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
375            try {
376                Transaction transaction = transactionManager.getLocalTransaction(transactionId);
377                transaction.rollback();
378            }
379            catch (XAException e) {
380                // TODO: I think the XAException should propagate all the way to the client.
381                throw (JMSException) new JMSException(e.getMessage()).initCause(e);
382            }
383        }
384    
385        /**
386         * Starts an XA Transaction.
387         *
388         * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
389         */
390        public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
391            transactionManager.createXATransaction(client, xid);
392        }
393    
394        /**
395         * Prepares an XA Transaciton.
396         *
397         * @see org.activemq.broker.Broker#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
398         */
399        public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
400            Transaction transaction = transactionManager.getXATransaction(xid);
401            return transaction.prepare();
402        }
403    
404        /**
405         * Rollback an XA Transaction.
406         *
407         * @see org.activemq.broker.Broker#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
408         */
409        public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
410            Transaction transaction = transactionManager.getXATransaction(xid);
411            transaction.rollback();
412        }
413    
414        /**
415         * Commit an XA Transaction.
416         *
417         * @see org.activemq.broker.Broker#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
418         */
419        public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
420            Transaction transaction = transactionManager.getXATransaction(xid);
421            transaction.commit(onePhase);
422        }
423    
424        /**
425         * Gets the prepared XA transactions.
426         *
427         * @see org.activemq.broker.Broker#getPreparedTransactions(org.activemq.broker.BrokerClient)
428         */
429        public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
430            return transactionManager.getPreparedXATransactions();
431        }
432        
433        
434    
435    
436        // Properties
437        //-------------------------------------------------------------------------
438    
439        /**
440         * Get a temp directory - used for spooling
441         *
442         * @return a File ptr to the directory
443         */
444        public File getTempDir() {
445            if (tempDir == null) {
446                String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
447                tempDir = new File(dirName);
448            }
449            return tempDir;
450        }
451    
452        public String getBrokerName() {
453            return brokerInfo.getBrokerName();
454        }
455    
456        /**
457         * @return Returns the brokerClusterName.
458         */
459        public String getBrokerClusterName() {
460            return brokerInfo.getClusterName();
461        }
462    
463        
464        public void setTempDir(File tempDir) {
465            this.tempDir = tempDir;
466        }
467    
468        public MessageContainerManager[] getContainerManagers() {
469            if (containerManagers == null) {
470                containerManagers = createContainerManagers();
471            }
472            return containerManagers;
473        }
474    
475        public Map getContainerManagerMap() {
476            return containerManagerMap;
477        }
478    
479        public void setContainerManagerMap(Map containerManagerMap) {
480            this.containerManagerMap = containerManagerMap;
481            this.containerManagers = null;
482        }
483    
484        public PersistenceAdapter getPersistenceAdapter() {
485            return persistenceAdapter;
486        }
487    
488        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
489            this.persistenceAdapter = persistenceAdapter;
490        }
491    
492        public TransactionManager getTransactionManager() {
493            return transactionManager;
494        }
495    
496        public void setTransactionManager(TransactionManager transactionManager) {
497            this.transactionManager = transactionManager;
498        }
499    
500        public SecurityAdapter getSecurityAdapter() {
501            return securityAdapter;
502        }
503    
504        public void setSecurityAdapter(SecurityAdapter securityAdapter) {
505            this.securityAdapter = securityAdapter;
506        }
507    
508        public RedeliveryPolicy getRedeliveryPolicy() {
509            return redeliveryPolicy;
510        }
511    
512        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
513            this.redeliveryPolicy = redeliveryPolicy;
514        }
515    
516        public TransactionStore getPreparedTransactionStore() {
517            return preparedTransactionStore;
518        }
519    
520        public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) {
521            this.preparedTransactionStore = preparedTransactionStore;
522        }
523        
524        /**
525         * @return the DeadLetterPolicy
526         */
527        public DeadLetterPolicy getDeadLetterPolicy(){
528            return deadLetterPolicy;
529        }
530        
531        /**
532         * set the dead letter policy
533         * @param deadLetterPolicy
534         */
535        public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy){
536            this.deadLetterPolicy = deadLetterPolicy;
537        }
538    
539        /**
540         * @return Returns the maximumMemoryUsage.
541         */
542        public long getMaximumMemoryUsage() {
543            return memoryManager.getValueLimit();
544        }
545    
546        /**
547         * @param maximumMemoryUsage The maximumMemoryUsage to set.
548         */
549        public void setMaximumMemoryUsage(long maximumMemoryUsage) {
550            this.memoryManager.setValueLimit(maximumMemoryUsage);
551        }
552    
553    
554        public Context getDestinationContext(Hashtable environment) {
555            Map data = new ConcurrentHashMap();
556            for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) {
557                Map.Entry entry = (Map.Entry) iter.next();
558                String name = entry.getKey().toString();
559                MessageContainerManager manager = (MessageContainerManager) entry.getValue();
560                Context context = new ReadOnlyContext(environment, manager.getDestinations());
561                data.put(name, context);
562            }
563            return new ReadOnlyContext(environment, data);
564        }
565    
566        // Implementation methods
567        //-------------------------------------------------------------------------
568    
569    
570        protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
571            if (securityAdapter != null) {
572                securityAdapter.authorizeSendMessage(client, message);
573            }
574            ActiveMQDestination dest = message.getJMSActiveMQDestination();
575            if (dest.isTopic()){
576                if (message.isPersistent() && !dest.isTemporary()){
577                    persistentTopicMCM.sendMessage(client,message);
578                }
579                transientTopicMCM.sendMessage(client, message);
580            }else {
581                transientQueueMCM.sendMessage(client, message);
582                persistentQueueMCM.sendMessage(client, message);
583            }
584        }
585    
586        /**
587         * Factory method to create a default persistence adapter
588         *
589         * @return
590         */
591        protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
592            File directory = new File(getStoreDirectory());
593    
594            // lets use reflection to avoid runtime dependency on persistence libraries
595            PersistenceAdapter answer = null;
596            String property = System.getProperty(PERSISTENCE_ADAPTER_FACTORY);
597            if (property != null) {
598                answer = tryCreatePersistenceAdapter(property, directory, false);
599            }
600            if (answer == null) {
601                answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true);
602            }
603            if (answer != null) {
604                return answer;
605            }
606            else {
607                log.warn("Default message store (journal+derby) could not be found in the classpath or property '" + PERSISTENCE_ADAPTER_FACTORY
608                        + "' not specified so defaulting to use RAM based message persistence");
609                return new VMPersistenceAdapter();
610            }
611        }
612    
613        protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
614            Class adapterClass = loadClass(className, ignoreErrors);
615            if (adapterClass != null) {
616                try {
617                    PersistenceAdapterFactory factory = (PersistenceAdapterFactory) adapterClass.newInstance();
618                    PersistenceAdapter answer = factory.createPersistenceAdapter(directory, memoryManager);
619                    log.info("Persistence adapter created using: " + className);
620                    return answer;
621                }
622                catch (IOException cause) {
623                    throw createInstantiateAdapterException(className, (Exception) cause);
624                }
625                catch (Throwable e) {
626                    if (!ignoreErrors) {
627                        throw createInstantiateAdapterException(className, e);
628                    }
629                }
630            }
631            return null;
632        }
633    
634        protected JMSException createInstantiateAdapterException(String className, Throwable e) {
635            return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: "
636                    + className + ". Reason: " + e, e);
637        }
638    
639        /**
640         * Tries to load the given class from the current context class loader or
641         * class loader which loaded us or return null if the class could not be found
642         */
643        protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
644            try {
645                return Thread.currentThread().getContextClassLoader().loadClass(name);
646            }
647            catch (ClassNotFoundException e) {
648                try {
649                    return getClass().getClassLoader().loadClass(name);
650                }
651                catch (ClassNotFoundException e2) {
652                    if (ignoreErrors) {
653                        log.trace("Could not find class: " + name + " on the classpath");
654                        return null;
655                    }
656                    else {
657                        throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
658                    }
659                }
660            }
661        }
662    
663        protected String getStoreDirectory() {
664            String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(getBrokerInfo().getBrokerName());
665            return System.getProperty(PROPERTY_STORE_DIRECTORY, defaultDirectory);
666        }
667    
668        /**
669         * Factory method to create the default container managers
670         *
671         * @return
672         */
673        protected MessageContainerManager[] createContainerManagers() {
674            int size = containerManagerMap.size();
675            MessageContainerManager[] answer = new MessageContainerManager[size];
676            containerManagerMap.values().toArray(answer);
677            return answer;
678        }
679    
680        protected void makeDefaultContainerManagers() {
681            transientTopicMCM = new TransientTopicBoundedMessageManager(queueManager);
682            containerManagerMap.put("transientTopicContainer", transientTopicMCM);
683            persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy, deadLetterPolicy);
684            containerManagerMap.put("persistentTopicContainer", persistentTopicMCM);
685            persistentQueueMCM = new DurableQueueBoundedMessageManager(persistenceAdapter, queueManager, redeliveryPolicy, deadLetterPolicy);
686            containerManagerMap.put("persistentQueueContainer", persistentQueueMCM);
687            transientQueueMCM = new TransientQueueBoundedMessageManager(queueManager,redeliveryPolicy, deadLetterPolicy);
688            containerManagerMap.put("transientQueueContainer", transientQueueMCM);
689        }
690    
691        /**
692         * Ensures the consumer is valid, throwing a meaningful exception if not
693         *
694         * @param info
695         * @throws JMSException
696         */
697        protected void validateConsumer(ConsumerInfo info) throws JMSException {
698            if (info.getConsumerId() == null) {
699                throw new JMSException("No consumerId specified for the ConsumerInfo");
700            }
701        }
702    
703        protected void checkValid() throws JMSException {
704            if (containerManagers == null) {
705                throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
706            }
707        }
708    
709        /**
710         * Add a ConsumerInfoListener to the Broker
711         *
712         * @param l
713         */
714        public void addConsumerInfoListener(ConsumerInfoListener l) {
715            if (l != null){
716                consumerInfoListeners.add(l);
717                //fire any existing infos to the listener
718                for (Iterator i = messageConsumers.entrySet().iterator(); i.hasNext();){
719                    Map.Entry entry = (Map.Entry)i.next();
720                    ConsumerInfo info = (ConsumerInfo) entry.getKey();
721                    BrokerClient client = (BrokerClient) entry.getValue();
722                    l.onConsumerInfo(client, info);
723                }
724            }
725        }
726    
727        /**
728         * Remove a ConsumerInfoListener from the Broker
729         *
730         * @param l
731         */
732        public void removeConsumerInfoListener(ConsumerInfoListener l) {
733            consumerInfoListeners.remove(l);
734        }
735    
736        protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
737            for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) {
738                ConsumerInfoListener l = (ConsumerInfoListener) i.next();
739                l.onConsumerInfo(client, info);
740            }
741        }
742    
743        /**
744         * @return the MessageContainerManager for durable topics
745         */
746        public MessageContainerManager getPersistentTopicContainerManager() {
747            return persistentTopicMCM;
748        }
749    
750        /**
751         * @return the MessageContainerManager for transient topics
752         */
753        public MessageContainerManager getTransientTopicContainerManager() {
754            return transientTopicMCM;
755        }
756    
757        /**
758         * @return the MessageContainerManager for persistent queues
759         */
760        public MessageContainerManager getPersistentQueueContainerManager() {
761            return persistentQueueMCM;
762        }
763    
764        /**
765         * @return the MessageContainerManager for transient queues
766         */
767        public MessageContainerManager getTransientQueueContainerManager() {
768            return transientQueueMCM;
769        }
770    
771        /**
772         * @see org.activemq.broker.Broker#getBrokerAdmin()
773         */
774        public BrokerAdmin getBrokerAdmin() {
775            return this;
776        }
777    
778        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
779            for (int i = 0; i < containerManagers.length; i++) {
780                containerManagers[i].createMessageContainer(dest);
781            }
782        }
783    
784        public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException {
785            for (int i = 0; i < containerManagers.length; i++) {
786                containerManagers[i].destroyMessageContainer(dest);
787            }
788        }
789    
790        public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException {
791            for (int i = 0; i < containerManagers.length; i++) {
792                Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins();
793                MessageContainerAdmin mca = (MessageContainerAdmin) messageContainerAdmins.get(dest);
794                if( mca != null ) {
795                    return mca;
796                }
797            }
798            return null;
799        }
800    
801        /**
802         * @throws JMSException
803         * @see org.activemq.broker.BrokerAdmin#listDestinations()
804         */
805        public MessageContainerAdmin[] listMessageContainerAdmin() throws JMSException {
806            
807            ArrayList l = new ArrayList();
808            for (int i = 0; i < containerManagers.length; i++) {
809                Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins();
810                for (Iterator iter = messageContainerAdmins.values().iterator(); iter.hasNext();) {
811                    MessageContainerAdmin mca = (MessageContainerAdmin) iter.next();
812                    l.add(mca);
813                }
814            }
815            
816            MessageContainerAdmin answer[] = new MessageContainerAdmin[l.size()];
817            l.toArray(answer);
818            return answer;
819        }
820        
821        
822        /**
823         * Add a message to a dead letter queue
824         * @param deadLetterName
825         * @param message
826         * @throws JMSException
827         */
828        public void sendToDeadLetterQueue(String deadLetterName,ActiveMQMessage expiredMessage) throws JMSException {
829           if (persistentQueueMCM != null) {
830                            Transaction original = TransactionManager.getContexTransaction();
831                            try {
832                                    TransactionManager.setContexTransaction(null);
833                                    persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, expiredMessage);
834                                    log.debug(expiredMessage + " sent to DLQ: " + deadLetterName);
835                            } finally {
836                                    TransactionManager.setContexTransaction(original);
837                            }
838                    }          
839        }
840    
    /*
     * Disabled code, kept for reference: send a message to the broker within a transaction.
     *
     * public void sendTransactedMessage(final BrokerClient client, final String transactionId,
     *         final ActiveMQMessage message) throws JMSException {
     *     getTransactionFor(message).addPostCommitTask(new SendMessageTransactionTask(client, message));
     * }
     */
848        
849        /**
850         * Acknowledge consumption of a message within a transaction
851        public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException {
852            Transaction transaction;
853            if (ack.isXaTransacted()) {
854                try {
855                    transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
856                }
857                catch (XAException e) {
858                    throw (JMSException) new JMSException(e.getMessage()).initCause(e);
859                }
860            }
861            else {
862                transaction = transactionManager.getLocalTransaction(transactionId);
863            }
864            transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
865            transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));
866    
867            // we need to tell the dispatcher that we can now accept another message
868            // even though we don't really ack the message until the commit
869            // this is because if we have a prefetch value of 1, we can never consume 2 messages
870            // in a transaction, since the ack for the first message never arrives until the commit
871            for (int i = 0; i < containerManagers.length; i++) {
872                containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
873            }
874        }
875         */
876    
877    
878        /**
879         * @param message
880         * @return
881         * @throws JMSException
882        private Transaction getTransactionFor(ActiveMQMessage message) throws JMSException {
883            String transactionId = message.getTransactionId();        
884            if (message.isXaTransacted()) {
885                try {
886                    return transactionManager.getXATransaction(new ActiveMQXid(transactionId));
887                }
888                catch (XAException e) {
889                    throw (JMSException) new JMSException(e.getMessage()).initCause(e);
890                }
891            }
892            return transactionManager.getLocalTransaction(transactionId);
893        }
894    
895    
896        public void acknowledgeMessageRecover(MessageAck ack) {
897        }
898        public void sendMessageRecover(ActiveMQMessage message) throws JMSException {
899        }
900         */
901    
902        /**
903         * Associates a Transaction with the current thread.  Once this call is finished,
904         * the Transactio ncan be obtained via TransactionManager.getContexTransaction().
905         * @param message
906         * @throws JMSException
907         */
908        private final void associateTransaction(ActiveMQMessage message) throws JMSException {
909            Transaction transaction;
910            if( message.isPartOfTransaction() ) {
911                if (message.isXaTransacted()) {
912                    try {
913                        transaction = transactionManager.getXATransaction((ActiveMQXid) message.getTransactionId());
914                    }
915                    catch (XAException e) {
916                        throw (JMSException) new JMSException(e.getMessage()).initCause(e);
917                    }
918                } else {
919                    transaction = transactionManager.getLocalTransaction((String) message.getTransactionId());
920                }
921                
922            } else {
923                transaction = null;
924            }                
925            TransactionManager.setContexTransaction(transaction);
926        }
927    
928        private void disAssociateTransaction() {
929            TransactionManager.setContexTransaction(null);
930        }
931        
932        /**
933         * Associates a Transaction with the current thread.  Once this call is finished,
934         * the Transactio ncan be obtained via TransactionManager.getContexTransaction().
935         * @param ack
936         * @throws JMSException
937         */
938        private void associateTransaction(MessageAck ack) throws JMSException {
939            Transaction transaction;
940            if( ack.isPartOfTransaction() ) {
941                if (ack.isXaTransacted()) {
942                    try {
943                        transaction = transactionManager.getXATransaction((ActiveMQXid) ack.getTransactionId());
944                    }
945                    catch (XAException e) {
946                        throw (JMSException) new JMSException(e.getMessage()).initCause(e);
947                    }
948                } else {
949                    transaction = transactionManager.getLocalTransaction((String) ack.getTransactionId());
950                }
951                
952            } else {
953                transaction = null;
954            }                
955            TransactionManager.setContexTransaction(transaction);
956        }
957        
958        private String sanitizeString(String in) {
959            String result = null;
960            if (in != null) {
961                result = in.replace(':', '_');
962                result = result.replace('/', '_');
963                result = result.replace('\\', '_');
964            }
965            return result;
966        }
967    
968        /**
969         * @return Returns the memoryManager.
970         */
971        public MemoryBoundedObjectManager getMemoryManager() {
972            return memoryManager;
973        }
974    
975    
976        /**
977         * @return Returns the queueManager.
978         */
979        public MemoryBoundedQueueManager getQueueManager() {
980            return queueManager;
981        }
982        
983        
984        public String getName() {
985            return getBrokerName();
986        }
987        
988        
989        public String toString (){
990            return "broker: " + getName();
991        }
992    
993        /**
994         * @see org.activemq.broker.Broker#getBrokerInfo()
995         */
996        public BrokerInfo getBrokerInfo(){
997           return brokerInfo;
998        }
999        
1000        protected void setBrokercontainer(BrokerContainerImpl container){
1001            this.brokerContainer = container;
1002        }
1003        
1004        protected BrokerContainerImpl getBrokerContainer(){
1005            return brokerContainer;
1006        }
1007    
1008        
1009        
1010    
1011    }