001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport; 020 import java.net.InetAddress; 021 import java.net.URI; 022 import java.util.HashMap; 023 import java.util.Map; 024 025 import javax.jms.JMSException; 026 027 import org.activemq.ConfigurationException; 028 import org.activemq.broker.BrokerContainer; 029 import org.activemq.util.MapHelper; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 033 /** 034 * A {@link NetworkConnector}which uses discovery to find remote brokers to connect to 035 * 036 * @version $Revision: 1.1.1.1 $ 037 */ 038 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 039 private static final Log log = LogFactory.getLog(DiscoveryNetworkConnector.class); 040 private Map channelMap = new HashMap(); 041 042 public DiscoveryNetworkConnector(BrokerContainer brokerContainer) { 043 super(brokerContainer); 044 } 045 046 public void start() throws JMSException { 047 DiscoveryAgent discoveryAgent = getBrokerContainer().getDiscoveryAgent(); 048 if (discoveryAgent == null) { 049 throw new ConfigurationException("Must be configured with a discoveryAgent property"); 050 } 051 discoveryAgent.addDiscoveryListener(this); 052 super.start(); 053 } 054 055 public void addService(final DiscoveryEvent event) { 056 try { 057 Map details = event.getServiceDetails(); 058 if (!getLocalBrokerName().equals(details.get("brokerName"))) { 059 String url = MapHelper.getString(details, "connectURL"); 060 if (url != null) { 061 addChannel(url, details); 062 } 063 } 064 } 065 catch (Exception e) { 066 log.warn("Add service failed", e); 067 } 068 } 069 070 public void removeService(final DiscoveryEvent event) { 071 try { 072 Map details = event.getServiceDetails(); 073 if (!getLocalBrokerName().equals(details.get("brokerName"))) { 074 String url = MapHelper.getString(details, "connectURL"); 075 if (url != null) { 076 removeChannel(url, details); 077 } 078 } 079 } 080 catch (Exception e) { 081 log.warn("remove service failed", e); 082 } 083 } 084 085 // Implementation methods 086 //------------------------------------------------------------------------- 087 protected synchronized void addChannel(String url, Map details) { 088 NetworkChannel channel = (NetworkChannel) channelMap.get(url); 089 if (channel == null) { 090 try { 091 final String RELIABLE = "reliable:"; 092 boolean isReliable = false; 093 String urlStr = url.toLowerCase().trim(); 094 String realURL = url; 095 if (urlStr.startsWith(RELIABLE)){ 096 isReliable = true; 097 urlStr = urlStr.substring(RELIABLE.length()); 098 } 099 //String realURL = url; 100 URI temp = new URI(urlStr); 101 String hostName = temp.getHost(); 102 if (hostName != null && InetAddress.getByName(hostName).equals(InetAddress.getLocalHost())){ 103 temp = new URI(temp.getScheme(), temp.getUserInfo(), "localhost", temp.getPort(), 104 temp.getPath(), temp.getQuery(), temp.getFragment()); 105 realURL = isReliable? RELIABLE+temp.toString() : temp.toString(); 106 } 107 channel = createNetworkChannel(realURL); 108 channel.setUri(realURL); 109 110 log.info(getLocalBrokerName() + ": Adding new NeworkChannel on: " + url + "{resolved=" + realURL +"} with details: " + details); 111 channel.start(); 112 channelMap.put(url, channel); 113 }catch(Exception e){ 114 log.error("Failed to add NetworkChannel",e); 115 } 116 } 117 } 118 119 protected synchronized void removeChannel(String url, Map details) { 120 NetworkChannel channel = (NetworkChannel) channelMap.remove(url); 121 if (channel != null) { 122 log.info(getLocalBrokerName() + ": Removing NeworkChannel: " + channel); 123 try { 124 channel.stop(); 125 } 126 catch (JMSException e) { 127 log.info("Failed to stop channel: " + channel + ". Reason: " + e, e); 128 } 129 } 130 } 131 132 protected String getLocalBrokerName() { 133 return getBrokerContainer().getBroker().getBrokerName(); 134 } 135 }