001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport;
020    import java.net.InetAddress;
021    import java.net.URI;
022    import java.util.HashMap;
023    import java.util.Map;
024    
025    import javax.jms.JMSException;
026    
027    import org.activemq.ConfigurationException;
028    import org.activemq.broker.BrokerContainer;
029    import org.activemq.util.MapHelper;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    /**
034     * A {@link NetworkConnector}which uses discovery to find remote brokers to connect to
035     * 
036     * @version $Revision: 1.1.1.1 $
037     */
038    public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
039        private static final Log log = LogFactory.getLog(DiscoveryNetworkConnector.class);
040        private Map channelMap = new HashMap();
041    
042        public DiscoveryNetworkConnector(BrokerContainer brokerContainer) {
043            super(brokerContainer);
044        }
045    
046        public void start() throws JMSException {
047            DiscoveryAgent discoveryAgent = getBrokerContainer().getDiscoveryAgent();
048            if (discoveryAgent == null) {
049                throw new ConfigurationException("Must be configured with a discoveryAgent property");
050            }
051            discoveryAgent.addDiscoveryListener(this);
052            super.start();
053        }
054    
055        public void addService(final DiscoveryEvent event) {
056            try {
057                Map details = event.getServiceDetails();
058                if (!getLocalBrokerName().equals(details.get("brokerName"))) {
059                    String url = MapHelper.getString(details, "connectURL");
060                    if (url != null) {
061                        addChannel(url, details);
062                    }
063                }
064            }
065            catch (Exception e) {
066                log.warn("Add service failed", e);
067            }
068        }
069    
070        public void removeService(final DiscoveryEvent event) {
071            try {
072                Map details = event.getServiceDetails();
073                if (!getLocalBrokerName().equals(details.get("brokerName"))) {
074                    String url = MapHelper.getString(details, "connectURL");
075                    if (url != null) {
076                        removeChannel(url, details);
077                    }
078                }
079            }
080            catch (Exception e) {
081                log.warn("remove service failed", e);
082            }
083        }
084    
085        // Implementation methods
086        //-------------------------------------------------------------------------
087        protected synchronized void addChannel(String url, Map details) {
088            NetworkChannel channel = (NetworkChannel) channelMap.get(url);
089            if (channel == null) {
090                try {
091                    final String RELIABLE = "reliable:";
092                    boolean isReliable = false;
093                String urlStr = url.toLowerCase().trim();
094                String realURL = url;
095                if (urlStr.startsWith(RELIABLE)){
096                    isReliable = true;
097                    urlStr = urlStr.substring(RELIABLE.length());
098                }
099                //String realURL = url;
100                URI temp = new URI(urlStr);
101                String hostName = temp.getHost();
102                if (hostName != null && InetAddress.getByName(hostName).equals(InetAddress.getLocalHost())){
103                    temp = new URI(temp.getScheme(), temp.getUserInfo(), "localhost", temp.getPort(),
104                            temp.getPath(), temp.getQuery(), temp.getFragment());
105                    realURL = isReliable? RELIABLE+temp.toString() : temp.toString();
106                }
107                channel = createNetworkChannel(realURL);
108                channel.setUri(realURL);
109        
110                log.info(getLocalBrokerName() + ": Adding new NeworkChannel on: " + url + "{resolved=" + realURL +"} with details: " + details);
111                channel.start();
112                channelMap.put(url, channel);
113                }catch(Exception e){
114                    log.error("Failed to add NetworkChannel",e);
115                }
116            }
117        }
118    
119        protected synchronized void removeChannel(String url, Map details) {
120            NetworkChannel channel = (NetworkChannel) channelMap.remove(url);
121            if (channel != null) {
122                log.info(getLocalBrokerName() + ": Removing NeworkChannel: " + channel);
123                try {
124                    channel.stop();
125                }
126                catch (JMSException e) {
127                    log.info("Failed to stop channel: " + channel + ". Reason: " + e, e);
128                }
129            }
130        }
131    
132        protected String getLocalBrokerName() {
133            return getBrokerContainer().getBroker().getBrokerName();
134        }
135    }