1 /**
2 * Copyright 2003-2006 Greg Luck
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheManager;
20 import net.sf.ehcache.CacheException;
21 import net.sf.ehcache.Cache;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.util.Date;
26 import java.util.List;
27 import java.util.ArrayList;
28 import java.util.Iterator;
29 import java.rmi.NotBoundException;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 /**
35 * A peer provider which discovers peers using Multicast.
36 * <p/>
37 * Hosts can be in three different levels of conformance with the Multicast specification (RFC1112), according to the requirements they meet.
38 * <ol>
39 * <li>Level 0 is the "no support for IP Multicasting" level. Lots of hosts and routers in the Internet are in this state,
40 * as multicast support is not mandatory in IPv4 (it is, however, in IPv6).
41 * Not too much explanation is needed here: hosts in this level can neither send nor receive multicast packets.
42 * They must ignore the ones sent by other multicast capable hosts.
43 * <li>Level 1 is the "support for sending but not receiving multicast IP datagrams" level.
44 * Thus, note that it is not necessary to join a multicast group to be able to send datagrams to it.
45 * Very few additions are needed in the IP module to make a "Level 0" host "Level 1-compliant".
46 * <li>Level 2 is the "full support for IP multicasting" level.
47 * Level 2 hosts must be able to both send and receive multicast traffic.
48 * They must know the way to join and leave multicast groups and to propagate this information to multicast routers.
49 * Thus, they must include an Internet Group Management Protocol (IGMP) implementation in their TCP/IP stack.
50 * </ol>
51 * <p/>
52 * The list of CachePeers is maintained via heartbeats. rmiUrls are looked up using RMI and converted to CachePeers on
53 * registration. On lookup any stale references are removed.
54 *
55 * @author Greg Luck
56 * @version $Id: MulticastRMICacheManagerPeerProvider.java 52 2006-04-24 14:50:03Z gregluck $
57 */
58 public final class MulticastRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider implements CacheManagerPeerProvider {
59
60 /**
61 * The number of ms until the peer is considered to be offline. Once offline it will not be sent
62 * notifications.
63 */
64 public static final int STALE_PEER_TIME_MS = 11000;
65
66 private static final Log LOG = LogFactory.getLog(MulticastRMICacheManagerPeerProvider.class.getName());
67
68
69 private final MulticastKeepaliveHeartbeatReceiver heartBeatReceiver;
70 private final MulticastKeepaliveHeartbeatSender heartBeatSender;
71
72 /**
73 * Creates and starts a multicast peer provider
74 *
75 * @param groupMulticastAddress 224.0.0.1 to 239.255.255.255 e.g. 230.0.0.1
76 * @param groupMulticastPort 1025 to 65536 e.g. 4446
77 */
78 public MulticastRMICacheManagerPeerProvider(CacheManager cacheManager, InetAddress groupMulticastAddress,
79 Integer groupMulticastPort) {
80 super(cacheManager);
81 heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver(this, groupMulticastAddress, groupMulticastPort);
82 heartBeatSender = new MulticastKeepaliveHeartbeatSender(cacheManager, groupMulticastAddress, groupMulticastPort);
83 }
84
85 /**
86 * {@inheritDoc}
87 */
88 public final void init() throws CacheException {
89 try {
90 heartBeatReceiver.init();
91 heartBeatSender.init();
92 } catch (IOException exception) {
93 LOG.error("Error starting heartbeat. Error was: " + exception.getMessage(), exception);
94 throw new CacheException(exception.getMessage());
95 }
96 }
97
98 /**
99 * Register a new peer.
100 * @param rmiUrl
101 */
102 public final synchronized void registerPeer(String rmiUrl) {
103
104 try {
105 CachePeer cachePeer = lookupRemoteCachePeer(rmiUrl);
106 CachePeerEntry cachePeerEntry = new CachePeerEntry(cachePeer, new Date());
107 peerUrls.put(rmiUrl, cachePeerEntry);
108 } catch (IOException e) {
109 if (LOG.isDebugEnabled()) {
110 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
111 + e.getMessage());
112 }
113 peerUrls.remove(rmiUrl);
114 } catch (NotBoundException e) {
115 peerUrls.remove(rmiUrl);
116 if (LOG.isDebugEnabled()) {
117 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
118 + e.getMessage());
119 }
120 } catch (Throwable t) {
121 LOG.error("Unable to lookup remote cache peer for " + rmiUrl
122 + ". Cause was not due to an IOException or NotBoundException which will occur in normal operation:" +
123 " " + t.getMessage());
124 }
125 }
126
127 /**
128 * @return a list of {@link CachePeer} peers, excluding the local peer.
129 */
130 public final synchronized List listRemoteCachePeers(Cache cache) throws CacheException {
131 List remoteCachePeers = new ArrayList();
132 List staleList = new ArrayList();
133 synchronized (peerUrls) {
134 for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
135 String rmiUrl = (String) iterator.next();
136 String rmiUrlCacheName = extractCacheName(rmiUrl);
137 try {
138 if (!rmiUrlCacheName.equals(cache.getName())) {
139 continue;
140 }
141 CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
142 Date date = cachePeerEntry.date;
143 if (!stale(date)) {
144 CachePeer cachePeer = cachePeerEntry.cachePeer;
145 remoteCachePeers.add(cachePeer);
146 } else {
147 if (LOG.isDebugEnabled()) {
148 LOG.debug("rmiUrl " + rmiUrl + " is stale. Either the remote peer is shutdown or the " +
149 "network connectivity has been interrupted. Will be removed from list of remote cache peers");
150 }
151 staleList.add(rmiUrl);
152 }
153 } catch (Exception exception) {
154 LOG.error(exception.getMessage(), exception);
155 throw new CacheException("Unable to list remote cache peers. Error was " + exception.getMessage());
156 }
157 }
158
159 for (int i = 0; i < staleList.size(); i++) {
160 String rmiUrl = (String) staleList.get(i);
161 peerUrls.remove(rmiUrl);
162 }
163 }
164 return remoteCachePeers;
165 }
166
167
168 /**
169 * Shutdown the heartbeat
170 */
171 public final void dispose() {
172 heartBeatSender.dispose();
173 heartBeatReceiver.dispose();
174 }
175
176 /**
177 * Whether the entry should be considered stale.
178 * This will depend on the type of RMICacheManagerPeerProvider.
179 * This method should be overridden for implementations that go stale based on date
180 *
181 * @param date the date the entry was created
182 * @return true if stale
183 */
184 protected final boolean stale(Date date) {
185 long now = System.currentTimeMillis();
186 return date.getTime() < (now - STALE_PEER_TIME_MS);
187 }
188
189
190 /**
191 * Entry containing a looked up CachePeer and date
192 */
193 protected static final class CachePeerEntry {
194
195 private final CachePeer cachePeer;
196 private final Date date;
197
198 /**
199 * Constructor
200 *
201 * @param cachePeer the cache peer part of this entry
202 * @param date the date part of this entry
203 */
204 public CachePeerEntry(CachePeer cachePeer, Date date) {
205 this.cachePeer = cachePeer;
206 this.date = date;
207 }
208
209 /**
210 * @return the cache peer part of this entry
211 */
212 public final CachePeer getCachePeer() {
213 return cachePeer;
214 }
215
216
217 /**
218 * @return the date part of this entry
219 */
220 public final Date getDate() {
221 return date;
222 }
223
224 }
225
226 }