/**
 *  Copyright 2003-2006 Greg Luck
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package net.sf.ehcache.distribution;

import net.sf.ehcache.CacheManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.rmi.RemoteException;
import java.util.List;
import java.util.StringTokenizer;

/**
 * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
 * <p/>
 * Our own multicast heartbeats are ignored.
 * <p/>
 * A receiver is started with {@link #init()}, which opens the multicast socket and starts a
 * background thread, and is stopped with {@link #dispose()}.
 *
 * @author Greg Luck
 * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 52 2006-04-24 14:50:03Z gregluck $
 */
public final class MulticastKeepaliveHeartbeatReceiver {

    private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName());

    private final InetAddress groupMulticastAddress;
    private final Integer groupMulticastPort;
    private MulticastReceiverThread receiverThread;
    private MulticastSocket socket;

    /**
     * Set by {@link #dispose()} on the caller's thread and polled by the receiver thread,
     * hence volatile so that the stop request is guaranteed to become visible to the loop.
     */
    private volatile boolean stopped;
    private final MulticastRMICacheManagerPeerProvider peerProvider;

    /**
     * Constructor. No network resources are acquired until {@link #init()} is called.
     *
     * @param peerProvider     the provider that received peer URLs are registered with, and from
     *                         which the local CacheManager is obtained to recognise our own heartbeats
     * @param multicastAddress the multicast group address to join
     * @param multicastPort    the port the multicast socket is bound to
     */
    public MulticastKeepaliveHeartbeatReceiver(
            MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) {
        this.peerProvider = peerProvider;
        this.groupMulticastAddress = multicastAddress;
        this.groupMulticastPort = multicastPort;
    }

    /**
     * Start. Opens the multicast socket, joins the group and starts the receiver thread.
     *
     * @throws IOException if the socket cannot be created or the group cannot be joined. In the
     *                     latter case the socket is closed before the exception is propagated.
     */
    final void init() throws IOException {
        MulticastSocket newSocket = new MulticastSocket(groupMulticastPort.intValue());
        boolean joined = false;
        try {
            newSocket.joinGroup(groupMulticastAddress);
            joined = true;
        } finally {
            if (!joined) {
                // Do not leak the bound socket when the group cannot be joined.
                newSocket.close();
            }
        }
        socket = newSocket;
        receiverThread = new MulticastReceiverThread();
        receiverThread.start();
    }

    /**
     * Shutdown the heartbeat. Safe to call even if {@link #init()} was never called or failed.
     */
    public final void dispose() {
        stopped = true;
        if (receiverThread != null) {
            receiverThread.interrupt();
        }
    }


    /**
     * A multicast receiver which continuously receives heartbeats.
     */
    private final class MulticastReceiverThread extends Thread {

        /**
         * Receives datagrams until the enclosing receiver is stopped, handing each payload
         * to {@link #processPayload(byte[])}.
         */
        public final void run() {
            byte[] buf = new byte[PayloadUtil.MTU];
            while (!stopped) {
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                try {
                    socket.receive(packet);
                    // The buffer is reused between datagrams, so only the bytes actually received
                    // belong to this heartbeat; anything beyond getLength() is stale.
                    byte[] payload = new byte[packet.getLength()];
                    System.arraycopy(packet.getData(), packet.getOffset(), payload, 0, payload.length);
                    processPayload(payload);
                } catch (IOException e) {
                    // Closing the socket on shutdown makes receive() fail; that is expected.
                    if (!stopped) {
                        LOG.error("Error receiving heartbeat. " + e.getMessage() + ". Initial cause was " + e.getMessage(), e);
                    }
                }
            }
        }

        /**
         * Decompresses a heartbeat payload and registers each RMI URL it contains, unless the
         * heartbeat is our own.
         *
         * @param compressedPayload the gzipped payload exactly as received
         */
        private void processPayload(byte[] compressedPayload) {
            byte[] payload = PayloadUtil.ungzip(compressedPayload);
            String rmiUrls = new String(payload);
            if (self(rmiUrls)) {
                return;
            }
            rmiUrls = rmiUrls.trim();
            if (LOG.isTraceEnabled()) {
                LOG.trace("rmiUrls received " + rmiUrls);
            }
            StringTokenizer tokenizer = new StringTokenizer(rmiUrls, PayloadUtil.URL_DELIMITER);
            while (tokenizer.hasMoreTokens()) {
                registerNotification(tokenizer.nextToken());
            }
        }


        /**
         * @param rmiUrls the delimited list of RMI URLs carried by a heartbeat
         * @return true if our own hostname and listener port are found in the list. This then means we have
         *         caught our own multicast, and should be ignored. Returns false when our own URL base
         *         cannot be determined (no bound cache peers, or the peer could not be queried).
         */
        private boolean self(String rmiUrls) {
            CacheManager cacheManager = peerProvider.getCacheManager();
            List boundCachePeers = cacheManager.getCachePeerListener().getBoundCachePeers();
            if (boundCachePeers == null || boundCachePeers.isEmpty()) {
                // Nothing is bound locally, so there is no URL base to compare against.
                return false;
            }
            CachePeer peer = (CachePeer) boundCachePeers.get(0);
            String cacheManagerUrlBase;
            try {
                cacheManagerUrlBase = peer.getUrlBase();
            } catch (RemoteException e) {
                LOG.error("Error geting url base", e);
                return false;
            }
            if (cacheManagerUrlBase == null) {
                // indexOf(null) would throw and terminate the receiver thread.
                return false;
            }
            return rmiUrls.indexOf(cacheManagerUrlBase) != -1;
        }

        /**
         * Registers a single peer URL with the peer provider.
         *
         * @param rmiUrl the RMI URL of a remote cache peer
         */
        private void registerNotification(String rmiUrl) {
            peerProvider.registerPeer(rmiUrl);
        }


        /**
         * Leaves the multicast group and closes the socket before interrupting the thread.
         * Closing the socket is what unblocks a pending receive.
         */
        public final void interrupt() {
            if (socket != null) {
                try {
                    socket.leaveGroup(groupMulticastAddress);
                } catch (IOException e) {
                    LOG.error("Error leaving group", e);
                }
                socket.close();
            }
            super.interrupt();
        }
    }


}