1 /** 2 * Copyright 2003-2006 Greg Luck 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package net.sf.ehcache.distribution; 18 19 import org.apache.commons.logging.Log; 20 import org.apache.commons.logging.LogFactory; 21 22 import java.io.IOException; 23 import java.net.DatagramPacket; 24 import java.net.InetAddress; 25 import java.net.MulticastSocket; 26 import java.util.List; 27 28 import net.sf.ehcache.CacheManager; 29 30 /** 31 * Sends heartbeats to a multicast group containing a compressed list of URLs. Supports up to approximately 32 * 500 configured caches. 33 * 34 * @author Greg Luck 35 * @version $Id: MulticastKeepaliveHeartbeatSender.java 211 2006-10-23 03:57:08Z gregluck $ 36 */ 37 public final class MulticastKeepaliveHeartbeatSender { 38 39 40 private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatSender.class.getName()); 41 private static final long HEARTBEAT_INTERVAL = 5000; 42 43 private final InetAddress groupMulticastAddress; 44 private final Integer groupMulticastPort; 45 private MulticastServerThread serverThread; 46 private boolean stopped; 47 private final CacheManager cacheManager; 48 49 50 /** 51 * Constructor 52 * 53 * @param multicastAddress 54 * @param multicastPort 55 */ 56 public MulticastKeepaliveHeartbeatSender(CacheManager cacheManager, 57 InetAddress multicastAddress, Integer multicastPort) { 58 this.cacheManager = cacheManager; 59 this.groupMulticastAddress = multicastAddress; 60 this.groupMulticastPort = multicastPort; 61 62 } 63 64 /** 65 * Start the heartbeat thread 66 */ 67 public final void init() { 68 serverThread = new MulticastServerThread(); 69 serverThread.start(); 70 } 71 72 /** 73 * Shutdown this heartbeat sender 74 */ 75 public final synchronized void dispose() { 76 stopped = true; 77 notifyAll(); 78 serverThread.interrupt(); 79 } 80 81 /** 82 * A thread which sends a multicast heartbeat every second 83 */ 84 private final class MulticastServerThread extends Thread { 85 86 private MulticastSocket socket; 87 private byte[] compressedUrlList; 88 private int cachePeersHash; 89 90 91 /** 92 * Constructor 93 */ 94 public MulticastServerThread() { 95 super("Multicast Server Thread"); 96 setDaemon(true); 97 } 98 99 public final void run() { 100 try { 101 socket = new MulticastSocket(groupMulticastPort.intValue()); 102 socket.joinGroup(groupMulticastAddress); 103 104 while (!stopped) { 105 byte[] buffer = createCachePeersPayload(); 106 DatagramPacket packet = new DatagramPacket(buffer, buffer.length, groupMulticastAddress, 107 groupMulticastPort.intValue()); 108 socket.send(packet); 109 110 try { 111 synchronized (this) { 112 wait(HEARTBEAT_INTERVAL); 113 } 114 } catch (InterruptedException e) { 115 if (!stopped) { 116 LOG.error("Error receiving heartbeat. Initial cause was " + e.getMessage(), e); 117 } 118 } 119 } 120 closeSocket(); 121 122 } catch (IOException e) { 123 LOG.debug(e); 124 } 125 } 126 127 /** 128 * Creates a gzipped payload. 129 * <p/> 130 * The last gzipped payload is retained and only recalculated if the list of cache peers 131 * has changed. 132 * 133 * @return a gzipped byte[] 134 */ 135 private byte[] createCachePeersPayload() { 136 List localCachePeers = cacheManager.getCachePeerListener().getBoundCachePeers(); 137 int newCachePeersHash = localCachePeers.hashCode(); 138 if (cachePeersHash != newCachePeersHash) { 139 cachePeersHash = newCachePeersHash; 140 byte[] uncompressedUrlList = PayloadUtil.assembleUrlList(localCachePeers); 141 compressedUrlList = PayloadUtil.gzip(uncompressedUrlList); 142 if (compressedUrlList.length > PayloadUtil.MTU) { 143 LOG.fatal("Heartbeat is not working. Configure fewer caches for replication. " + 144 "Size is " + compressedUrlList.length + " but should be no greater than" + 145 PayloadUtil.MTU); 146 } 147 } 148 return compressedUrlList; 149 } 150 151 152 /** 153 * Interrupts this thread. 154 * <p/> 155 * <p> Unless the current thread is interrupting itself, which is 156 * always permitted, the {@link #checkAccess() checkAccess} method 157 * of this thread is invoked, which may cause a {@link 158 * SecurityException} to be thrown. 159 * <p/> 160 * <p> If this thread is blocked in an invocation of the {@link 161 * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link 162 * Object#wait(long,int) wait(long, int)} methods of the {@link Object} 163 * class, or of the {@link #join()}, {@link #join(long)}, {@link 164 * #join(long,int)}, {@link #sleep(long)}, or {@link #sleep(long,int)}, 165 * methods of this class, then its interrupt status will be cleared and it 166 * will receive an {@link InterruptedException}. 167 * <p/> 168 * <p> If this thread is blocked in an I/O operation upon an {@link 169 * java.nio.channels.InterruptibleChannel </code>interruptible 170 * channel<code>} then the channel will be closed, the thread's interrupt 171 * status will be set, and the thread will receive a {@link 172 * java.nio.channels.ClosedByInterruptException}. 173 * <p/> 174 * <p> If this thread is blocked in a {@link java.nio.channels.Selector} 175 * then the thread's interrupt status will be set and it will return 176 * immediately from the selection operation, possibly with a non-zero 177 * value, just as if the selector's {@link 178 * java.nio.channels.Selector#wakeup wakeup} method were invoked. 179 * <p/> 180 * <p> If none of the previous conditions hold then this thread's interrupt 181 * status will be set. </p> 182 * 183 * @throws SecurityException if the current thread cannot modify this thread 184 */ 185 public final void interrupt() { 186 closeSocket(); 187 super.interrupt(); 188 } 189 190 private void closeSocket() { 191 try { 192 if (socket != null && !socket.isClosed()) { 193 try { 194 socket.leaveGroup(groupMulticastAddress); 195 } catch (IOException e) { 196 LOG.error("Error leaving multicast group. Message was " + e.getMessage()); 197 } 198 socket.close(); 199 } 200 } catch (NoSuchMethodError e) { 201 LOG.debug("socket.isClosed is not supported by JDK1.3"); 202 try { 203 socket.leaveGroup(groupMulticastAddress); 204 } catch (IOException ex) { 205 LOG.error("Error leaving multicast group. Message was " + ex.getMessage()); 206 } 207 socket.close(); 208 } 209 } 210 211 212 } 213 }