001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.remote;
020    
021    import javax.jms.JMSException;
022    
023    import org.activemq.broker.BrokerConnector;
024    import org.activemq.broker.BrokerContainer;
025    import org.activemq.broker.impl.BrokerConnectorImpl;
026    import org.activemq.broker.impl.BrokerContainerImpl;
027    import org.activemq.io.WireFormat;
028    import org.activemq.transport.NetworkChannel;
029    import org.activemq.transport.NetworkConnector;
030    import org.activemq.transport.RemoteNetworkConnector;
031    import org.activemq.transport.vm.VmTransportChannel;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
036    
037    /**
038     * A <CODE>RemoteTransportChannel</CODE> creates an embedded broker that creates a remote connection to another
039     * broker. This connection type is designed for reliable connections, that can use the storage mechansims of an embedded
040     * broker to be decoupled from the remote broker - i.e. for connections that need to be reliable, don't block but maybe
041     * using a transport across an unreliable network connection
042     * <P>
043     * 
044     * <P>
045     * An example of the expected format is: <CODE>remote://tcp://remotebroker:5060</CODE>
046     * <P>
047     * 
048     * @version $Revision: 1.1.1.1 $
049     */
050    public class RemoteTransportChannel extends VmTransportChannel{
051    
052        private static final Log log = LogFactory.getLog(RemoteTransportChannel.class);
053        private WireFormat wireFormat;
054        private String remoteUserName;
055        private String remotePassword;
056        private String brokerName;
057        private String remoteLocation;
058        private BrokerContainer container;
059        private NetworkConnector networkConnector;
060        private SynchronizedBoolean brokerConnectorStarted = new SynchronizedBoolean(false);
061    
062        /**
063         * Construct a RemoteTransportChannel
064         * 
065         * @param wireFormat
066         * @param peerURIs
067         * @throws JMSException
068         */
069        protected RemoteTransportChannel(WireFormat wireFormat,String remoteLocation) throws JMSException{
070            this.wireFormat = wireFormat;
071            this.remoteLocation = remoteLocation;
072        }
073    
074        /**
075         * @return true if the transport channel is active, this value will be false through reconnecting
076         */
077        public boolean isTransportConnected(){
078            return true;
079        }
080    
081        /**
082         * Some transports rely on an embedded broker (beer based protocols)
083         * 
084         * @return true if an embedded broker required
085         */
086        public boolean requiresEmbeddedBroker(){
087            return true;
088        }
089    
090        /**
091         * Provides a way to specify the client ID that this channel is using
092         * 
093         * @param clientID
094         */
095        public void setClientID(String clientID){
096            super.setClientID(clientID);
097            if(brokerConnectorStarted.commit(false,true)){
098                if(brokerName==null){
099                    brokerName = clientID;
100                }
101                if(container!=null){
102                    container.getBroker().getBrokerInfo().setBrokerName(brokerName);
103                    container.getBroker().getBrokerInfo().setClusterName(brokerName);
104                    try{
105                        container.start();
106                        Thread.sleep(1000);
107            
108                       
109                    }catch(Exception e){
110                        log.error("Failed to start brokerConnector",e);
111                    }
112                }
113            }
114        }
115    
116        public void stop(){
117            super.stop();
118            if(container!=null){
119                try{
120                    container.stop();
121                }catch(JMSException e){
122                    log.warn("Caught exception stopping Broker",e);
123                }
124            }
125        }
126    
127        /**
128         * Some transports that rely on an embedded broker need to create the connector used by the broker
129         * 
130         * @return the BrokerConnector or null if not applicable
131         * @throws JMSException
132         */
133        public BrokerConnector getEmbeddedBrokerConnector() throws JMSException{
134            try{
135                BrokerConnector brokerConnector = null;
136                if(container==null){
137    
138                    container = new BrokerContainerImpl("NotSet","NotSet");
139    
140                    networkConnector = networkConnector = new RemoteNetworkConnector(container);
141                    NetworkChannel channel = networkConnector.addNetworkChannel(remoteLocation);
142                    channel.setRemoteUserName(remoteUserName);
143                    channel.setRemotePassword(remotePassword);
144    
145                    container.addNetworkConnector(networkConnector);
146                    brokerConnector = new BrokerConnectorImpl(container);
147                   
148                }
149                return brokerConnector;
150            }catch(Exception e){
151                e.printStackTrace();
152                String errorStr = "Failed to get embedded connector";
153                log.error(errorStr,e);
154                JMSException jmsEx = new JMSException(errorStr);
155                jmsEx.setLinkedException(e);
156                throw jmsEx;
157            }
158        }
159    
160        /**
161         * @return Returns the remoteLocation.
162         */
163        public String getRemoteLocation(){
164            return remoteLocation;
165        }
166    
167        /**
168         * @param remoteLocation
169         *            The remoteLocation to set.
170         */
171        public void setRemoteLocation(String remoteLocation){
172            this.remoteLocation = remoteLocation;
173        }
174    
175        /**
176         * @return Returns the remotePassword.
177         */
178        public String getRemotePassword(){
179            return remotePassword;
180        }
181    
182        /**
183         * @param remotePassword
184         *            The remotePassword to set.
185         */
186        public void setRemotePassword(String remotePassword){
187            this.remotePassword = remotePassword;
188        }
189    
190        /**
191         * @return Returns the remoteUserName.
192         */
193        public String getRemoteUserName(){
194            return remoteUserName;
195        }
196    
197        /**
198         * @param remoteUserName
199         *            The remoteUserName to set.
200         */
201        public void setRemoteUserName(String remoteUserName){
202            this.remoteUserName = remoteUserName;
203        }
204    
205        /**
206         * @return Returns the wireFormat.
207         */
208        public WireFormat getWireFormat(){
209            return wireFormat;
210        }
211    
212        /**
213         * @param wireFormat
214         *            The wireFormat to set.
215         */
216        public void setWireFormat(WireFormat wireFormat){
217            this.wireFormat = wireFormat;
218        }
219    
220        /**
221         * @return Returns the brokerName.
222         */
223        public String getBrokerName(){
224            return brokerName;
225        }
226    
227        /**
228         * @param brokerName
229         *            The brokerName to set.
230         */
231        public void setBrokerName(String brokerName){
232            this.brokerName = brokerName;
233        }
234    
235    }