001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq.broker.impl;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Set;
025    
026    import javax.jms.ExceptionListener;
027    import javax.jms.JMSException;
028    import javax.security.auth.Subject;
029    import javax.transaction.xa.XAException;
030    
031    import org.activemq.broker.BrokerAdmin;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.broker.BrokerConnector;
034    import org.activemq.io.util.SpooledBoundedActiveMQMessageQueue;
035    import org.activemq.message.ActiveMQMessage;
036    import org.activemq.message.ActiveMQXid;
037    import org.activemq.message.BrokerAdminCommand;
038    import org.activemq.message.BrokerInfo;
039    import org.activemq.message.CapacityInfo;
040    import org.activemq.message.CleanupConnectionInfo;
041    import org.activemq.message.ConnectionInfo;
042    import org.activemq.message.ConsumerInfo;
043    import org.activemq.message.DurableUnsubscribe;
044    import org.activemq.message.IntResponseReceipt;
045    import org.activemq.message.KeepAlive;
046    import org.activemq.message.MessageAck;
047    import org.activemq.message.Packet;
048    import org.activemq.message.PacketListener;
049    import org.activemq.message.ProducerInfo;
050    import org.activemq.message.Receipt;
051    import org.activemq.message.ResponseReceipt;
052    import org.activemq.message.SessionInfo;
053    import org.activemq.message.TransactionInfo;
054    import org.activemq.message.XATransactionInfo;
055    import org.activemq.transport.NetworkChannel;
056    import org.activemq.transport.NetworkConnector;
057    import org.activemq.transport.TransportChannel;
058    import org.activemq.util.IdGenerator;
059    import org.apache.commons.logging.Log;
060    import org.apache.commons.logging.LogFactory;
061    
062    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
063    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
064    import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
065    
066    /**
067     * A Broker client side proxy representing a JMS Connnection
068     * 
069     * @version $Revision: 1.1.1.1 $
070     */
071    public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
072        
073        private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
074        private static final Log commandLog = LogFactory.getLog("org.activemq.broker.CommandTrace");
075        
076        private BrokerConnector brokerConnector;
077        private TransportChannel channel;
078        private ConnectionInfo connectionInfo;
079        private IdGenerator packetIdGenerator;
080        private SynchronizedBoolean closed;
081        private Set activeConsumers;
082        private CopyOnWriteArrayList consumers;
083        private CopyOnWriteArrayList producers;
084        private CopyOnWriteArrayList transactions;
085        private CopyOnWriteArrayList sessions;
086        private SynchronizedBoolean started;
087        private boolean brokerConnection;
088        private boolean clusteredConnection;
089        private String remoteBrokerName;
090        private int capacity = 100;
091        private SpooledBoundedActiveMQMessageQueue spoolQueue;
092        private boolean cleanedUp;
093        private boolean registered;
094        private ArrayList dispatchQueue = new ArrayList();
095        private Subject subject;
096        private boolean remoteNetworkConnector;
097    
098        /**
099         * Default Constructor of BrokerClientImpl
100         */
101        public BrokerClientImpl() {
102            this.packetIdGenerator = new IdGenerator();
103            this.closed = new SynchronizedBoolean(false);
104            this.started = new SynchronizedBoolean(false);
105            this.activeConsumers = new HashSet();
106            this.consumers = new CopyOnWriteArrayList();
107            this.producers = new CopyOnWriteArrayList();
108            this.transactions = new CopyOnWriteArrayList();
109            this.sessions = new CopyOnWriteArrayList();
110        }
111    
112        /**
113         * Initialize the BrokerClient
114         * 
115         * @param brokerConnector
116         * @param channel
117         */
118        public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
119            this.brokerConnector = brokerConnector;
120            this.channel = channel;
121            this.channel.setPacketListener(this);
122            this.channel.setExceptionListener(this);
123            log.trace("brokerConnectorConnector client initialized");
124        }
125    
126        /**
127         * @return the BrokerConnector this client is associated with
128         */
129        public BrokerConnector getBrokerConnector() {
130            return this.brokerConnector;
131        }
132    
133        /**
134         * @return the connection information for this client
135         */
136        public ConnectionInfo getConnectionInfo() {
137            return connectionInfo;
138        }
139    
140        /**
141         * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
142         */
143        public void onException(JMSException jmsEx) {
144            log.info("Client disconnected: " + this);
145            log.debug("Disconnect cuase: ", jmsEx);
146            close();
147        }
148    
149        /**
150         * @return pretty print for this brokerConnector-client
151         */
152        public String toString() {
153            String str = "brokerConnector-client:(" + hashCode() + ") ";
154            str += connectionInfo == null ? "" : connectionInfo.getClientId();
155            str += ": " + channel;
156            return str;
157        }
158    
159        /**
160         * Dispatch an ActiveMQMessage to the end client
161         * 
162         * @param message
163         */
164        public void dispatch(ActiveMQMessage message) {
165            if (!isSlowConsumer()) {
166                dispatchToClient(message);
167            }
168            else {
169                if (spoolQueue == null) {
170                    log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
171                    String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
172                    try {
173                        spoolQueue = new SpooledBoundedActiveMQMessageQueue(brokerConnector.getBrokerContainer().getBroker()
174                                .getTempDir(), spoolName);
175                        final SpooledBoundedActiveMQMessageQueue bpq = spoolQueue;
176                        ThreadedExecutor exec = new ThreadedExecutor();
177                        exec.execute(new Runnable() {
178                            public void run() {
179                                while (!closed.get()) {
180                                    try {
181                                        Packet packet = bpq.dequeue();
182                                        if (packet != null) {
183                                            dispatchToClient(packet);
184                                        }
185                                    }
186                                    catch (InterruptedException e) {
187                                        log.warn("async dispatch got an interupt", e);
188                                    }
189                                    catch (JMSException e) {
190                                        log.error("async dispatch got an problem", e);
191                                    }
192                                }
193                            }
194                        });
195                    }
196                    catch (IOException e) {
197                        log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
198                        close();
199                    }
200                    catch (InterruptedException e) {
201                        log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
202                        close();
203                    }
204                }
205                if (spoolQueue != null) {
206                    try {
207                        spoolQueue.enqueue(message);
208                    }
209                    catch (JMSException e) {
210                        log.error(
211                                "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
212                                e);
213                        close();
214                    }
215                }
216            }
217        }
218    
219        private void dispatchToClient(Packet message) {
220            if (started.get()) {
221                send(message);
222                
223            }
224            else {
225                boolean msgSent = false;
226                if (message.isJMSMessage()) {
227                    ActiveMQMessage jmsMsg = (ActiveMQMessage) message;
228                    if (jmsMsg.getJMSActiveMQDestination().isAdvisory()) {
229                        send(message);
230                        msgSent = true;
231                    }
232                }
233                if (!msgSent) {
234                    // If the connection is stopped.. we have to hold the message till it is started.
235                    synchronized (started) {
236                        dispatchQueue.add(message);
237                    }
238                }
239            }
240        }
241    
242        /**
243         * @return true if the peer for this Client is itself another Broker
244         */
245        public boolean isBrokerConnection() {
246            return brokerConnection;
247        }
248    
249        /**
250         * @return true id this client is part of a cluster
251         */
252        public boolean isClusteredConnection() {
253            return clusteredConnection;
254        }
255    
256        /**
257         * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
258         * capacity representing that the peer cannot process any more messages at the current time
259         * 
260         * @return
261         */
262        public int getCapacity() {
263            return capacity;
264        }
265    
266        /**
267         * @return the client id of the remote connection
268         */
269        public String getClientID() {
270            if (connectionInfo != null) {
271                return connectionInfo.getClientId();
272            }
273            return null;
274        }
275    
276        /**
277         * @return the channel used
278         */
279        public TransportChannel getChannel() {
280            return channel;
281        }
282    
283        /**
284         * Get an indication if the peer should be considered as a slow consumer
285         * 
286         * @return true id the peer should be considered as a slow consumer
287         */
288        public boolean isSlowConsumer() {
289            return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing!
290        }
291    
292        /**
293         * Consume a Packet from the underlying TransportChannel for processing
294         * 
295         * @param packet
296         */
297        public void consume(Packet packet) {
298            if (packet != null) {
299                
300                if( commandLog.isDebugEnabled() )
301                    commandLog.debug("broker for "+getClientID()+" received: "+packet);
302    
303                Throwable requestEx = null;
304                boolean failed = false;
305                boolean receiptRequired = packet.isReceiptRequired();
306                short correlationId = packet.getId();
307                String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
308                String clusterName = brokerConnector.getBrokerInfo().getClusterName();
309                try {
310                    if (brokerConnection) {
311                        if (remoteBrokerName != null && remoteBrokerName.length() > 0) {
312                            packet.addBrokerVisited(remoteBrokerName); //got from the remote broker
313                        }
314                        packet.addBrokerVisited(brokerName);
315                    }
316                    // Checks if the current broker has already processed this packet.
317                    // This is a lazy check, since the broker already received the packet,
318                    // but chooses not to process it for cases where there is no remote broker info.
319                    else {
320                            if (packet.hasVisited(brokerName)) {
321                                    // Packet has already been processed. Do not process again.
322                            return;
323                        } else {
324                                // Include this broker as a processor of the packet.
325                            packet.addBrokerVisited(brokerName);
326                        }   
327                    }
328                    
329                    if (packet.isJMSMessage()) {
330                        ActiveMQMessage message = (ActiveMQMessage) packet;
331                        
332                        if (!brokerConnection) {
333                            message.setEntryBrokerName(brokerName);
334                            message.setEntryClusterName(clusterName);
335                        }
336                        consumeActiveMQMessage(message);
337                    }
338                    else {
339                        switch (packet.getPacketType()) {
340                            case Packet.ACTIVEMQ_MSG_ACK : {
341                                MessageAck ack = (MessageAck) packet;
342                                consumeMessageAck(ack);
343                                break;
344                            }
345                            case Packet.XA_TRANSACTION_INFO : {
346                                XATransactionInfo info = (XATransactionInfo) packet;
347                                consumeXATransactionInfo(info);
348                                receiptRequired=info.isReceiptRequired();
349                                break;
350                            }
351                            case Packet.TRANSACTION_INFO : {
352                                TransactionInfo info = (TransactionInfo) packet;
353                                consumeTransactionInfo(info);
354                                break;
355                            }
356                            case Packet.CONSUMER_INFO : {
357                                ConsumerInfo info = (ConsumerInfo) packet;
358                                consumeConsumerInfo(info);
359                                break;
360                            }
361                            case Packet.PRODUCER_INFO : {
362                                ProducerInfo info = (ProducerInfo) packet;
363                                consumeProducerInfo(info);
364                                break;
365                            }
366                            case Packet.SESSION_INFO : {
367                                SessionInfo info = (SessionInfo) packet;
368                                consumeSessionInfo(info);
369                                break;
370                            }
371                            case Packet.ACTIVEMQ_CONNECTION_INFO : {
372                                ConnectionInfo info = (ConnectionInfo) packet;
373                                consumeConnectionInfo(info);
374                                break;
375                            }
376                            case Packet.DURABLE_UNSUBSCRIBE : {
377                                DurableUnsubscribe ds = (DurableUnsubscribe) packet;
378                                brokerConnector.durableUnsubscribe(this, ds);
379                                break;
380                            }
381                            case Packet.CAPACITY_INFO : {
382                                CapacityInfo info = (CapacityInfo) packet;
383                                consumeCapacityInfo(info);
384                                break;
385                            }
386                            case Packet.CAPACITY_INFO_REQUEST : {
387                                updateCapacityInfo(packet.getId());
388                                break;
389                            }
390                            case Packet.ACTIVEMQ_BROKER_INFO : {
391                                consumeBrokerInfo((BrokerInfo) packet);
392                                break;
393                            }
394                            case Packet.KEEP_ALIVE : {
395                                // Ignore as the packet contains no additional information to consume
396                                break;
397                            }
398                            case Packet.BROKER_ADMIN_COMMAND : {
399                                consumeBrokerAdminCommand((BrokerAdminCommand) packet);
400                                break;
401                            }
402                            case Packet.CLEANUP_CONNECTION_INFO : {
403                                consumeCleanupConnectionInfo((CleanupConnectionInfo) packet);
404                                break;
405                            }
406                            default : {
407                                log.warn("Unknown Packet received: " + packet);
408                                break;
409                            }
410                        }
411                    }
412                }
413                catch (Throwable e) {
414                    requestEx = e;
415                    log.warn("caught exception consuming packet: " + packet, e);
416                    failed = true;
417                }
418                if (receiptRequired){
419                    sendReceipt(correlationId, requestEx, failed);
420                }
421            }
422        }
423    
424        /**
425         * @param cleanupInfo
426         * @throws JMSException 
427         */
428        private void consumeCleanupConnectionInfo(CleanupConnectionInfo cleanupInfo) throws JMSException {
429            try {
430                
431                for (Iterator i = consumers.iterator(); i.hasNext();) {
432                    ConsumerInfo info = (ConsumerInfo) i.next();
433                    info.setStarted(false);
434                    this.brokerConnector.deregisterMessageConsumer(this, info);
435                }
436                for (Iterator i = producers.iterator(); i.hasNext();) {
437                    ProducerInfo info = (ProducerInfo) i.next();
438                    info.setStarted(false);
439                    this.brokerConnector.deregisterMessageProducer(this, info);
440                }
441                for (Iterator i = sessions.iterator(); i.hasNext();) {
442                    SessionInfo info = (SessionInfo) i.next();
443                    info.setStarted(false);
444                    this.brokerConnector.deregisterSession(this, info);
445                }
446                for (Iterator i = transactions.iterator(); i.hasNext();) {
447                    this.brokerConnector.rollbackTransaction(this, i.next().toString());
448                }
449                this.brokerConnector.deregisterClient(this, connectionInfo);
450                registered = false;
451                
452            } finally {
453                // whatever happens, lets make sure we unregister & clean things
454                // down
455                if (log.isDebugEnabled()) {
456                    log.debug(this + " has stopped");
457                }
458                this.consumers.clear();
459                this.producers.clear();
460                this.transactions.clear();
461                this.sessions.clear();
462            }
463    
464        }
465    
466        /**
467         * @param command
468         * @throws JMSException
469         */
470        private void consumeBrokerAdminCommand(BrokerAdminCommand command) throws JMSException {
471            BrokerAdmin brokerAdmin = brokerConnector.getBrokerContainer().getBroker().getBrokerAdmin();
472            if (BrokerAdminCommand.CREATE_DESTINATION.equals(command.getCommand())) {
473                brokerAdmin.createMessageContainer(command.getDestination());
474            }
475            else if (BrokerAdminCommand.DESTROY_DESTINATION.equals(command.getCommand())) {
476                brokerAdmin.destoryMessageContainer(command.getDestination());
477            }
478            else if (BrokerAdminCommand.EMPTY_DESTINATION.equals(command.getCommand())) {
479                brokerAdmin.getMessageContainerAdmin(command.getDestination()).empty();
480            }
481            else if (BrokerAdminCommand.SHUTDOWN_SERVER_VM.equals(command.getCommand())) {
482                if (Boolean.getBoolean("enable.vm.shutdown")) {
483                    log.info("processing command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM);
484                    System.exit(1);
485                } else
486                {
487                    log.warn("ignoring command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM + ", enable.vm.shutdown=false");
488                }
489            }
490            else {
491                throw new JMSException("Broker Admin Command type: " + command.getCommand() + " not recognized.");
492            }
493        }
494    
495        /**
496         * Register/deregister MessageConsumer with the Broker
497         * 
498         * @param info
499         * @throws JMSException
500         */
501        public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
502            String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
503            if (info.isStarted()) {
504                consumers.add(info);
505                if (this.activeConsumers.add(info)) {
506                    this.brokerConnector.registerMessageConsumer(this, info);
507                }
508            }
509            else {
510                consumers.remove(info);
511                if (activeConsumers.remove(info)) {
512                    this.brokerConnector.deregisterMessageConsumer(this, info);
513                }
514            }
515        }
516    
517        /**
518         * Update the peer Connection about the Broker's capacity for messages
519         * 
520         * @param capacity
521         */
522        public void updateBrokerCapacity(int capacity) {
523            CapacityInfo info = new CapacityInfo();
524            info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
525            info.setCapacity(capacity);
526            info.setFlowControlTimeout(getFlowControlTimeout(capacity));
527            send(info);
528        }
529    
530        /**
531         * register with the Broker
532         * 
533         * @param info
534         * @throws JMSException
535         */
536        public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
537            this.connectionInfo = info;
538            if (info.isClosed()) {
539                try {
540                    cleanUp();
541                    if (info.isReceiptRequired()){
542                        sendReceipt(info.getId(), null, false);
543                    }
544                    info.setReceiptRequired(false);
545                    try {
546                        Thread.sleep(500);
547                    }
548                    catch (Throwable e) {
549                    }
550                }
551                finally {
552                    close();
553                }
554            }
555            else {
556                if (!registered) {
557                    this.brokerConnector.registerClient(this, info);
558                    registered = true;
559                }
560                synchronized (started) {
561                    //set transport hint
562                    if (info.getProperties() != null && info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY) != null){
563                       boolean noDelay = new Boolean(info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY)).booleanValue();
564                       channel.setNoDelay(noDelay);
565                        
566                    }
567                    if (!started.get() && info.isStarted()) {
568                        started.set(true);
569                        // Dispatch any queued
570                        log.debug(this + " has started running client version " + info.getClientVersion()
571                                + " , wire format = " + info.getWireFormatVersion());
572                        //go through consumers, producers, and sessions - setting their clientId (which might not have been set)
573                        for (Iterator i = consumers.iterator();i.hasNext();) {
574                            ConsumerInfo ci = (ConsumerInfo) i.next();
575                            ci.setClientId(info.getClientId());
576                        }
577                        for (Iterator i = producers.iterator();i.hasNext();) {
578                            ProducerInfo pi = (ProducerInfo) i.next();
579                            pi.setClientId(info.getClientId());
580                        }
581                        for (Iterator i = sessions.iterator();i.hasNext();) {
582                            SessionInfo si = (SessionInfo) i.next();
583                            si.setClientId(info.getClientId());
584                        }
585                        for (int i = 0;i < dispatchQueue.size();i++) {
586                            ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i);
587                            dispatch(msg);
588                        }
589                        dispatchQueue.clear();
590                    }
591                    if (started.get() && !info.isStarted()) {
592                        started.set(false);
593                        log.debug(this + " has stopped");
594                    }
595                }
596            }
597        }
598    
599        /**
600         * start consuming messages
601         * 
602         * @throws JMSException
603         */
604        public void start() throws JMSException {
605            channel.start();
606        }
607    
608        /**
609         * stop consuming messages
610         * 
611         * @throws JMSException
612         */
613        public void stop() throws JMSException {
614            log.trace("Stopping channel: " + channel);
615            channel.stop();
616        }
617    
618        /**
619         * cleanup
620         */
621        public synchronized void cleanUp() {
622            // we could be called here from 2 different code paths
623            // based on if we get a transport failure or we do a clean shutdown
624            // so lets only run this stuff once
625            if (!cleanedUp) {
626                cleanedUp = true;
627                try {
628                    try {
629                        for (Iterator i = consumers.iterator();i.hasNext();) {
630                            ConsumerInfo info = (ConsumerInfo) i.next();
631                            info.setStarted(false);
632                            this.brokerConnector.deregisterMessageConsumer(this, info);
633                        }
634                        for (Iterator i = producers.iterator();i.hasNext();) {
635                            ProducerInfo info = (ProducerInfo) i.next();
636                            info.setStarted(false);
637                            this.brokerConnector.deregisterMessageProducer(this, info);
638                        }
639                        for (Iterator i = sessions.iterator();i.hasNext();) {
640                            SessionInfo info = (SessionInfo) i.next();
641                            info.setStarted(false);
642                            this.brokerConnector.deregisterSession(this, info);
643                        }
644                        for (Iterator i = transactions.iterator();i.hasNext();) {
645                            this.brokerConnector.rollbackTransaction(this, i.next().toString());
646                        }
647                    }
648                    finally {
649                        // whatever happens, lets make sure we unregister & clean things down
650                        if (log.isDebugEnabled()) {
651                            log.debug(this + " has stopped");
652                        }
653                        this.consumers.clear();
654                        this.producers.clear();
655                        this.transactions.clear();
656                        this.sessions.clear();
657                        this.brokerConnector.deregisterClient(this, connectionInfo);
658                        registered = false;
659                    }
660                }
661                catch (JMSException e) {
662                    log.warn("failed to de-register Broker client: " + e, e);
663                }
664            }
665            else {
666                log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
667            }
668        }
669    
670        // Implementation methods
671        //-------------------------------------------------------------------------
672        protected void send(Packet packet) {
673            if (!closed.get()) {
674                try {
675                    if (brokerConnection) {
676                        String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
677                        packet.addBrokerVisited(brokerName);
678                        if (packet.hasVisited(remoteBrokerName)) {
679                            if (log.isDebugEnabled()) {
680                                log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
681                                        + packet);
682                            }
683                            return;
684                        }
685                    }
686                    packet.setId(this.packetIdGenerator.getNextShortSequence());
687                    if( commandLog.isDebugEnabled() )
688                        commandLog.debug("broker for "+getClientID()+" sending: "+packet);
689                    this.channel.asyncSend(packet);
690                }
691                catch (JMSException e) {
692                    log.warn(this + " caught exception ", e);
693                    close();
694                }
695            }
696        }
697        
698        /**
699         * validate the connection
700         * @param timeout
701         * @throws JMSException
702         */
703            public void validateConnection(int timeout) throws JMSException {
704                    KeepAlive packet = new KeepAlive();
705                    packet.setReceiptRequired(true);
706                    packet.setId(this.packetIdGenerator.getNextShortSequence());
707                    // In most cases, if the transport is dead due to network errors
708                    // the network error will be recognised immediately and an exception 
709                    // thrown. If the duplicate client ids are due to misconfiguration, 
710                    // we make sure that we do not terminate the "right" connection 
711                    // prematurely by using a long timeout here. If the existing client
712                    // is working heavily and/or over a slow link, it might take some time
713                    // for it to respond. In such a case, the new client is misconfigured
714                    // and can wait for a while before being kicked out.
715    
716                    Receipt r = getChannel().send(packet, timeout);
717                    if (r == null) throw new JMSException("Client did not respond in time");
718    
719            }
720        
721        protected void close() {
722            if (closed.commit(false, true)) {
723                this.channel.stop();
724                log.debug(this + " has closed");
725            }
726        }
727    
728        /**
729         * Send message to Broker
730         * 
731         * @param message
732         * @throws JMSException
733         */
734        private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
735            this.brokerConnector.sendMessage(this, message);
736        }
737    
738        /**
739         * Send Message acknowledge to the Broker
740         * 
741         * @param ack
742         * @throws JMSException
743         */
744        private void consumeMessageAck(MessageAck ack) throws JMSException {
745            this.brokerConnector.acknowledgeMessage(this, ack);
746        }
747    
748        /**
749         * Handle transaction start/commit/rollback
750         * 
751         * @param info
752         * @throws JMSException
753         */
754        private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
755            if (info.getType() == TransactionInfo.START) {
756                transactions.add(info.getTransactionId());
757                this.brokerConnector.startTransaction(this, info.getTransactionId());
758            }
759            else {
760                if (info.getType() == TransactionInfo.ROLLBACK) {
761                    this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
762                }
763                else if (info.getType() == TransactionInfo.COMMIT) {
764                    this.brokerConnector.commitTransaction(this, info.getTransactionId());
765                }
766                transactions.remove(info.getTransactionId());
767            }
768        }
769    
770        /**
771         * Handle XA transaction start/prepare/commit/rollback
772         * 
773         * @param info
774         * @throws JMSException
775         * @throws XAException
776         */
777        private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
778            if (info.getType() == XATransactionInfo.START) {
779                this.brokerConnector.startTransaction(this, info.getXid());
780            }
781            else if (info.getType() == XATransactionInfo.XA_RECOVER) {
782                ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
783                if( info.isReceiptRequired()) {
784                    // We will be sending our own receipt..
785                    info.setReceiptRequired(false);
786                    // Send the receipt..
787                    ResponseReceipt receipt = new ResponseReceipt();
788                    receipt.setCorrelationId(info.getId());
789                    receipt.setResult(rc);
790                    send(receipt);
791                }
792            }
793            else if (info.getType() == XATransactionInfo.GET_RM_ID) {
794                String rc = this.brokerConnector.getResourceManagerId(this);
795                if( info.isReceiptRequired()) {
796                    // We will be sending our own receipt..
797                    info.setReceiptRequired(false);
798                    // Send the receipt..
799                    ResponseReceipt receipt = new ResponseReceipt();
800                    receipt.setId(this.packetIdGenerator.getNextShortSequence());
801                    receipt.setCorrelationId(info.getId());
802                    receipt.setResult(rc);
803                    send(receipt);
804                }
805            }
806            else if (info.getType() == XATransactionInfo.END) {
807                // we don't do anything..
808            }
809            else {
810                if (info.getType() == XATransactionInfo.PRE_COMMIT) {
811                    int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
812                    // We will be sending our own receipt..
813                    if( info.isReceiptRequired()) {
814                        info.setReceiptRequired(false);
815                        // Send the receipt..
816                        IntResponseReceipt receipt = new IntResponseReceipt();
817                        receipt.setId(this.packetIdGenerator.getNextShortSequence());
818                        receipt.setCorrelationId(info.getId());
819                        receipt.setResult(rc);
820                        send(receipt);
821                    }
822                }
823                else if (info.getType() == XATransactionInfo.ROLLBACK) {
824                    this.brokerConnector.rollbackTransaction(this, info.getXid());
825                }
826                else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
827                    this.brokerConnector.commitTransaction(this, info.getXid(), true);
828                }
829                else if (info.getType() == XATransactionInfo.COMMIT) {
830                    this.brokerConnector.commitTransaction(this, info.getXid(), false);
831                }
832                else {
833                    throw new JMSException("Packet type: " + info.getType() + " not recognized.");
834                }
835            }
836        }
837    
838        /**
839         * register/deregister MessageProducer in the Broker
840         * 
841         * @param info
842         * @throws JMSException
843         */
844        private void consumeProducerInfo(ProducerInfo info) throws JMSException {
845            if (info.isStarted()) {
846                producers.add(info);
847                this.brokerConnector.registerMessageProducer(this, info);
848            }
849            else {
850                producers.remove(info);
851                this.brokerConnector.deregisterMessageProducer(this, info);
852            }
853        }
854    
855        /**
856         * register/deregister Session in a Broker
857         * 
858         * @param info
859         * @throws JMSException
860         */
861        private void consumeSessionInfo(SessionInfo info) throws JMSException {
862            if (info.isStarted()) {
863                sessions.add(info);
864                this.brokerConnector.registerSession(this, info);
865            }
866            else {
867                sessions.remove(info);
868                this.brokerConnector.deregisterSession(this, info);
869            }
870        }
871    
872        /**
873         * Update capacity for the peer
874         * 
875         * @param info
876         */
877        private void consumeCapacityInfo(CapacityInfo info) {
878            this.capacity = info.getCapacity();
879        }
880    
881        private void updateCapacityInfo(short correlationId) {
882            CapacityInfo info = new CapacityInfo();
883            info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
884            info.setCorrelationId(correlationId);
885            info.setCapacity(this.brokerConnector.getBrokerCapacity());
886            info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
887            send(info);
888        }
889    
890        private long getFlowControlTimeout(int capacity) {
891            long result = -1;
892            if (capacity <= 0) {
893                result = 10000;
894            }
895            else if (capacity <= 10) {
896                result = 1000;
897            }
898            else if (capacity <= 20) {
899                result = 10;
900            }
901            return result;
902        }
903    
904        private void consumeBrokerInfo(final BrokerInfo info) {
905            brokerConnection = true;
906            started.set(true);
907            remoteBrokerName = info.getBrokerName();
908            if (remoteBrokerName == null || remoteBrokerName.length() == 0) {
909                log.warn("No remote broker name available!");
910            }
911            else {
912                if (log.isDebugEnabled()) {
913                   log.debug("Received broker info from: " + remoteBrokerName + " on client: " + channel);
914                }
915            }
916            String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
917            if (clusterName.equals(info.getClusterName())) {
918                clusteredConnection = true;
919            }
920            if (!remoteNetworkConnector && info.isRemote()) {
921                try {
922                    final NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer());
923                    networkConnector.getThreadPool().execute(new Runnable() {
924                        public void run() {
925                            try {
926                                NetworkChannel networkChannel = new NetworkChannel(networkConnector, brokerConnector
927                                        .getBrokerContainer(), channel, info.getBrokerName(), info.getClusterName());
928                                networkConnector.addNetworkChannel(networkChannel);
929                                brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector);
930                                networkConnector.start();
931                            }
932                            catch (JMSException e) {
933                                log.error("Failed to create reverse remote channel", e);
934                            }
935                        }
936                    });
937                    log.info("Started reverse remote channel to " + remoteBrokerName);
938                    remoteNetworkConnector = true;
939                }
940                catch (InterruptedException e) {
941                    log.error("Failed to create reverse remote channel", e);
942                }
943            }
944        }
945    
946    
947        private void sendReceipt(short correlationId, Throwable requestEx, boolean failed) {
948            Receipt receipt = new Receipt();
949            receipt.setCorrelationId(correlationId);
950            receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
951            receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
952            receipt.setException(requestEx);
953            receipt.setFailed(failed);
954            send(receipt);
955        }
956    
957        /**
958         * @param subject
959         */
960        public void setSubject(Subject subject) {
961            this.subject = subject;
962        }
963    
964        /**
965         * @return the subject
966         */
967        public Subject getSubject() {
968            return subject;
969        }
970    }