001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.remote; 020 021 import javax.jms.JMSException; 022 023 import org.activemq.broker.BrokerConnector; 024 import org.activemq.broker.BrokerContainer; 025 import org.activemq.broker.impl.BrokerConnectorImpl; 026 import org.activemq.broker.impl.BrokerContainerImpl; 027 import org.activemq.io.WireFormat; 028 import org.activemq.transport.NetworkChannel; 029 import org.activemq.transport.NetworkConnector; 030 import org.activemq.transport.RemoteNetworkConnector; 031 import org.activemq.transport.vm.VmTransportChannel; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 036 037 /** 038 * A <CODE>RemoteTransportChannel</CODE> creates an embedded broker that creates a remote connection to another 039 * broker. This connection type is designed for reliable connections, that can use the storage mechansims of an embedded 040 * broker to be decoupled from the remote broker - i.e. for connections that need to be reliable, don't block but maybe 041 * using a transport across an unreliable network connection 042 * <P> 043 * 044 * <P> 045 * An example of the expected format is: <CODE>remote://tcp://remotebroker:5060</CODE> 046 * <P> 047 * 048 * @version $Revision: 1.1.1.1 $ 049 */ 050 public class RemoteTransportChannel extends VmTransportChannel{ 051 052 private static final Log log = LogFactory.getLog(RemoteTransportChannel.class); 053 private WireFormat wireFormat; 054 private String remoteUserName; 055 private String remotePassword; 056 private String brokerName; 057 private String remoteLocation; 058 private BrokerContainer container; 059 private NetworkConnector networkConnector; 060 private SynchronizedBoolean brokerConnectorStarted = new SynchronizedBoolean(false); 061 062 /** 063 * Construct a RemoteTransportChannel 064 * 065 * @param wireFormat 066 * @param peerURIs 067 * @throws JMSException 068 */ 069 protected RemoteTransportChannel(WireFormat wireFormat,String remoteLocation) throws JMSException{ 070 this.wireFormat = wireFormat; 071 this.remoteLocation = remoteLocation; 072 } 073 074 /** 075 * @return true if the transport channel is active, this value will be false through reconnecting 076 */ 077 public boolean isTransportConnected(){ 078 return true; 079 } 080 081 /** 082 * Some transports rely on an embedded broker (beer based protocols) 083 * 084 * @return true if an embedded broker required 085 */ 086 public boolean requiresEmbeddedBroker(){ 087 return true; 088 } 089 090 /** 091 * Provides a way to specify the client ID that this channel is using 092 * 093 * @param clientID 094 */ 095 public void setClientID(String clientID){ 096 super.setClientID(clientID); 097 if(brokerConnectorStarted.commit(false,true)){ 098 if(brokerName==null){ 099 brokerName = clientID; 100 } 101 if(container!=null){ 102 container.getBroker().getBrokerInfo().setBrokerName(brokerName); 103 container.getBroker().getBrokerInfo().setClusterName(brokerName); 104 try{ 105 container.start(); 106 Thread.sleep(1000); 107 108 109 }catch(Exception e){ 110 log.error("Failed to start brokerConnector",e); 111 } 112 } 113 } 114 } 115 116 public void stop(){ 117 super.stop(); 118 if(container!=null){ 119 try{ 120 container.stop(); 121 }catch(JMSException e){ 122 log.warn("Caught exception stopping Broker",e); 123 } 124 } 125 } 126 127 /** 128 * Some transports that rely on an embedded broker need to create the connector used by the broker 129 * 130 * @return the BrokerConnector or null if not applicable 131 * @throws JMSException 132 */ 133 public BrokerConnector getEmbeddedBrokerConnector() throws JMSException{ 134 try{ 135 BrokerConnector brokerConnector = null; 136 if(container==null){ 137 138 container = new BrokerContainerImpl("NotSet","NotSet"); 139 140 networkConnector = networkConnector = new RemoteNetworkConnector(container); 141 NetworkChannel channel = networkConnector.addNetworkChannel(remoteLocation); 142 channel.setRemoteUserName(remoteUserName); 143 channel.setRemotePassword(remotePassword); 144 145 container.addNetworkConnector(networkConnector); 146 brokerConnector = new BrokerConnectorImpl(container); 147 148 } 149 return brokerConnector; 150 }catch(Exception e){ 151 e.printStackTrace(); 152 String errorStr = "Failed to get embedded connector"; 153 log.error(errorStr,e); 154 JMSException jmsEx = new JMSException(errorStr); 155 jmsEx.setLinkedException(e); 156 throw jmsEx; 157 } 158 } 159 160 /** 161 * @return Returns the remoteLocation. 162 */ 163 public String getRemoteLocation(){ 164 return remoteLocation; 165 } 166 167 /** 168 * @param remoteLocation 169 * The remoteLocation to set. 170 */ 171 public void setRemoteLocation(String remoteLocation){ 172 this.remoteLocation = remoteLocation; 173 } 174 175 /** 176 * @return Returns the remotePassword. 177 */ 178 public String getRemotePassword(){ 179 return remotePassword; 180 } 181 182 /** 183 * @param remotePassword 184 * The remotePassword to set. 185 */ 186 public void setRemotePassword(String remotePassword){ 187 this.remotePassword = remotePassword; 188 } 189 190 /** 191 * @return Returns the remoteUserName. 192 */ 193 public String getRemoteUserName(){ 194 return remoteUserName; 195 } 196 197 /** 198 * @param remoteUserName 199 * The remoteUserName to set. 200 */ 201 public void setRemoteUserName(String remoteUserName){ 202 this.remoteUserName = remoteUserName; 203 } 204 205 /** 206 * @return Returns the wireFormat. 207 */ 208 public WireFormat getWireFormat(){ 209 return wireFormat; 210 } 211 212 /** 213 * @param wireFormat 214 * The wireFormat to set. 215 */ 216 public void setWireFormat(WireFormat wireFormat){ 217 this.wireFormat = wireFormat; 218 } 219 220 /** 221 * @return Returns the brokerName. 222 */ 223 public String getBrokerName(){ 224 return brokerName; 225 } 226 227 /** 228 * @param brokerName 229 * The brokerName to set. 230 */ 231 public void setBrokerName(String brokerName){ 232 this.brokerName = brokerName; 233 } 234 235 }