001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.composite; 019 020 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 021 import org.apache.commons.logging.Log; 022 import org.apache.commons.logging.LogFactory; 023 import org.activemq.TimeoutExpiredException; 024 import org.activemq.io.WireFormat; 025 import org.activemq.message.Packet; 026 import org.activemq.message.PacketListener; 027 import org.activemq.message.Receipt; 028 import org.activemq.message.ReceiptHolder; 029 import org.activemq.transport.TransportChannel; 030 import org.activemq.transport.TransportChannelProvider; 031 import org.activemq.transport.TransportChannelSupport; 032 import org.activemq.transport.TransportStatusEvent; 033 import org.activemq.transport.TransportStatusEventListener; 034 035 import javax.jms.ExceptionListener; 036 import javax.jms.JMSException; 037 import java.net.URI; 038 import java.util.ArrayList; 039 import java.util.Collections; 040 import java.util.List; 041 042 /** 043 * A Compsite implementation of a TransportChannel 044 * 045 * @version $Revision: 1.2 $ 046 */ 047 public class CompositeTransportChannel extends TransportChannelSupport implements TransportStatusEventListener { 048 private static final Log log = LogFactory.getLog(CompositeTransportChannel.class); 049 050 protected List uris; 051 protected URI currentURI; 052 protected TransportChannel channel; 053 protected SynchronizedBoolean closed; 054 protected SynchronizedBoolean started; 055 protected int maximumRetries = 10; 056 protected long failureSleepTime = 500L; 057 protected long establishConnectionTimeout = 30000L; 058 protected long maximumTimeout = 30000L; 059 protected boolean incrementTimeout = true; 060 061 062 public CompositeTransportChannel(WireFormat wireFormat) { 063 super(wireFormat); 064 this.uris = Collections.synchronizedList(new ArrayList()); 065 closed = new SynchronizedBoolean(false); 066 started = new SynchronizedBoolean(false); 067 } 068 069 public CompositeTransportChannel(WireFormat wireFormat, List uris) { 070 this(wireFormat); 071 this.uris.addAll(uris); 072 } 073 074 public String toString() { 075 return "CompositeTransportChannel: " + channel; 076 } 077 078 public void start() throws JMSException { 079 if (started.commit(false, true)) { 080 // // Since we could take a LONG time to connect to one of the servers in the connection list, 081 // // do the connect async so that the client has a chance to stop() the channel if he needs to shutdown 082 // // before a connection is established. 083 // new Thread() { 084 // public void run() { 085 // try { 086 establishConnection(establishConnectionTimeout); 087 // } catch (JMSException e) { 088 // if(getExceptionListener()!=null) { 089 // getExceptionListener().onException(e); 090 // } 091 // } 092 fireStatusEvent(new TransportStatusEvent(CompositeTransportChannel.this,TransportStatusEvent.CONNECTED)); 093 // } 094 // }.start(); 095 } 096 } 097 098 /** 099 * close the channel 100 */ 101 public void stop() { 102 if (closed.commit(false, true)) { 103 if (channel != null) { 104 try { 105 channel.stop(); 106 } 107 catch (Exception e) { 108 log.warn("Caught while closing: " + e + ". Now Closed", e); 109 } 110 finally { 111 channel = null; 112 super.stop(); 113 } 114 } 115 } 116 } 117 118 /** 119 * Forces disconnect by delegating to the child channel 120 */ 121 public void forceDisconnect() { 122 if (channel != null) channel.forceDisconnect(); 123 } 124 125 public Receipt send(Packet packet) throws JMSException { 126 return getChannel().send(packet); 127 } 128 129 130 public Receipt send(Packet packet, int timeout) throws JMSException { 131 return getChannel().send(packet, timeout); 132 } 133 134 135 public void asyncSend(Packet packet) throws JMSException { 136 getChannel().asyncSend(packet); 137 } 138 139 public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException { 140 return getChannel().asyncSendWithReceipt(packet); 141 } 142 143 public void setPacketListener(PacketListener listener) { 144 super.setPacketListener(listener); 145 if (channel != null) { 146 channel.setPacketListener(listener); 147 } 148 } 149 150 public void setExceptionListener(ExceptionListener listener) { 151 super.setExceptionListener(listener); 152 if (channel != null) { 153 channel.setExceptionListener(listener); 154 } 155 } 156 157 158 public boolean isMulticast() { 159 return false; 160 } 161 162 // Properties 163 //------------------------------------------------------------------------- 164 165 166 /** 167 * Return the maximum amount of time spent trying to establish a connection 168 * or a negative number to keep going forever 169 * 170 * @return 171 */ 172 public long getEstablishConnectionTimeout() { 173 return establishConnectionTimeout; 174 } 175 176 public void setEstablishConnectionTimeout(long establishConnectionTimeout) { 177 this.establishConnectionTimeout = establishConnectionTimeout; 178 } 179 180 public int getMaximumRetries() { 181 return maximumRetries; 182 } 183 184 public void setMaximumRetries(int maximumRetries) { 185 this.maximumRetries = maximumRetries; 186 } 187 188 public long getFailureSleepTime() { 189 return failureSleepTime; 190 } 191 192 public void setFailureSleepTime(long failureSleepTime) { 193 this.failureSleepTime = failureSleepTime; 194 } 195 196 public List getUris() { 197 return uris; 198 } 199 200 public void setUris(List list) { 201 synchronized (uris) { 202 uris.clear(); 203 uris.addAll(list); 204 } 205 } 206 207 208 /** 209 * @return Returns the incrementTimeout. 210 */ 211 public boolean isIncrementTimeout() { 212 return incrementTimeout; 213 } 214 /** 215 * @param incrementTimeout The incrementTimeout to set. 216 */ 217 public void setIncrementTimeout(boolean incrementTimeout) { 218 this.incrementTimeout = incrementTimeout; 219 } 220 /** 221 * @return Returns the maximumTimeout. 222 */ 223 public long getMaximumTimeout() { 224 return maximumTimeout; 225 } 226 /** 227 * @param maximumTimeout The maximumTimeout to set. 228 */ 229 public void setMaximumTimeout(long maximumTimeout) { 230 this.maximumTimeout = maximumTimeout; 231 } 232 233 /** 234 * @param clientID set the clientID 235 */ 236 public void setClientID(String clientID) { 237 super.setClientID(clientID); 238 239 /* If there is an existing channel, reflect the new clientID to the channel */ 240 if (channel != null) { 241 channel.setClientID(clientID); 242 } 243 } 244 245 /** 246 * Can this wireformat process packets of this version 247 * @param version the version number to test 248 * @return true if can accept the version 249 */ 250 public boolean canProcessWireFormatVersion(int version){ 251 return channel != null ? channel.canProcessWireFormatVersion(version) : true; 252 } 253 254 /** 255 * @return the current version of this wire format 256 */ 257 public int getCurrentWireFormatVersion(){ 258 return channel != null ? channel.getCurrentWireFormatVersion() : 1; 259 } 260 261 /** 262 * Access to the current channel if one is active 263 * @throws JMSException if no channel is available 264 */ 265 public TransportChannel getChannel() throws JMSException { 266 if (channel == null) { 267 throw new JMSException("No TransportChannel connection available"); 268 } 269 return channel; 270 } 271 272 // Implementation methods 273 //------------------------------------------------------------------------- 274 275 protected void establishConnection(long timeout) throws JMSException { 276 // lets try connect 277 boolean connected = false; 278 long time = failureSleepTime; 279 long startTime = System.currentTimeMillis(); 280 for (int i = 0;!connected && (i < maximumRetries || maximumRetries <= 0) && !closed.get() && !isPendingStop();i++) { 281 List list = new ArrayList(getUris()); 282 if (i > 0) { 283 if (maximumRetries > 0 || timeout > 0) { 284 long current = System.currentTimeMillis(); 285 if (timeout >= 0) { 286 if (current + time > startTime + timeout) { 287 time = startTime + timeout - current; 288 } 289 } 290 if (current > startTime + timeout || time <= 0) { 291 throw new TimeoutExpiredException("Could not connect to any of the URIs: " + list); 292 } 293 } 294 log.info("Could not connect; sleeping for: " + time + " millis and trying again"); 295 try { 296 Thread.sleep(time); 297 } 298 catch (InterruptedException e) { 299 log.warn("Sleep interupted: " + e, e); 300 } 301 if (incrementTimeout && time < maximumTimeout) { 302 time *= 2; 303 time = time > maximumTimeout ? maximumTimeout : time; 304 } 305 } 306 while (!connected && !list.isEmpty() && !closed.get() && !isPendingStop()) { 307 URI uri = extractURI(list); 308 try { 309 attemptToConnect(uri); 310 configureChannel(); 311 connected = true; 312 currentURI = uri; 313 } 314 catch (JMSException e) { 315 log.info("Could not connect to: " + uri + ". Reason: " + e); 316 } 317 } 318 } 319 if (!connected && !closed.get()) { 320 StringBuffer buffer = new StringBuffer(""); 321 Object[] uriArray = getUris().toArray(); 322 for (int i = 0;i < uriArray.length;i++) { 323 buffer.append(uriArray[i]); 324 if (i < (uriArray.length - 1)) { 325 buffer.append(","); 326 } 327 } 328 JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString()); 329 throw jmsEx; 330 } 331 } 332 333 334 protected void configureChannel() { 335 ExceptionListener exceptionListener = getExceptionListener(); 336 if (exceptionListener != null) { 337 channel.setExceptionListener(exceptionListener); 338 } 339 PacketListener packetListener = getPacketListener(); 340 if (packetListener != null) { 341 channel.setPacketListener(packetListener); 342 } 343 344 channel.addTransportStatusEventListener(this); 345 channel.setCachingEnabled(isCachingEnabled()); 346 channel.setNoDelay(isNoDelay()); 347 channel.setUsedInternally(isUsedInternally()); 348 } 349 350 protected URI extractURI(List list) throws JMSException { 351 int idx = 0; 352 if (list.size() > 1) { 353 do { 354 idx = (int) (Math.random() * list.size()); 355 } 356 while (idx < 0 || idx >= list.size()); 357 } 358 return (URI) list.remove(idx); 359 } 360 361 protected void attemptToConnect(URI uri) throws JMSException { 362 WireFormat wf = currentWireFormat.copy(); 363 channel = TransportChannelProvider.create(wf, uri); 364 if (started.get()) { 365 channel.start(); 366 } 367 } 368 369 public void statusChanged(TransportStatusEvent event) { 370 // Delegate to own listeners 371 //set the transport to 'this' 372 event.setTransportChannel(this); 373 fireStatusEvent(event); 374 } 375 376 public boolean isTransportConnected() { 377 return channel == null ? false : channel.isTransportConnected(); 378 } 379 380 public long getLastReceiptTimestamp() { 381 return channel == null ? System.currentTimeMillis() : channel.getLastReceiptTimestamp(); 382 } 383 }