001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport; 019 020 import org.apache.commons.logging.Log; 021 import org.apache.commons.logging.LogFactory; 022 import org.activemq.ConfigurationException; 023 import org.activemq.io.WireFormat; 024 import org.activemq.transport.reliable.ReliableTransportChannel; 025 import org.activemq.transport.composite.CompositeTransportChannelFactory; 026 import org.activemq.util.Callback; 027 import org.activemq.util.ExceptionTemplate; 028 import org.activemq.util.MapHelper; 029 030 import javax.jms.JMSException; 031 import java.net.URI; 032 import java.net.URISyntaxException; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.ArrayList; 036 import java.util.Iterator; 037 038 /** 039 * A {@link ReliableTransportChannel} which uses a {@link DiscoveryAgent} to discover remote broker 040 * instances and dynamically connect to them. 041 * 042 * @version $Revision: 1.1.1.1 $ 043 */ 044 public class DiscoveryTransportChannel extends ReliableTransportChannel implements DiscoveryListener { 045 private static final Log log = LogFactory.getLog(DiscoveryTransportChannel.class); 046 047 048 private DiscoveryAgent discoveryAgent; 049 private String remoteUserName; 050 private String remotePassword; 051 052 053 public DiscoveryTransportChannel(WireFormat wireFormat, DiscoveryAgent discoveryAgent) { 054 super(wireFormat); 055 this.discoveryAgent = discoveryAgent; 056 } 057 058 public void start() throws JMSException { 059 if (discoveryAgent == null) { 060 throw new ConfigurationException("Must be configured with a discoveryAgent property"); 061 } 062 063 // lets pass into the agent the broker name and connection details 064 discoveryAgent.addDiscoveryListener(this); 065 discoveryAgent.start(); 066 067 super.start(); 068 } 069 070 public void stop() { 071 ExceptionTemplate template = new ExceptionTemplate(); 072 template.run(new Callback() { 073 public void execute() throws Throwable { 074 discoveryAgent.stop(); 075 } 076 }); 077 template.run(new Callback() { 078 public void execute() throws Throwable { 079 DiscoveryTransportChannel.super.stop(); 080 } 081 }); 082 Throwable e = template.getFirstException(); 083 log.warn("Failed to stop the transport channel cleanly due to: " + e, e); 084 } 085 086 087 public synchronized void addService(DiscoveryEvent event) { 088 Map details = event.getServiceDetails(); 089 String url = MapHelper.getString(details, "connectURL"); 090 if (url != null) { 091 try { 092 List uris = parseURIs(new URI(url)); 093 for (Iterator uter = uris.iterator(); uter.hasNext();) { 094 URI uri = (URI) uter.next(); 095 addURI(uri, details); 096 } 097 } 098 catch (URISyntaxException e) { 099 log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 100 } 101 } 102 } 103 104 public synchronized void removeService(DiscoveryEvent event) { 105 Map details = event.getServiceDetails(); 106 String url = MapHelper.getString(details, "connectURL"); 107 if (url != null) { 108 try { 109 List uris = parseURIs(new URI(url)); 110 for (Iterator uter = uris.iterator(); uter.hasNext();) { 111 URI uri = (URI) uter.next(); 112 removeURI(uri); 113 } 114 } 115 catch (URISyntaxException e) { 116 log.warn("Could not remove remote URI: " + url + " due to bad URI syntax: " + e, e); 117 } 118 } 119 } 120 121 protected void addURI(URI uri, Map details) { 122 List urlList = getUris(); 123 if (!urlList.contains(uri)) { 124 log.info("Adding new broker connection URL: " + uri + " with details: " + details); 125 126 urlList.add(uri); 127 } 128 } 129 130 protected void removeURI(URI uri) { 131 synchronized (this) { 132 List urlList = getUris(); 133 if (urlList.remove(uri)) { 134 log.info("Removing broker connection URL: " + uri); 135 } 136 } 137 } 138 139 protected List parseURIs(URI uri) { 140 List answer = new ArrayList(); 141 try { 142 CompositeTransportChannelFactory.parseURIs(answer, uri); 143 } 144 catch (URISyntaxException e) { 145 log.warn("Failed to parse URI: " + uri, e); 146 answer.add(uri); 147 } 148 return answer; 149 } 150 151 // Properties 152 //------------------------------------------------------------------------- 153 public DiscoveryAgent getDiscoveryAgent() { 154 return discoveryAgent; 155 } 156 157 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 158 this.discoveryAgent = discoveryAgent; 159 } 160 161 162 public String getRemotePassword() { 163 return remotePassword; 164 } 165 166 public void setRemotePassword(String remotePassword) { 167 this.remotePassword = remotePassword; 168 } 169 170 public String getRemoteUserName() { 171 return remoteUserName; 172 } 173 174 public void setRemoteUserName(String remoteUserName) { 175 this.remoteUserName = remoteUserName; 176 } 177 178 }