View Javadoc

1   /**
2    *  Copyright 2003-2006 Greg Luck
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheManager;
20  import net.sf.ehcache.CacheException;
21  import net.sf.ehcache.Cache;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.util.Date;
26  import java.util.List;
27  import java.util.ArrayList;
28  import java.util.Iterator;
29  import java.rmi.NotBoundException;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  /**
35   * A peer provider which discovers peers using Multicast.
36   * <p/>
37   * Hosts can be in three different levels of conformance with the Multicast specification (RFC1112), according to the requirements they meet.
38   * <ol>
39   * <li>Level 0 is the "no support for IP Multicasting" level. Lots of hosts and routers in the Internet are in this state,
40   * as multicast support is not mandatory in IPv4 (it is, however, in IPv6).
41   * Not too much explanation is needed here: hosts in this level can neither send nor receive multicast packets.
42   * They must ignore the ones sent by other multicast capable hosts.
43   * <li>Level 1 is the "support for sending but not receiving multicast IP datagrams" level.
44   * Thus, note that it is not necessary to join a multicast group to be able to send datagrams to it.
45   * Very few additions are needed in the IP module to make a "Level 0" host "Level 1-compliant".
46   * <li>Level 2 is the "full support for IP multicasting" level.
47   * Level 2 hosts must be able to both send and receive multicast traffic.
48   * They must know the way to join and leave multicast groups and to propagate this information to multicast routers.
49   * Thus, they must include an Internet Group Management Protocol (IGMP) implementation in their TCP/IP stack.
50   * </ol>
51   * <p/>
52   * The list of CachePeers is maintained via heartbeats. rmiUrls are looked up using RMI and converted to CachePeers on
53   * registration. On lookup any stale references are removed.
54   *
55   * @author Greg Luck
56   * @version $Id: MulticastRMICacheManagerPeerProvider.java 52 2006-04-24 14:50:03Z gregluck $
57   */
58  public final class MulticastRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider implements CacheManagerPeerProvider {
59  
60      /**
61       * The number of ms until the peer is considered to be offline. Once offline it will not be sent
62       * notifications.
63       */
64      public static final int STALE_PEER_TIME_MS = 11000;
65  
66      private static final Log LOG = LogFactory.getLog(MulticastRMICacheManagerPeerProvider.class.getName());
67  
68  
69      private final MulticastKeepaliveHeartbeatReceiver heartBeatReceiver;
70      private final MulticastKeepaliveHeartbeatSender heartBeatSender;
71  
72      /**
73       * Creates and starts a multicast peer provider
74       *
75       * @param groupMulticastAddress 224.0.0.1 to 239.255.255.255 e.g. 230.0.0.1
76       * @param groupMulticastPort    1025 to 65536 e.g. 4446
77       */
78      public MulticastRMICacheManagerPeerProvider(CacheManager cacheManager, InetAddress groupMulticastAddress,
79                                                  Integer groupMulticastPort) {
80          super(cacheManager);
81          heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver(this, groupMulticastAddress, groupMulticastPort);
82          heartBeatSender = new MulticastKeepaliveHeartbeatSender(cacheManager, groupMulticastAddress, groupMulticastPort);
83      }
84  
85      /**
86       * {@inheritDoc}
87       */
88      public final void init() throws CacheException {
89          try {
90              heartBeatReceiver.init();
91              heartBeatSender.init();
92          } catch (IOException exception) {
93              LOG.error("Error starting heartbeat. Error was: " + exception.getMessage(), exception);
94              throw new CacheException(exception.getMessage());
95          }
96      }
97  
98      /**
99       * Register a new peer.
100      * @param rmiUrl
101      */
102     public final synchronized void registerPeer(String rmiUrl) {
103 
104         try {
105             CachePeer cachePeer = lookupRemoteCachePeer(rmiUrl);
106             CachePeerEntry cachePeerEntry = new CachePeerEntry(cachePeer, new Date());
107             peerUrls.put(rmiUrl, cachePeerEntry);
108         } catch (IOException e) {
109             if (LOG.isDebugEnabled()) {
110                 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
111                         + e.getMessage());
112             }
113             peerUrls.remove(rmiUrl);
114         } catch (NotBoundException e) {
115             peerUrls.remove(rmiUrl);
116             if (LOG.isDebugEnabled()) {
117                 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
118                         + e.getMessage());
119             }
120         } catch (Throwable t) {
121             LOG.error("Unable to lookup remote cache peer for " + rmiUrl
122                     + ". Cause was not due to an IOException or NotBoundException which will occur in normal operation:" +
123                     " " + t.getMessage());
124         }
125     }
126 
127     /**
128      * @return a list of {@link CachePeer} peers, excluding the local peer.
129      */
130     public final synchronized List listRemoteCachePeers(Cache cache) throws CacheException {
131         List remoteCachePeers = new ArrayList();
132         List staleList = new ArrayList();
133         synchronized (peerUrls) {
134             for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
135                 String rmiUrl = (String) iterator.next();
136                 String rmiUrlCacheName = extractCacheName(rmiUrl);
137                 try {
138                     if (!rmiUrlCacheName.equals(cache.getName())) {
139                         continue;
140                     }
141                     CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
142                     Date date = cachePeerEntry.date;
143                     if (!stale(date)) {
144                         CachePeer cachePeer = cachePeerEntry.cachePeer;
145                         remoteCachePeers.add(cachePeer);
146                     } else {
147                         if (LOG.isDebugEnabled()) {
148                             LOG.debug("rmiUrl " + rmiUrl + " is stale. Either the remote peer is shutdown or the " +
149                                     "network connectivity has been interrupted. Will be removed from list of remote cache peers");
150                         }
151                         staleList.add(rmiUrl);
152                     }
153                 } catch (Exception exception) {
154                     LOG.error(exception.getMessage(), exception);
155                     throw new CacheException("Unable to list remote cache peers. Error was " + exception.getMessage());
156                 }
157             }
158             //Must remove entries after we have finished iterating over them
159             for (int i = 0; i < staleList.size(); i++) {
160                 String rmiUrl = (String) staleList.get(i);
161                 peerUrls.remove(rmiUrl);
162             }
163         }
164         return remoteCachePeers;
165     }
166 
167 
168     /**
169      * Shutdown the heartbeat
170      */
171     public final void dispose() {
172         heartBeatSender.dispose();
173         heartBeatReceiver.dispose();
174     }
175 
176     /**
177      * Whether the entry should be considered stale.
178      * This will depend on the type of RMICacheManagerPeerProvider.
179      * This method should be overridden for implementations that go stale based on date
180      *
181      * @param date the date the entry was created
182      * @return true if stale
183      */
184     protected final boolean stale(Date date) {
185         long now = System.currentTimeMillis();
186         return date.getTime() < (now - STALE_PEER_TIME_MS);
187     }
188 
189 
190     /**
191      * Entry containing a looked up CachePeer and date
192      */
193     protected static final class CachePeerEntry {
194 
195         private final CachePeer cachePeer;
196         private final Date date;
197 
198         /**
199          * Constructor
200          *
201          * @param cachePeer the cache peer part of this entry
202          * @param date      the date part of this entry
203          */
204         public CachePeerEntry(CachePeer cachePeer, Date date) {
205             this.cachePeer = cachePeer;
206             this.date = date;
207         }
208 
209         /**
210          * @return the cache peer part of this entry
211          */
212         public final CachePeer getCachePeer() {
213             return cachePeer;
214         }
215 
216 
217         /**
218          * @return the date part of this entry
219          */
220         public final Date getDate() {
221             return date;
222         }
223 
224     }
225 
226 }