001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport; 019 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 026 import javax.jms.JMSException; 027 028 import org.activemq.ActiveMQPrefetchPolicy; 029 import org.activemq.broker.BrokerContainer; 030 import org.activemq.service.Service; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 035 036 /** 037 * Represents a connector to one or more remote brokers. 038 * This class manages a number of {@link NetworkChannel} instances 039 * which may or may not be connected to a 040 * remote broker at any point in time. 041 * <p/> 042 * The implementation of this class could use a fixed number of 043 * configured {@link NetworkChannel} instances or could use 044 * discovery to find them. 045 * 046 * @version $Revision: 1.1.1.1 $ 047 */ 048 public class NetworkConnector implements Service { 049 private static final Log log = LogFactory.getLog(NetworkConnector.class); 050 051 private BrokerContainer brokerContainer; 052 private List networkChannels = new ArrayList(); 053 private Map localDetails = new HashMap(); 054 private String remoteUserName; 055 private String remotePassword; 056 protected PooledExecutor threadPool; 057 private ActiveMQPrefetchPolicy localPrefetchPolicy = new ActiveMQPrefetchPolicy(); 058 private ActiveMQPrefetchPolicy remotePrefetchPolicy = new ActiveMQPrefetchPolicy(); 059 060 061 public NetworkConnector(BrokerContainer brokerContainer) { 062 this.brokerContainer = brokerContainer; 063 this.threadPool = new PooledExecutor(); 064 } 065 066 public void start() throws JMSException { 067 for (Iterator iter = networkChannels.iterator(); iter.hasNext();) { 068 NetworkChannel networkChannel = (NetworkChannel) iter.next(); 069 networkChannel.setBrokerContainer(getBrokerContainer()); 070 networkChannel.setThreadPool(threadPool); 071 networkChannel.start(); 072 } 073 } 074 075 public void stop() throws JMSException { 076 for (Iterator iter = networkChannels.iterator(); iter.hasNext();) { 077 NetworkChannel networkChannel = (NetworkChannel) iter.next(); 078 try { 079 networkChannel.stop(); 080 } 081 catch (JMSException e) { 082 log.warn("Failed to stop network channel: " + e, e); 083 } 084 } 085 } 086 087 public void setTransportChannelListener(TransportChannelListener listener) { 088 } 089 090 091 092 093 // Properties 094 //------------------------------------------------------------------------- 095 096 public BrokerContainer getBrokerContainer() { 097 return brokerContainer; 098 } 099 100 101 /** 102 * @return Returns the threadPool. 103 */ 104 public PooledExecutor getThreadPool() { 105 return threadPool; 106 } 107 108 109 public List getNetworkChannels() { 110 return networkChannels; 111 } 112 113 public Map getLocalDetails() { 114 return localDetails; 115 } 116 117 public void setLocalDetails(Map localDetails) { 118 this.localDetails = localDetails; 119 } 120 121 public String getRemotePassword() { 122 return remotePassword; 123 } 124 125 public void setRemotePassword(String remotePassword) { 126 this.remotePassword = remotePassword; 127 } 128 129 public String getRemoteUserName() { 130 return remoteUserName; 131 } 132 133 public void setRemoteUserName(String remoteUserName) { 134 this.remoteUserName = remoteUserName; 135 } 136 137 138 /** 139 * Sets a list of {@link NetworkChannel} instances 140 * 141 * @param networkChannels 142 */ 143 public void setNetworkChannels(List networkChannels) { 144 this.networkChannels = networkChannels; 145 } 146 147 /** 148 * Adds a new network channel for the given URI 149 * 150 * @param uri 151 * @return 152 */ 153 public NetworkChannel addNetworkChannel(String uri) throws JMSException { 154 NetworkChannel networkChannel = createNetworkChannel(uri); 155 addNetworkChannel(networkChannel); 156 return networkChannel; 157 } 158 159 160 /** 161 * Adds a new network channel 162 */ 163 public void addNetworkChannel(NetworkChannel networkChannel) throws JMSException { 164 configure(networkChannel); 165 networkChannels.add(networkChannel); 166 } 167 168 /** 169 * Removes a network channel 170 */ 171 public void removeNetworkChannel(NetworkChannel networkChannel) { 172 networkChannels.remove(networkChannel); 173 } 174 175 public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() { 176 return localPrefetchPolicy; 177 } 178 179 public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) { 180 this.localPrefetchPolicy = localPrefetchPolicy; 181 } 182 183 public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() { 184 return remotePrefetchPolicy; 185 } 186 187 public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) { 188 this.remotePrefetchPolicy = remotePrefetchPolicy; 189 } 190 191 192 // Expose individual prefetch properties as individual properties 193 //------------------------------------------------------------------------- 194 public int getLocalDurableTopicPrefetch() { 195 return localPrefetchPolicy.getDurableTopicPrefetch(); 196 } 197 198 public void setLocalDurableTopicPrefetch(int durableTopicPrefetch) { 199 localPrefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch); 200 } 201 202 public int getLocalQueuePrefetch() { 203 return localPrefetchPolicy.getQueuePrefetch(); 204 } 205 206 public void setLocalQueuePrefetch(int queuePrefetch) { 207 localPrefetchPolicy.setQueuePrefetch(queuePrefetch); 208 } 209 210 public int getLocalQueueBrowserPrefetch() { 211 return localPrefetchPolicy.getQueueBrowserPrefetch(); 212 } 213 214 public void setLocalQueueBrowserPrefetch(int queueBrowserPrefetch) { 215 localPrefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch); 216 } 217 218 public int getLocalTopicPrefetch() { 219 return localPrefetchPolicy.getTopicPrefetch(); 220 } 221 222 public void setLocalTopicPrefetch(int topicPrefetch) { 223 localPrefetchPolicy.setTopicPrefetch(topicPrefetch); 224 } 225 226 227 public int getRemoteDurableTopicPrefetch() { 228 return remotePrefetchPolicy.getDurableTopicPrefetch(); 229 } 230 231 public void setRemoteDurableTopicPrefetch(int durableTopicPrefetch) { 232 remotePrefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch); 233 } 234 235 public int getRemoteQueuePrefetch() { 236 return remotePrefetchPolicy.getQueuePrefetch(); 237 } 238 239 public void setRemoteQueuePrefetch(int queuePrefetch) { 240 remotePrefetchPolicy.setQueuePrefetch(queuePrefetch); 241 } 242 243 public int getRemoteQueueBrowserPrefetch() { 244 return remotePrefetchPolicy.getQueueBrowserPrefetch(); 245 } 246 247 public void setRemoteQueueBrowserPrefetch(int queueBrowserPrefetch) { 248 remotePrefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch); 249 } 250 251 public int getRemoteTopicPrefetch() { 252 return remotePrefetchPolicy.getTopicPrefetch(); 253 } 254 255 public void setRemoteTopicPrefetch(int topicPrefetch) { 256 remotePrefetchPolicy.setTopicPrefetch(topicPrefetch); 257 } 258 259 260 // Implementation methods 261 //------------------------------------------------------------------------- 262 263 264 /** 265 * Create a channel from the url 266 * @param url 267 * @return 268 */ 269 protected NetworkChannel createNetworkChannel(String url) { 270 NetworkChannel answer = new NetworkChannel(this,getBrokerContainer(), url); 271 answer.setRemoteUserName(getRemoteUserName()); 272 answer.setRemotePassword(getRemotePassword()); 273 return answer; 274 } 275 276 /** 277 * Performs any network connector based configuration; such as setting the dispatch policies 278 * 279 * @param networkChannel 280 */ 281 protected void configure(NetworkChannel networkChannel) throws JMSException { 282 networkChannel.setLocalPrefetchPolicy(localPrefetchPolicy); 283 networkChannel.setRemotePrefetchPolicy(remotePrefetchPolicy); 284 } 285 286 }