001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import javax.jms.JMSException; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 import org.activemq.broker.BrokerContainer; 026 import org.activemq.broker.impl.BrokerConnectorImpl; 027 import org.activemq.io.impl.DefaultWireFormat; 028 import org.activemq.message.BrokerInfo; 029 import org.activemq.transport.composite.CompositeTransportChannel; 030 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 031 032 /** 033 * Represents a Boondocks broker's connection with a single remote broker which bridges the two brokers to form a network. <p/> 034 * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are 035 * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local 036 * broker. 037 * 038 * @version $Revision: 1.1.1.1 $ 039 */ 040 public class RemoteNetworkChannel extends NetworkChannel implements TransportStatusEventListener { 041 private static final Log log = LogFactory.getLog(RemoteNetworkChannel.class); 042 private TransportChannel boondocksChannel; 043 044 /** 045 * Default Constructor 046 * 047 * @param tp 048 */ 049 public RemoteNetworkChannel(PooledExecutor tp) { 050 super(tp); 051 } 052 053 /** 054 * Constructor 055 * 056 * @param connector 057 * @param brokerContainer 058 * @param uri 059 */ 060 public RemoteNetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) { 061 super(connector,brokerContainer,uri); 062 } 063 064 065 /** 066 * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent) 067 */ 068 public void statusChanged(TransportStatusEvent event) { 069 if (event.getTransportChannel() == boondocksChannel) { 070 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) { 071 try { 072 sendBrokerInfo(); 073 } 074 catch (JMSException e) { 075 log.error("Failed to send Broker Info", e); 076 } 077 } 078 } 079 else { 080 super.statusChanged(event); 081 } 082 } 083 084 085 /** 086 * remote:// can only make outgoing connections - we assume we can't 087 * accept incomming (duck!). So we initialize the transport channel 088 * from this side and create the broker client as well 089 * @throws JMSException 090 */ 091 092 protected void initialize() throws JMSException { 093 super.initialize(); 094 try { 095 boondocksChannel = TransportChannelProvider.create(new DefaultWireFormat(), new URI(uri)); 096 boondocksChannel.addTransportStatusEventListener(this); 097 if (boondocksChannel instanceof CompositeTransportChannel) { 098 CompositeTransportChannel composite = (CompositeTransportChannel)boondocksChannel; 099 composite.setMaximumRetries(maximumRetries); 100 composite.setFailureSleepTime(reconnectSleepTime); 101 composite.setIncrementTimeout(false); 102 } 103 boondocksChannel.start(); 104 //create our own broker connector ... 105 BrokerConnectorImpl connector = new BrokerConnectorImpl(getBrokerContainer(),"vm://uri",new DefaultWireFormat()); 106 connector.start(); 107 connector.addClient(boondocksChannel); 108 sendBrokerInfo(); 109 } 110 catch (URISyntaxException e) { 111 log.error("Could not parse uri: " + uri + " to make remote connector",e); 112 } 113 } 114 115 private void sendBrokerInfo() throws JMSException{ 116 //inform the other side we are a remote channel 117 if (boondocksChannel != null) { 118 BrokerInfo info = new BrokerInfo(); 119 info.setBrokerName(brokerContainer.getBroker().getBrokerName()); 120 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName()); 121 info.setRemote(true); 122 boondocksChannel.asyncSend(info); 123 } 124 } 125 126 127 }