001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.multicast; 020 import java.io.IOException; 021 import java.io.Serializable; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.Iterator; 025 import java.util.Map; 026 import javax.jms.JMSException; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.io.impl.DefaultWireFormat; 030 import org.activemq.message.ActiveMQMessage; 031 import org.activemq.message.ActiveMQObjectMessage; 032 import org.activemq.message.Packet; 033 import org.activemq.message.PacketListener; 034 import org.activemq.transport.DiscoveryAgentSupport; 035 import org.activemq.transport.DiscoveryEvent; 036 import org.activemq.util.IdGenerator; 037 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 038 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 039 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 040 041 042 /** 043 * An agent used to discover other instances of a service 044 * 045 * @version $Revision: 1.1.1.1 $ 046 */ 047 public class MulticastDiscoveryAgent extends DiscoveryAgentSupport implements PacketListener, Runnable { 048 private static final Log log = LogFactory.getLog(MulticastDiscoveryAgent.class); 049 /** 050 * default URI used for discovery 051 */ 052 public static final String DEFAULT_DISCOVERY_URI = "multicast://224.1.2.3:6066"; 053 // private static final String KEEP_ALIVE_TYPE = "KEEP_ALIVE"; 054 private static final String SERVICE_TYPE = "SERVICE"; 055 private static final String ALIVE_TYPE = "ALIVE_TYPE"; 056 private static final String SERVICE_NAME = "SERVICE_NAME"; 057 private static final String CHANNEL_NAME = "CHANNEL_NAME"; 058 private static final long DEFAULT_KEEP_ALIVE_TIMEOUT = 5000; 059 private static final int DEFAULT_TIMEOUT_COUNT = 2; 060 private ConcurrentHashMap services; 061 private ConcurrentHashMap keepAliveMap; 062 private SynchronizedBoolean started; 063 private MulticastTransportChannel channel; 064 private Thread runner; 065 private IdGenerator idGen; 066 private String localId; 067 private URI uri; 068 private int timeoutCount; 069 private long keepAliveTimeout; 070 private long timeoutExpiration; 071 //private ActiveMQMessage keepAliveMessage; 072 private ActiveMQObjectMessage serviceMessage; 073 private String serviceName = ""; 074 private int timeToLive = 1; 075 private String channelName = "defaultChannel"; 076 077 /** 078 * Construct a discovery agent that uses multicast 079 * 080 * @param channelName 081 * @throws JMSException 082 */ 083 public MulticastDiscoveryAgent(String channelName) throws JMSException { 084 init(); 085 this.channelName = channelName; 086 try { 087 setUri(new URI(DEFAULT_DISCOVERY_URI)); 088 } 089 catch (URISyntaxException e) { 090 JMSException jmsEx = new JMSException("URI Syntax exception: " + e.getMessage()); 091 jmsEx.setLinkedException(e); 092 throw jmsEx; 093 } 094 } 095 096 public MulticastDiscoveryAgent(URI uri) { 097 init(); 098 this.uri = uri; 099 } 100 101 private void init() { 102 this.started = new SynchronizedBoolean(false); 103 this.services = new ConcurrentHashMap(); 104 this.keepAliveMap = new ConcurrentHashMap(); 105 this.idGen = new IdGenerator(); 106 this.localId = idGen.generateId(); 107 this.keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; 108 this.timeoutCount = DEFAULT_TIMEOUT_COUNT; 109 this.timeoutExpiration = this.keepAliveTimeout * timeoutCount; 110 } 111 112 113 /** 114 * @return Returns the keepAliveTimeout. 115 */ 116 public long getKeepAliveTimeout() { 117 return keepAliveTimeout; 118 } 119 120 /** 121 * @param keepAliveTimeout The keepAliveTimeout to set. 122 */ 123 public void setKeepAliveTimeout(long keepAliveTimeout) { 124 this.keepAliveTimeout = keepAliveTimeout; 125 } 126 127 /** 128 * @return Returns the timeoutCount. 129 */ 130 public int getTimeoutCount() { 131 return timeoutCount; 132 } 133 134 /** 135 * @param timeoutCount The timeoutCount to set. 136 */ 137 public void setTimeoutCount(int timeoutCount) { 138 this.timeoutCount = timeoutCount; 139 } 140 141 /** 142 * @return Returns the localId. 143 */ 144 public String getLocalId() { 145 return localId; 146 } 147 148 /** 149 * @param localId The localId to set. 150 */ 151 public void setLocalId(String localId) { 152 this.localId = localId; 153 } 154 155 /** 156 * @return Returns the uri. 157 */ 158 public URI getUri() { 159 return uri; 160 } 161 162 /** 163 * @param uri The uri to set. 164 */ 165 public void setUri(URI uri) { 166 this.uri = uri; 167 } 168 169 /** 170 * @return the timeToLive of multicast packets used for discovery 171 */ 172 public int getTimeToLive() { 173 return this.timeToLive; 174 } 175 176 /** 177 * @param timeToLive The timeToLive for multicast packets used in discovery. 178 * @throws IOException 179 */ 180 public void setTimeToLive(int timeToLive) throws IOException { 181 this.timeToLive = timeToLive; 182 if (channel != null) { 183 channel.setTimeToLive(timeToLive); 184 } 185 } 186 187 /** 188 * @return Returns the channelName. 189 */ 190 public String getChannelName() { 191 return channelName; 192 } 193 194 /** 195 * @param channelName The channelName to set. 196 */ 197 public void setChannelName(String channelName) { 198 this.channelName = channelName; 199 } 200 201 /** 202 * @return a pretty print of this instance 203 */ 204 public String toString() { 205 return "MulticastDiscoveryAgent:" + serviceName; 206 } 207 208 /** 209 * @return the number of active services, including self 210 */ 211 public int getServicesCount() { 212 return (this.serviceMessage != null ? 1 : 0) + services.size(); 213 } 214 215 /** 216 * Register a service for other discover nodes 217 * 218 * @param name 219 * @param details 220 * @throws JMSException 221 */ 222 public void registerService(String name, Map details) throws JMSException { 223 if (this.serviceMessage != null){ 224 //notify the old service has stopped 225 this.serviceMessage.setBooleanProperty(ALIVE_TYPE, false); 226 sendService(); 227 } 228 this.serviceName = name; 229 this.serviceMessage = new ActiveMQObjectMessage(); 230 this.serviceMessage.setJMSType(SERVICE_TYPE); 231 this.serviceMessage.setStringProperty(SERVICE_NAME, name); 232 this.serviceMessage.setStringProperty(CHANNEL_NAME, channelName); 233 this.serviceMessage.setBooleanProperty(ALIVE_TYPE, true); 234 this.serviceMessage.setObject((Serializable) details); 235 sendService(); 236 } 237 238 /** 239 * start this discovery agent 240 * 241 * @throws JMSException 242 */ 243 public void start() throws JMSException { 244 if (started.commit(false, true)) { 245 this.timeoutExpiration = this.keepAliveTimeout * timeoutCount; 246 channel = new MulticastTransportChannel(new DefaultWireFormat(), uri); 247 248 channel.setClientID(localId); 249 channel.setPacketListener(this); 250 try { 251 channel.setTimeToLive(getTimeToLive()); 252 } 253 catch (IOException e) { 254 JMSException jmsEx = new JMSException("Set time to live failed"); 255 jmsEx.setLinkedException(e); 256 throw jmsEx; 257 } 258 log.info("Starting multicast discovery agent on URI: " + uri + " with clientID: " + channel.getClientID()); 259 260 channel.start(); 261 runner = new Thread(this); 262 runner.setName(toString()); 263 runner.setDaemon(true); 264 runner.setPriority(Thread.MAX_PRIORITY); 265 runner.start(); 266 sendService(); 267 fireServiceStarted(serviceMessage); 268 } 269 } 270 271 /** 272 * stop this discovery agent 273 * 274 * @throws JMSException 275 */ 276 public void stop() throws JMSException { 277 boolean doStop = false; 278 synchronized (started) { 279 doStop = started.get(); 280 if (doStop) { 281 if (this.serviceMessage != null){ 282 //notify the old service has stopped 283 this.serviceMessage.setBooleanProperty(ALIVE_TYPE, false); 284 sendService(); 285 } 286 channel.stop(); 287 started.set(false); 288 } 289 } 290 if (doStop) { 291 fireServiceStopped(serviceMessage); 292 } 293 } 294 295 /** 296 * send a keep alive message 297 */ 298 public void run() { 299 try { 300 int count = 0; 301 while (started.get()) { 302 sendService(); 303 log.debug(serviceName + " sent keep alive"); 304 if (++count >= timeoutCount) { 305 count = 0; 306 checkNodesAlive(); 307 } 308 Thread.sleep(getKeepAliveTimeout()); 309 } 310 } 311 catch (Throwable e) { 312 log.error(toString() + " run failed", e); 313 } 314 } 315 316 /** 317 * Consume multicast packets 318 * 319 * @param packet 320 */ 321 public void consume(Packet packet) { 322 try { 323 if (packet != null && packet.isJMSMessage()) { 324 ActiveMQMessage msg = (ActiveMQMessage) packet; 325 String receivedChannelName = msg.getStringProperty(CHANNEL_NAME); 326 if (receivedChannelName != null && receivedChannelName.equals(channelName)) { 327 String type = msg.getJMSType(); 328 if (type != null) { 329 if (type.equals(SERVICE_TYPE)) { 330 processService(msg); 331 } 332 else { 333 log.warn(toString() + " received Message of unknown type: " + type); 334 } 335 } 336 else { 337 log.error(toString() + " message type is null"); 338 } 339 } 340 else { 341 if (log.isDebugEnabled()) { 342 log.debug("Discarded discovery message for channel: " + receivedChannelName + " in channel: " + channelName); 343 } 344 } 345 } 346 else { 347 log.warn(toString() + " received unexpected packet: " + packet); 348 } 349 } 350 catch (Throwable e) { 351 log.error(toString() + " couldn't process packet: " + packet, e); 352 } 353 } 354 355 356 357 private void sendService() throws JMSException { 358 if (started.get() && channel != null && !channel.isPendingStop() && serviceMessage != null) { 359 channel.asyncSend(serviceMessage); 360 } 361 } 362 363 364 private void processService(ActiveMQMessage message) throws JMSException { 365 if (message != null) { 366 ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) message; 367 String name = objMsg.getStringProperty(SERVICE_NAME); 368 369 if (log.isDebugEnabled()) { 370 log.debug("Service message received for: " + name); 371 } 372 addService(name); 373 ActiveMQObjectMessage oldMsg = (ActiveMQObjectMessage) services.get(name); 374 services.put(name, objMsg); 375 if (oldMsg == null) { 376 fireServiceStarted(objMsg); 377 //send out that we are here! 378 sendService(); 379 } 380 if (message.getBooleanProperty(ALIVE_TYPE)) { 381 addService(name); 382 } 383 else { 384 removeService(name); 385 } 386 } 387 } 388 389 private void fireServiceStarted(ActiveMQObjectMessage message) throws JMSException { 390 if (message != null) { 391 String name = message.getStringProperty(SERVICE_NAME); 392 Map map = (Map) message.getObject(); 393 DiscoveryEvent event = new DiscoveryEvent(this, name, map); 394 fireAddService(event); 395 } 396 } 397 398 private void fireServiceStopped(ActiveMQObjectMessage message) throws JMSException { 399 if (message != null) { 400 String name = message.getStringProperty(SERVICE_NAME); 401 Map map = (Map) message.getObject(); 402 DiscoveryEvent event = new DiscoveryEvent(this, name, map); 403 fireRemoveService(event); 404 } 405 } 406 407 private void addService(String name) { 408 long timestamp = System.currentTimeMillis(); 409 SynchronizedLong activeTime = (SynchronizedLong) keepAliveMap.get(name); 410 if (activeTime == null) { 411 activeTime = new SynchronizedLong(0); 412 keepAliveMap.put(name, activeTime); 413 } 414 activeTime.set(timestamp); 415 } 416 417 private void removeService(String name) throws JMSException { 418 keepAliveMap.remove(name); 419 ActiveMQObjectMessage message = (ActiveMQObjectMessage) services.remove(name); 420 if (message != null) { 421 fireServiceStopped(message); 422 } 423 } 424 425 private void checkNodesAlive() throws JMSException { 426 long timestamp = System.currentTimeMillis(); 427 long timeout = timestamp - timeoutExpiration; 428 for (Iterator i = keepAliveMap.entrySet().iterator();i.hasNext();) { 429 Map.Entry entry = (Map.Entry) i.next(); 430 SynchronizedLong activeTime = (SynchronizedLong) entry.getValue(); 431 if (activeTime.get() < timeout) { 432 String name = entry.getKey().toString(); 433 removeService(name); 434 log.warn(serviceName + " Expiring node: " + name); 435 } 436 } 437 } 438 }