001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.peer;
020    import javax.jms.IllegalStateException;
021    import javax.jms.JMSException;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    import org.activemq.broker.BrokerConnector;
025    import org.activemq.broker.BrokerContainer;
026    import org.activemq.broker.impl.BrokerConnectorImpl;
027    import org.activemq.broker.impl.BrokerContainerImpl;
028    import org.activemq.io.WireFormat;
029    import org.activemq.store.vm.VMPersistenceAdapter;
030    import org.activemq.transport.DiscoveryNetworkConnector;
031    import org.activemq.transport.NetworkConnector;
032    import org.activemq.transport.TransportChannel;
033    import org.activemq.transport.multicast.MulticastDiscoveryAgent;
034    import org.activemq.transport.vm.VmTransportChannel;
035    import org.activemq.util.IdGenerator;
036    import org.activemq.util.URIHelper;
037    
038    /**
039     * A <CODE>PeerTransportChannel</CODE> creates an embedded broker and networks peers together to form a P-2-P network.
040     * <P>
041     * By default, <CODE>PeerTransportChannel</CODE> uses discovery to locate other peers, and uses a well known service
042     * name on the discovery
043     * <P>
044     * An example of the expected format is: <CODE>peer://development.net</CODE> where development.net is the service name
045     * used in discovery
046     * <P>
047     * 
048     * @version $Revision: 1.1.1.1 $
049     */
050    public class PeerTransportChannel extends VmTransportChannel {
051        private static final Log log = LogFactory.getLog(PeerTransportChannel.class);
052        protected static final String DEFAULT_BROKER_CONNECTOR_URI = "tcp://localhost:0";
053        protected WireFormat wireFormat;
054        protected TransportChannel channel;
055        protected String discoveryURI;
056        protected String remoteUserName;
057        protected String remotePassword;
058        protected String brokerName;
059        protected boolean doDiscovery;
060        protected String peerURIs;
061        protected String brokerConnectorURI;
062        protected String serviceName;
063        protected BrokerConnector brokerConnector;
064        protected boolean remote;
065        protected boolean persistent=false;
066    
067       
068        /**
069         * Construct a PeerTransportChannel
070         * 
071         * @param wireFormat
072         * @param serviceName
073         * @throws JMSException
074         */
075        protected PeerTransportChannel(WireFormat wireFormat, String serviceName) throws JMSException {
076            this.wireFormat = wireFormat;
077            this.serviceName = serviceName;
078            this.discoveryURI = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI;
079            IdGenerator idGen = new IdGenerator();
080            this.brokerName = idGen.generateId();
081            this.brokerConnectorURI = DEFAULT_BROKER_CONNECTOR_URI;
082            this.doDiscovery = true;
083            if (serviceName == null || serviceName.length() == 0) {
084                throw new IllegalStateException("No service name specified for peer:// protocol");
085            }
086        }
087    
088        /**
089         * @return true if the transport channel is active, this value will be false through reconnecting
090         */
091        public boolean isTransportConnected() {
092            return true;
093        }
094    
095        /**
096         * Some transports rely on an embedded broker (beer based protocols)
097         * 
098         * @return true if an embedded broker required
099         */
100        public boolean requiresEmbeddedBroker() {
101            return true;
102        }
103    
104        /**
105         * Some transports that rely on an embedded broker need to create the connector used by the broker
106         * 
107         * @return the BrokerConnector or null if not applicable
108         * @throws JMSException
109         */
110        public BrokerConnector getEmbeddedBrokerConnector() throws JMSException {
111            try {
112                if (brokerConnector == null) {
113                    BrokerContainer container = new BrokerContainerImpl(brokerName, serviceName);
114                    if( !persistent ) {
115                        container.setPersistenceAdapter(new VMPersistenceAdapter());
116                    }
117                    NetworkConnector networkConnector = null;
118                    if (doDiscovery) {
119                        networkConnector = new DiscoveryNetworkConnector(container);
120                        MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(serviceName);
121                        container.setDiscoveryAgent(agent);
122                    }
123                    if (peerURIs != null && peerURIs.length() > 0) {
124                        URIHelper peers = new URIHelper(peerURIs);
125                        networkConnector = createNetworkConnector(container);
126                        while (peers.hasNext()) {
127                            String peerURL  = peers.getNext();
128                            networkConnector.addNetworkChannel(peerURL);
129                        }
130                    }
131                    container.addNetworkConnector(networkConnector);
132                    URIHelper helper = new URIHelper(brokerConnectorURI);
133                    brokerConnector = new BrokerConnectorImpl(container, helper.getNext(), wireFormat);
134                    while (helper.hasNext()) {
135                        new BrokerConnectorImpl(container, helper.getNext(), wireFormat);
136                    }
137                    container.start();
138                }
139                return brokerConnector;
140            }
141            catch (Exception e) {
142                e.printStackTrace();
143                String errorStr = "Failed to get embedded connector";
144                log.error(errorStr, e);
145                JMSException jmsEx = new JMSException(errorStr);
146                jmsEx.setLinkedException(e);
147                throw jmsEx;
148            }
149        }
150        
151        /**
152         * Create a NetworkConnector
153         * @param container
154         * @return the NetworkConnector
155         */
156        protected NetworkConnector createNetworkConnector(BrokerContainer container){
157            return new NetworkConnector(container);
158        }
159    
160        /**
161         * @return Returns the brokerDiscoveryURI.
162         */
163        public String getDiscoveryURI() {
164            return discoveryURI;
165        }
166    
167        /**
168         * @param discoveryURI The brokerDiscoveryURI to set.
169         */
170        public void setDiscoveryURI(String discoveryURI) {
171            this.discoveryURI = discoveryURI;
172        }
173    
174        /**
175         * @return Returns the brokerName.
176         */
177        public String getBrokerName() {
178            return brokerName;
179        }
180    
181        /**
182         * @param brokerName The brokerName to set.
183         */
184        public void setBrokerName(String brokerName) {
185            this.brokerName = brokerName;
186        }
187    
188        /**
189         * @return Returns the doDiscovery.
190         */
191        public boolean isDoDiscovery() {
192            return doDiscovery;
193        }
194    
195        /**
196         * @param doDiscovery The doDiscovery to set.
197         */
198        public void setDoDiscovery(boolean doDiscovery) {
199            this.doDiscovery = doDiscovery;
200        }
201    
202        /**
203         * @return Returns the wireFormat.
204         */
205        public WireFormat getWireFormat() {
206            return wireFormat;
207        }
208    
209        /**
210         * @param wireFormat The wireFormat to set.
211         */
212        public void setWireFormat(WireFormat wireFormat) {
213            this.wireFormat = wireFormat;
214        }
215    
216        /**
217         * @return Returns the remotePassword.
218         */
219        public String getRemotePassword() {
220            return remotePassword;
221        }
222    
223        /**
224         * @param remotePassword The remotePassword to set.
225         */
226        public void setRemotePassword(String remotePassword) {
227            this.remotePassword = remotePassword;
228        }
229    
230        /**
231         * @return Returns the remoteUserName.
232         */
233        public String getRemoteUserName() {
234            return remoteUserName;
235        }
236    
237        /**
238         * @param remoteUserName The remoteUserName to set.
239         */
240        public void setRemoteUserName(String remoteUserName) {
241            this.remoteUserName = remoteUserName;
242        }
243    
244        /**
245         * @return Returns the brokerConnectorURI.
246         */
247        public String getBrokerConnectorURI() {
248            return brokerConnectorURI;
249        }
250    
251        /**
252         * @param brokerConnectorURI The brokerConnectorURI to set.
253         */
254        public void setBrokerConnectorURI(String brokerConnectorURI) {
255            this.brokerConnectorURI = brokerConnectorURI;
256        }
257    
258        /**
259         * @return Returns the peerURIs.
260         */
261        public String getPeerURIs() {
262            return peerURIs;
263        }
264    
265        /**
266         * @param peerURIs The peerURIs to set.
267         */
268        public void setPeerURIs(String peerURIs) {
269            this.peerURIs = peerURIs;
270        }
271    
272        /**
273         * @return Returns the serviceName.
274         */
275        public String getServiceName() {
276            return serviceName;
277        }
278    
279        /**
280         * @param serviceName The serviceName to set.
281         */
282        public void setServiceName(String serviceName) {
283            this.serviceName = serviceName;
284        }
285        
286        /**
287         * @return Returns the remote.
288         */
289        public boolean isRemote() {
290            return remote;
291        }
292        /**
293         * @param remote The remote to set.
294         */
295        public void setRemote(boolean remote) {
296            this.remote = remote;
297        }
298    
299        /**
300         * @return Returns the persistent.
301         */
302        public boolean isPersistent() {
303            return persistent;
304        }
305        /**
306         * @param persistent The persistent to set.
307         */
308        public void setPersistent(boolean persistent) {
309            this.persistent = persistent;
310        }
311    }