001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport;
020    import java.util.Iterator;
021    import java.util.Map;
022    import javax.jms.JMSException;
023    import javax.jms.Session;
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.activemq.ActiveMQConnection;
027    import org.activemq.ActiveMQConnectionFactory;
028    import org.activemq.ActiveMQPrefetchPolicy;
029    import org.activemq.advisories.ConnectionAdvisor;
030    import org.activemq.advisories.ConnectionAdvisoryEvent;
031    import org.activemq.advisories.ConnectionAdvisoryEventListener;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.broker.BrokerContainer;
034    import org.activemq.broker.ConsumerInfoListener;
035    import org.activemq.message.ActiveMQDestination;
036    import org.activemq.message.BrokerInfo;
037    import org.activemq.message.ConsumerInfo;
038    import org.activemq.message.Receipt;
039    import org.activemq.service.MessageContainerManager;
040    import org.activemq.service.Service;
041    import org.activemq.transport.composite.CompositeTransportChannel;
042    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
043    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
044    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045    
046    /**
047     * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
048     * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
049     * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
050     * broker.
051     * 
052     * @version $Revision: 1.1.1.1 $
053     */
054    public class NetworkChannel
055            implements
056                Service,
057                ConsumerInfoListener,
058                ConnectionAdvisoryEventListener,
059                TransportStatusEventListener {
060        private static final Log log = LogFactory.getLog(NetworkChannel.class);
061        protected String uri;
062        protected BrokerContainer brokerContainer;
063        protected ActiveMQConnection localConnection;
064        protected ActiveMQConnection remoteConnection;
065        protected ConcurrentHashMap topicConsumerMap;
066        protected ConcurrentHashMap queueConsumerMap;
067        protected String remoteUserName;
068        protected String remotePassword;
069        protected String remoteBrokerName;
070        protected String remoteClusterName;
071        protected int maximumRetries = 0;
072        protected long reconnectSleepTime = 2000L;
073        protected PooledExecutor threadPool;
074        private boolean remote = false;
075        private SynchronizedBoolean started = new SynchronizedBoolean(false);
076        private SynchronizedBoolean connected = new SynchronizedBoolean(false);
077        private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
078        private ConnectionAdvisor connectionAdvisor;
079        private ActiveMQPrefetchPolicy localPrefetchPolicy;
080        private ActiveMQPrefetchPolicy remotePrefetchPolicy;
081        private boolean demandBasedForwarding = true;
082    
083        /**
084         * Default constructor
085         */
086        public NetworkChannel() {
087            this.topicConsumerMap = new ConcurrentHashMap();
088            this.queueConsumerMap = new ConcurrentHashMap();
089        }
090    
091        /**
092         * Default Constructor
093         * 
094         * @param tp
095         */
096        public NetworkChannel(PooledExecutor tp) {
097            this();
098            this.threadPool = tp;
099        }
100    
101        /**
102         * Constructor
103         * 
104         * @param connector
105         * @param brokerContainer
106         * @param uri
107         */
108        public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
109            this(connector.threadPool);
110            this.brokerContainer = brokerContainer;
111            this.uri = uri;
112        }
113    
114        /**
115         * Create a NetworkConnector from a TransportChannel
116         * 
117         * @param connector
118         * @param brokerContainer
119         * @param channel
120         * @param remoteBrokerName
121         * @param remoteclusterName
122         * @throws JMSException
123         */
124        public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel,
125                String remoteBrokerName, String remoteclusterName) throws JMSException {
126            this(connector.threadPool);
127            this.brokerContainer = brokerContainer;
128            this.uri = "";
129            this.remoteBrokerName = remoteBrokerName;
130            this.remoteClusterName = remoteclusterName;
131            ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory();
132            fac.setJ2EEcompliant(false);
133            fac.setTurboBoost(true);
134            remoteConnection = new ActiveMQConnection(fac, remoteUserName, remotePassword, channel);
135            remoteConnection.setClientID("Boondocks:" + remoteClusterName + ":" + remoteBrokerName);
136            remoteConnection.setQuickClose(true);
137            remoteConnection.start();
138            BrokerInfo info = new BrokerInfo();
139            info.setBrokerName(brokerContainer.getBroker().getBrokerName());
140            info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
141            channel.asyncSend(info);
142            remote = true;
143        }
144    
145        /**
146         * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
147         */
148        public void statusChanged(TransportStatusEvent event) {
149           if (event != null
150                    && (event.getChannelStatus() == TransportStatusEvent.CONNECTED
151                                    || event.getChannelStatus() == TransportStatusEvent.RECONNECTED)) {
152               connected.set(true);
153           }else {
154               connected.set(false);
155           }
156        }
157    
158        private void doSetConnected() {
159            synchronized (connected) {
160                connected.set(true);
161                connected.notifyAll();
162            }
163        }
164    
165        /**
166         * @return text info on this
167         */
168        public String toString() {
169            return "NetworkChannel{ " + ", uri = '" + uri + "' " + ", remoteBrokerName = '" + remoteBrokerName + "' "
170                    + " }";
171        }
172    
173        /**
174         * Start the channel
175         */
176        public void start() {
177            if (started.commit(false, true)) {
178                try {
179                    stopped.set(false);
180                    threadPool.execute(new Runnable() {
181                        public void run() {
182                            String originalName = Thread.currentThread().getName();
183                            try {
184                                Thread.currentThread().setName("NetworkChannel Initiator to " + uri);
185                                initialize();
186                                startSubscriptions();
187                                log.info("Started NetworkChannel to " + uri);
188                            }
189                            catch (JMSException jmsEx) {
190                                log.error("Failed to start NetworkChannel: " + uri, jmsEx);
191                            }
192                            finally {
193                                Thread.currentThread().setName(originalName);
194                            }
195                        }
196                    });
197                }
198                catch (InterruptedException e) {
199                    log.warn("Failed to start - interuppted", e);
200                }
201            }
202        }
203    
204        /**
205         * stop the channel
206         * 
207         * @throws JMSException on error
208         */
209        public void stop() throws JMSException {
210            if (started.commit(true, false)) {
211                stopped.set(true);
212                topicConsumerMap.clear();
213                if (remoteConnection != null) {
214                    remoteConnection.close();
215                    remoteConnection = null;
216                }
217                if (localConnection != null) {
218                    localConnection.close();
219                    localConnection = null;
220                }
221                for (Iterator i = topicConsumerMap.values().iterator();i.hasNext();) {
222                    NetworkMessageBridge consumer = (NetworkMessageBridge) i.next();
223                    consumer.stop();
224                }
225            }
226        }
227    
228        /**
229         * Listen for new Consumer events at this broker
230         * 
231         * @param client
232         * @param info
233         */
234        public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) {
235            String brokerName = client.getBrokerConnector().getBrokerInfo().getBrokerName();
236            if (!client.isClusteredConnection()) {
237                if (connected.get()) {
238                    if (!info.hasVisited(remoteBrokerName)) {
239                        if (info.isStarted()) {
240                            addConsumerInfo(info);
241                        }
242                        else {
243                            removeConsumerInfo(info);
244                        }
245                    }
246                }
247                else {
248                    try {
249                        threadPool.execute(new Runnable() {
250                            public void run() {
251                                if (!client.isClusteredConnection()) {
252                                    if (!info.hasVisited(remoteBrokerName)) {
253                                        synchronized (connected) {
254                                            while (!connected.get() && !stopped.get()) {
255                                                try {
256                                                    connected.wait(500);
257                                                }
258                                                catch (InterruptedException e) {
259                                                    log.debug("interuppted", e);
260                                                }
261                                            }
262                                            if (info.isStarted()) {
263                                                addConsumerInfo(info);
264                                            }
265                                            else {
266                                                removeConsumerInfo(info);
267                                            }
268                                        }
269                                    }
270                                }
271                            }
272                        });
273                    }
274                    catch (InterruptedException e) {
275                        log.warn("Failed to process ConsumerInfo: " + info, e);
276                    }
277                }
278            }
279        }
280    
281        /**
282         * @return the uri of the broker(s) this channel is connected to
283         */
284        public String getUri() {
285            return uri;
286        }
287    
288        /**
289         * set the uri of the broker(s) this channel is connected to
290         * 
291         * @param uri
292         */
293        public void setUri(String uri) {
294            this.uri = uri;
295        }
296    
297        /**
298         * @return Returns the remotePassword.
299         */
300        public String getRemotePassword() {
301            return remotePassword;
302        }
303    
304        /**
305         * @param remotePassword The remotePassword to set.
306         */
307        public void setRemotePassword(String remotePassword) {
308            this.remotePassword = remotePassword;
309        }
310    
311        /**
312         * @return Returns the remoteUserName.
313         */
314        public String getRemoteUserName() {
315            return remoteUserName;
316        }
317    
318        /**
319         * @param remoteUserName The remoteUserName to set.
320         */
321        public void setRemoteUserName(String remoteUserName) {
322            this.remoteUserName = remoteUserName;
323        }
324    
325        /**
326         * @return Returns the brokerContainer.
327         */
328        public BrokerContainer getBrokerContainer() {
329            return brokerContainer;
330        }
331    
332        /**
333         * @param brokerContainer The brokerContainer to set.
334         */
335        public void setBrokerContainer(BrokerContainer brokerContainer) {
336            this.brokerContainer = brokerContainer;
337        }
338    
339        public int getMaximumRetries() {
340            return maximumRetries;
341        }
342    
343        public void setMaximumRetries(int maximumRetries) {
344            this.maximumRetries = maximumRetries;
345        }
346    
347        public long getReconnectSleepTime() {
348            return reconnectSleepTime;
349        }
350    
351        public void setReconnectSleepTime(long reconnectSleepTime) {
352            this.reconnectSleepTime = reconnectSleepTime;
353        }
354    
355        public String getRemoteBrokerName() {
356            return remoteBrokerName;
357        }
358    
359        public void setRemoteBrokerName(String remoteBrokerName) {
360            this.remoteBrokerName = remoteBrokerName;
361        }
362    
363        /**
364         * @return Returns the threadPool.
365         */
366        protected PooledExecutor getThreadPool() {
367            return threadPool;
368        }
369    
370        /**
371         * @param threadPool The threadPool to set.
372         */
373        protected void setThreadPool(PooledExecutor threadPool) {
374            this.threadPool = threadPool;
375        }
376    
377        private synchronized ActiveMQConnection getLocalConnection() throws JMSException {
378            if (localConnection == null) {
379                initializeLocal();
380            }
381            return localConnection;
382        }
383    
384        private synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
385            if (remoteConnection == null) {
386                initializeRemote();
387            }
388            return remoteConnection;
389        }
390    
391        /**
392         * @return Returns the localPrefetchPolicy.
393         */
394        public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
395            return localPrefetchPolicy;
396        }
397    
398        /**
399         * @param localPrefetchPolicy The localPrefetchPolicy to set.
400         */
401        public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
402            this.localPrefetchPolicy = localPrefetchPolicy;
403        }
404    
405        /**
406         * @return Returns the remotePrefetchPolicy.
407         */
408        public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
409            return remotePrefetchPolicy;
410        }
411    
412        /**
413         * @param remotePrefetchPolicy The remotePrefetchPolicy to set.
414         */
415        public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
416            this.remotePrefetchPolicy = remotePrefetchPolicy;
417        }
418    
419        /**
420         * @return Returns the demandBasedForwarding.
421         */
422        public boolean isDemandBasedForwarding() {
423            return demandBasedForwarding;
424        }
425    
426        /**
427         * @param demandBasedForwarding The demandBasedForwarding to set.
428         */
429        public void setDemandBasedForwarding(boolean demandBasedForwarding) {
430            this.demandBasedForwarding = demandBasedForwarding;
431        }
432    
433        // Implementation methods
434        //-------------------------------------------------------------------------
435        /**
436         * Implementation of ConnectionAdvisoryEventListener
437         * 
438         * @param event
439         */
440        public void onEvent(ConnectionAdvisoryEvent event) {
441            String localBrokerName = brokerContainer.getBroker().getBrokerName();
442            if (!event.getInfo().isClosed()) {
443                brokerContainer.registerRemoteClientID(event.getInfo().getClientId());
444            }
445            else {
446                brokerContainer.deregisterRemoteClientID(event.getInfo().getClientId());
447            }
448        }
449    
450        private void addConsumerInfo(ConsumerInfo info) {
451            addConsumerInfo(info.getDestination(), info.getDestination().isTopic(), info.isDurableTopic());
452        }
453    
454        private void addConsumerInfo(ActiveMQDestination destination, boolean topic, boolean durableTopic) {
455            Map map = topic ? topicConsumerMap : queueConsumerMap;
456            NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(destination.getPhysicalName());
457            if (bridge == null) {
458                bridge = createBridge(map, destination, durableTopic);
459            }
460            else if (durableTopic && !bridge.isDurableTopic() && !demandBasedForwarding) {
461                //upgrade our subscription
462                bridge.decrementReferenceCount();
463                upgradeBridge(bridge);
464            }
465            bridge.incrementReferenceCount();
466        }
467    
468        private void upgradeBridge(NetworkMessageBridge bridge) {
469            try {
470                remoteConnection.stop();
471                bridge.upgrade();
472            }
473            catch (JMSException e) {
474                log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: "
475                        + bridge.getDestination(), e);
476            }
477            try {
478                remoteConnection.start();
479            }
480            catch (JMSException e) {
481                log.error("Failed to restart the NetworkMessageBridge", e);
482            }
483        }
484    
485        private NetworkMessageBridge createBridge(Map map, ActiveMQDestination destination, boolean durableTopic) {
486            NetworkMessageBridge bridge = new NetworkMessageBridge();
487            try {
488                bridge.setDestination(destination);
489                bridge.setDurableTopic(durableTopic);
490                bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName());
491                bridge.setLocalSession(getLocalConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
492                bridge.setRemoteSession(getRemoteConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
493                map.put(destination.getPhysicalName(), bridge);
494                bridge.start();
495                log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: "
496                        + this.toString());
497            }
498            catch (JMSException jmsEx) {
499                log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx);
500            }
501            return bridge;
502        }
503    
504        private void removeConsumerInfo(final ConsumerInfo info) {
505            final String physicalName = info.getDestination().getPhysicalName();
506            Map map = (demandBasedForwarding || info.getDestination().isTopic()) ? topicConsumerMap : queueConsumerMap;
507            final NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(physicalName);
508            if (bridge != null) {
509                if (bridge.decrementReferenceCount() <= 0) {
510                    try {
511                        threadPool.execute(new Runnable() {
512                            public void run() {
513                                bridge.stop();
514                                topicConsumerMap.remove(physicalName);
515                                log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
516                            }
517                        });
518                    }
519                    catch (InterruptedException e) {
520                        log.warn("got interrupted stoping NetworkBridge", e);
521                    }
522                }
523            }
524        }
525    
526        private void startSubscriptions() {
527            if (!demandBasedForwarding) {
528                if (!remote) {
529                    MessageContainerManager mcm = brokerContainer.getBroker().getPersistentTopicContainerManager();
530                    if (mcm != null) {
531                        Map map = mcm.getLocalDestinations();
532                        startSubscriptions(map, true, true);
533                    }
534                    mcm = brokerContainer.getBroker().getTransientTopicContainerManager();
535                    if (mcm != null) {
536                        Map map = mcm.getLocalDestinations();
537                        startSubscriptions(map, true, false);
538                    }
539                    mcm = brokerContainer.getBroker().getTransientQueueContainerManager();
540                    if (mcm != null) {
541                        Map map = mcm.getLocalDestinations();
542                        startSubscriptions(map, false, false);
543                    }
544                    mcm = brokerContainer.getBroker().getPersistentQueueContainerManager();
545                    if (mcm != null) {
546                        Map map = mcm.getLocalDestinations();
547                        startSubscriptions(map, false, false);
548                    }
549                }
550            }
551        }
552    
553        private void startSubscriptions(Map destinations, boolean topic, boolean durableTopic) {
554            if (destinations != null) {
555                for (Iterator i = destinations.values().iterator();i.hasNext();) {
556                    ActiveMQDestination dest = (ActiveMQDestination) i.next();
557                    addConsumerInfo(dest, topic, durableTopic);
558                }
559            }
560        }
561    
562        protected void initialize() throws JMSException {
563            // force lazy construction
564            initializeLocal();
565            initializeRemote();
566            brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
567        }
568    
569        private synchronized void initializeRemote() throws JMSException {
570            if (remoteConnection == null) {
571                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri);
572                //factory.setTurboBoost(true);
573                factory.setJ2EEcompliant(false);
574                factory.setQuickClose(true);
575                factory.setInternalConnection(true);
576                remoteConnection = (ActiveMQConnection) factory.createConnection();
577                TransportChannel transportChannel = remoteConnection.getTransportChannel();
578                if (transportChannel instanceof CompositeTransportChannel) {
579                    CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
580                    composite.setMaximumRetries(maximumRetries);
581                    composite.setFailureSleepTime(reconnectSleepTime);
582                    composite.setIncrementTimeout(false);
583                }
584                transportChannel.addTransportStatusEventListener(this);
585                remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
586                remoteConnection.start();
587                BrokerInfo info = new BrokerInfo();
588                info.setBrokerName(brokerContainer.getBroker().getBrokerName());
589                info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
590                Receipt receipt = remoteConnection.syncSendRequest(info);
591                if (receipt != null) {
592                    remoteBrokerName = receipt.getBrokerName();
593                    remoteClusterName = receipt.getClusterName();
594                }
595                connectionAdvisor = new ConnectionAdvisor(remoteConnection);
596                connectionAdvisor.addListener(this);
597                connectionAdvisor.start();
598                if (remotePrefetchPolicy != null) {
599                    remoteConnection.setPrefetchPolicy(remotePrefetchPolicy);
600                }
601            }
602            doSetConnected();
603        }
604    
605        private void initializeLocal() throws JMSException {
606            String brokerName = brokerContainer.getBroker().getBrokerName();
607            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
608            factory.setTurboBoost(true);
609            factory.setJ2EEcompliant(false);
610            factory.setBrokerName(brokerName);
611            factory.setQuickClose(true);
612            factory.setInternalConnection(true);
613            localConnection = (ActiveMQConnection) factory.createConnection();
614            localConnection.start();
615            BrokerInfo info = new BrokerInfo();
616            info.setBrokerName(remoteBrokerName);
617            info.setClusterName(remoteClusterName);
618            localConnection.asyncSendPacket(info);
619            if (localPrefetchPolicy != null) {
620                localConnection.setPrefetchPolicy(localPrefetchPolicy);
621            }
622        }
623        
624        /*private synchronized void releaseRemote() throws JMSException {
625            if (remoteConnection != null) {
626                    TransportChannel transportChannel = remoteConnection.getTransportChannel();
627                    transportChannel.stop();
628                if (connectionAdvisor != null) {
629                        connectionAdvisor.stop();
630                }
631                    try {
632                            remoteConnection.stop();
633                    } catch (JMSException e) {
634                            // ignore this exception, since the remote broker is most likely down 
635                    }
636                    remoteConnection = null;
637            }
638        }*/
639       
640    }