View Javadoc

1   /**
2    *  Copyright 2003-2006 Greg Luck
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import junit.framework.TestCase;
20  import net.sf.ehcache.Cache;
21  import net.sf.ehcache.CacheException;
22  import net.sf.ehcache.CacheManager;
23  import net.sf.ehcache.Element;
24  import net.sf.ehcache.AbstractCacheTest;
25  import net.sf.ehcache.StopWatch;
26  import net.sf.ehcache.ThreadKiller;
27  import net.sf.ehcache.event.CountingCacheEventListener;
28  
29  import java.io.Serializable;
30  import java.io.IOException;
31  import java.util.Arrays;
32  import java.util.Date;
33  import java.util.List;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  
38  /**
39   * Tests replication of Cache events
40   * <p/>
41   * Note these tests need a live network interface running in multicast mode to work
42   * <p/>
43   * If running involving RMIAsynchronousCacheReplicator individually the test will fail because
44   * the VM will gobble up the SoftReferences rather than allocating more memory. Uncomment the
45   * forceVMGrowth() method usage in setup.
46   *
47   * @author Greg Luck
48   * @version $Id: RMICacheReplicatorTest.java 141 2006-06-30 01:56:32Z gregluck $
49   */
50  public class RMICacheReplicatorTest extends TestCase {
51  
52      private static final Log LOG = LogFactory.getLog(RMICacheReplicatorTest.class.getName());
53  
54      private static final boolean ASYNCHRONOUS = true;
55      private static final boolean SYNCHRONOUS = false;
56  
57      /**
58       * CacheManager 1 in the cluster
59       */
60      protected CacheManager manager1;
61      /**
62       * CacheManager 2 in the cluster
63       */
64      protected CacheManager manager2;
65      /**
66       * CacheManager 3 in the cluster
67       */
68      protected CacheManager manager3;
69      /**
70       * CacheManager 4 in the cluster
71       */
72      protected CacheManager manager4;
73      /**
74       * CacheManager 5 in the cluster
75       */
76      protected CacheManager manager5;
77      /**
78       * CacheManager 6 in the cluster
79       */
80      protected CacheManager manager6;
81  
82      /**
83       * The name of the cache under test
84       */
85      protected String cacheName = "sampleCache1";
86      /**
87       * CacheManager 1 of 2s cache being replicated
88       */
89      protected Cache cache1;
90  
91      /**
92       * CacheManager 2 of 2s cache being replicated
93       */
94      protected Cache cache2;
95  
96  
97      /**
98       * {@inheritDoc}
99       * Sets up two caches: cache1 is local. cache2 is to be receive updates
100      *
101      * @throws Exception
102      */
103     protected void setUp() throws Exception {
104         if (JVMUtil.isSingleRMIRegistryPerVM()) {
105             return;
106         }
107 
108         //Required to get SoftReference tests to pass. The VM clean up SoftReferences rather than allocating
109         // memory to -Xmx!
110         //forceVMGrowth();
111         //System.gc();
112 
113         CountingCacheEventListener.resetCounters();
114         manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed1.xml");
115         manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed2.xml");
116         manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
117         manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed4.xml");
118         manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed5.xml");
119 
120         manager1.getCache(cacheName).removeAll();
121 
122         cache1 = manager1.getCache(cacheName);
123         cache1.removeAll();
124 
125         cache2 = manager2.getCache(cacheName);
126         cache2.removeAll();
127 
128         //allow cluster to be established
129         Thread.sleep(6000);
130 
131     }
132 
133     private void forceVMGrowth() {
134         byte[] forceVMGrowth = new byte[50000000];
135     }
136 
137 
138     /**
139      * {@inheritDoc}
140      *
141      * @throws Exception
142      */
143     protected void tearDown() throws Exception {
144 
145         if (JVMUtil.isSingleRMIRegistryPerVM()) {
146             return;
147         }
148 
149         if (manager1 != null) {
150             manager1.shutdown();
151         }
152         if (manager2 != null) {
153             manager2.shutdown();
154         }
155         if (manager3 != null) {
156             manager3.shutdown();
157         }
158         if (manager4 != null) {
159             manager4.shutdown();
160         }
161 
162         if (manager5 != null) {
163             manager5.shutdown();
164         }
165 
166         if (manager6 != null) {
167             manager6.shutdown();
168         }
169 
170     }
171 
172     /**
173      * 5 cache managers should means that each cache has four remote peers
174      */
175     public void testRemoteCachePeersEqualsNumberOfCacheManagersInCluster() {
176 
177         if (JVMUtil.isSingleRMIRegistryPerVM()) {
178             return;
179         }
180 
181 
182         CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
183         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
184         assertEquals(4, remotePeersOfCache1.size());
185     }
186 
187     /**
188      * Does a new cache manager in the cluster get detected?
189      */
190     public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
191 
192         if (JVMUtil.isSingleRMIRegistryPerVM()) {
193             return;
194         }
195 
196         CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
197         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
198         assertEquals(4, remotePeersOfCache1.size());
199 
200         //Add new CacheManager to cluster
201         manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed6.xml");
202 
203         //Allow detection to occur
204         Thread.sleep(1010);
205 
206         remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
207         assertEquals(5, remotePeersOfCache1.size());
208     }
209 
210     /**
211      * Does a down cache manager in the cluster get removed?
212      */
213     public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
214 
215         if (JVMUtil.isSingleRMIRegistryPerVM()) {
216             return;
217         }
218 
219 
220         CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
221         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
222         assertEquals(4, remotePeersOfCache1.size());
223 
224         //Drop a CacheManager from the cluster
225         manager5.shutdown();
226 
227         //Allow change detection to occur. Heartbeat 1 second and is not stale until 5000
228         Thread.sleep(11010);
229         remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
230 
231 
232         assertEquals(3, remotePeersOfCache1.size());
233     }
234 
235     /**
236      * Does a down cache manager in the cluster get removed?
237      */
238     public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
239 
240         if (JVMUtil.isSingleRMIRegistryPerVM()) {
241             return;
242         }
243 
244         CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
245         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
246         assertEquals(4, remotePeersOfCache1.size());
247 
248         //Drop a CacheManager from the cluster
249         manager5.shutdown();
250 
251         //Insufficient time for it to timeout
252         remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
253         assertEquals(4, remotePeersOfCache1.size());
254     }
255 
256     /**
257      * Tests put and remove initiated from cache1 in a cluster
258      * <p/>
259      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
260      */
261     public void testPutProgagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
262 
263         if (JVMUtil.isSingleRMIRegistryPerVM()) {
264             return;
265         }
266 
267         //Put
268         String[] cacheNames = manager1.getCacheNames();
269         Arrays.sort(cacheNames);
270         for (int i = 0; i < cacheNames.length; i++) {
271             String name = cacheNames[i];
272             manager1.getCache(name).put(new Element("" + i, new Integer(i)));
273             //Add some non serializable elements that should not get propagated
274             manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
275         }
276 
277         waitForProgagate();
278 
279         int count2 = 0;
280         int count3 = 0;
281         int count4 = 0;
282         int count5 = 0;
283         for (int i = 0; i < cacheNames.length; i++) {
284             String name = cacheNames[i];
285             Element element2 = manager2.getCache(name).get("" + i);
286             if (element2 != null) {
287                 count2++;
288             }
289             Element element3 = manager3.getCache(name).get("" + i);
290             if (element3 != null) {
291                 count3++;
292             }
293             Element element4 = manager4.getCache(name).get("" + i);
294             if (element4 != null) {
295                 count4++;
296             }
297             Element element5 = manager5.getCache(name).get("" + i);
298             if (element5 != null) {
299                 count5++;
300             }
301             //LOG.debug("element propagated to cache named " + name + ": " + element);
302         }
303         assertEquals(55, count2);
304         assertEquals(55, count3);
305         assertEquals(55, count4);
306         assertEquals(55, count5);
307 
308     }
309 
310     /**
311      * Performance and capacity tests.
312      * <p/>
313      * The numbers given are for the remote peer tester (java -jar ehcache-test.jar ehcache-distributed1.xml)
314      * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
315      * it has fully received.
316      * <p/>
317      * r37 and earlier - initial implementation
318      * 38 seconds to get all notifications with 6 peers, 2000 Elements and 400 byte payload
319      * 18 seconds to get all notifications with 2 peers, 2000 Elements and 400 byte payload
320      * 40 seconds to get all notifications with 2 peers, 2000 Elements and 10k payload
321      * 22 seconds to get all notifications with 2 peers, 2000 Elements and 1k payload
322      * 26 seconds to get all notifications with 2 peers, 200 Elements and 100k payload
323      * <p/>
324      * r38 - RMI stub lookup on registration rather than at each lookup. Saves quite a few lookups. Also change to 5 second heartbeat
325      * 38 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (1 second heartbeat)
326      * 16 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
327      * 13 seconds to get 2000 notifications with 2 peers, Elements with 400 byte payload
328      * <p/>
329      * r39 - Batching asyn replicator. Send all queued messages in one RMI call once per second.
330      * 2 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
331      */
332     public void testBigPutsProgagatesAsynchronous() throws CacheException, InterruptedException {
333 
334         if (JVMUtil.isSingleRMIRegistryPerVM()) {
335             return;
336         }
337 
338         //Give everything a chance to startup
339         StopWatch stopWatch = new StopWatch();
340         Integer index = null;
341         for (int i = 0; i < 2; i++) {
342             for (int j = 0; j < 1000; j++) {
343                 index = new Integer(((1000 * i) + j));
344                 cache1.put(new Element(index,
345                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
346                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
347                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
348                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
349                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
350             }
351 
352         }
353         long elapsed = stopWatch.getElapsedTime();
354         long putTime = ((elapsed / 1000));
355         LOG.info("Put Elapsed time: " + putTime);
356         //assertTrue(putTime < 8);
357 
358         assertEquals(2000, cache1.getSize());
359 
360         Thread.sleep(2000);
361         assertEquals(2000, manager2.getCache("sampleCache1").getSize());
362         assertEquals(2000, manager3.getCache("sampleCache1").getSize());
363         assertEquals(2000, manager4.getCache("sampleCache1").getSize());
364         assertEquals(2000, manager5.getCache("sampleCache1").getSize());
365 
366     }
367 
368 
369     /**
370      * Drive everything to point of breakage within a 64MB VM.
371      */
372     public void xTestHugePutsBreaksAsynchronous() throws CacheException, InterruptedException {
373 
374         if (JVMUtil.isSingleRMIRegistryPerVM()) {
375             return;
376         }
377 
378         //Give everything a chance to startup
379         StopWatch stopWatch = new StopWatch();
380         Integer index = null;
381         for (int i = 0; i < 500; i++) {
382             for (int j = 0; j < 1000; j++) {
383                 index = new Integer(((1000 * i) + j));
384                 cache1.put(new Element(index,
385                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
386                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
387                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
388                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
389                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
390             }
391 
392         }
393         long elapsed = stopWatch.getElapsedTime();
394         long putTime = ((elapsed / 1000));
395         LOG.info("Put Elapsed time: " + putTime);
396         //assertTrue(putTime < 8);
397 
398         assertEquals(100000, cache1.getSize());
399 
400         Thread.sleep(100000);
401         assertEquals(20000, manager2.getCache("sampleCache1").getSize());
402         assertEquals(20000, manager3.getCache("sampleCache1").getSize());
403         assertEquals(20000, manager4.getCache("sampleCache1").getSize());
404         assertEquals(20000, manager5.getCache("sampleCache1").getSize());
405 
406     }
407 
408 
409     /**
410      * Performance and capacity tests.
411      * <p/>
412      * The numbers given are for the remote peer tester (java -jar ehcache-test.jar ehcache-distributed1.xml)
413      * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
414      * it has fully received.
415      * <p/>
416      * 4 seconds to get all remove notifications with 6 peers, 5000 Elements and 400 byte payload
417      */
418     public void testBigRemovesProgagatesAsynchronous() throws CacheException, InterruptedException {
419 
420         if (JVMUtil.isSingleRMIRegistryPerVM()) {
421             return;
422         }
423 
424         //Give everything a chance to startup
425         Integer index = null;
426         for (int i = 0; i < 5; i++) {
427             for (int j = 0; j < 1000; j++) {
428                 index = new Integer(((1000 * i) + j));
429                 cache1.put(new Element(index,
430                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
431                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
432                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
433                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
434                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
435             }
436 
437         }
438         Thread.sleep(5000);
439         assertEquals(5000, cache1.getSize());
440         assertEquals(5000, manager2.getCache("sampleCache1").getSize());
441         assertEquals(5000, manager3.getCache("sampleCache1").getSize());
442         assertEquals(5000, manager4.getCache("sampleCache1").getSize());
443         assertEquals(5000, manager5.getCache("sampleCache1").getSize());
444 
445         //Let the disk stores catch up before the next stage of the test
446         Thread.sleep(2000);
447 
448         StopWatch stopWatch = new StopWatch();
449 
450         for (int i = 0; i < 5; i++) {
451             for (int j = 0; j < 1000; j++) {
452                 index = new Integer(((1000 * i) + j));
453                 cache1.remove(index);
454             }
455         }
456 
457 
458         int timeForPropagate = 10000;
459 
460         Thread.sleep(timeForPropagate);
461         assertEquals(0, cache1.getSize());
462         assertEquals(0, manager2.getCache("sampleCache1").getSize());
463         assertEquals(0, manager3.getCache("sampleCache1").getSize());
464         assertEquals(0, manager4.getCache("sampleCache1").getSize());
465         assertEquals(0, manager5.getCache("sampleCache1").getSize());
466 
467         LOG.info("Remove Elapsed time: " + timeForPropagate);
468 
469 
470     }
471 
472 
473     /**
474      * Performance and capacity tests.
475      * <p/>
476      * 5 seconds to send all notifications synchronously with 5 peers, 2000 Elements and 400 byte payload
477      * The numbers given below are for the remote peer tester (java -jar ehcache-test.jar ehcache-distributed1.xml)
478      * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
479      * it has fully received.
480      */
481     public void testBigPutsProgagatesSynchronous() throws CacheException, InterruptedException {
482 
483         if (JVMUtil.isSingleRMIRegistryPerVM()) {
484             return;
485         }
486 
487         //Give everything a chance to startup
488         StopWatch stopWatch = new StopWatch();
489         Integer index;
490         for (int i = 0; i < 2; i++) {
491             for (int j = 0; j < 1000; j++) {
492                 index = new Integer(((1000 * i) + j));
493                 manager1.getCache("sampleCache3").put(new Element(index,
494                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
495                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
496                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
497                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
498                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
499             }
500 
501         }
502         long elapsed = stopWatch.getElapsedTime();
503         long putTime = ((elapsed / 1000));
504         LOG.info("Put and Propagate Synchronously Elapsed time: " + putTime + " seconds");
505 
506         assertEquals(2000, manager1.getCache("sampleCache3").getSize());
507         assertEquals(2000, manager2.getCache("sampleCache3").getSize());
508         assertEquals(2000, manager3.getCache("sampleCache3").getSize());
509         assertEquals(2000, manager4.getCache("sampleCache3").getSize());
510         assertEquals(2000, manager5.getCache("sampleCache3").getSize());
511 
512     }
513 
514 
515     /**
516      * Test various cache configurations for cache1 - explicit setting of:
517      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
518      */
519     public void testPutWithExplicitReplicationConfig() throws InterruptedException {
520         if (JVMUtil.isSingleRMIRegistryPerVM()) {
521             return;
522         }
523         putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
524     }
525 
526 
527     /**
528      * Test various cache configurations for cache1 - explicit setting of:
529      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
530      */
531     public void testPutWithThreadKiller() throws InterruptedException {
532         if (JVMUtil.isSingleRMIRegistryPerVM()) {
533             return;
534         }
535         putTestWithThreadKiller(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
536     }
537 
538     /**
539      * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
540      * of a remote event by a CachePeer.
541      */
542     public void testRemotelyReceivedPutNotifiesCountingListener() throws InterruptedException {
543         if (JVMUtil.isSingleRMIRegistryPerVM()) {
544             return;
545         }
546         putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
547         assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager1.getCache("sampleCache1")).size());
548         assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager2.getCache("sampleCache1")).size());
549 
550     }
551 
552     /**
553      * Test various cache configurations for cache1 - explicit setting of:
554      * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
555      */
556     public void testPutWithExplicitReplicationSynchronousConfig() throws InterruptedException {
557         if (JVMUtil.isSingleRMIRegistryPerVM()) {
558             return;
559         }
560         putTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
561     }
562 
563 
564     /**
565      * Test put replicated for cache4 - no properties.
566      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
567      */
568     public void testPutWithEmptyReplicationPropertiesConfig() throws InterruptedException {
569         if (JVMUtil.isSingleRMIRegistryPerVM()) {
570             return;
571         }
572         putTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
573     }
574 
575     /**
576      * Test put replicated for cache4 - missing replicatePuts property.
577      * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
578      * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
579      */
580     public void testPutWithOneMissingReplicationPropertyConfig() throws InterruptedException {
581         if (JVMUtil.isSingleRMIRegistryPerVM()) {
582             return;
583         }
584         putTest(manager1.getCache("sampleCache5"), manager2.getCache("sampleCache5"), ASYNCHRONOUS);
585     }
586 
587 
588     /**
589      * Tests put and remove initiated from cache1 in a cluster
590      * <p/>
591      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
592      */
593     public void putTest(Cache fromCache, Cache toCache, boolean asynchronous) throws CacheException, InterruptedException {
594 
595         Serializable key = new Date();
596         Serializable value = new Date();
597         Element sourceElement = new Element(key, value);
598 
599         //Put
600         fromCache.put(sourceElement);
601         int i = 0;
602 
603         if (asynchronous) {
604             waitForProgagate();
605         }
606 
607         int j = 0;
608 
609         Thread.sleep(5000);
610 
611         LOG.info("" + manager1.getCache("sampleCache1").getSize());
612         LOG.info("" + manager2.getCache("sampleCache1").getSize());
613         LOG.info("" + manager3.getCache("sampleCache1").getSize());
614         LOG.info("" + manager4.getCache("sampleCache1").getSize());
615         LOG.info("" + manager5.getCache("sampleCache1").getSize());
616 
617     }
618 
619     /**
620      * Tests put and remove initiated from cache1 in a cluster
621      * <p/>
622      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
623      */
624     public void putTestWithThreadKiller(Cache fromCache, Cache toCache, boolean asynchronous)
625             throws CacheException, InterruptedException {
626 
627         fromCache.put(new Element("thread killer", new ThreadKiller()));
628         if (asynchronous) {
629             waitForProgagate();
630         }
631 
632         Serializable key = new Date();
633         Serializable value = new Date();
634         Element sourceElement = new Element(key, value);
635 
636         //Put
637         fromCache.put(sourceElement);
638 
639         if (asynchronous) {
640             waitForProgagate();
641         }
642 
643         //Should have been replicated to toCache.
644         Element deliveredElement = toCache.get(key);
645         assertEquals(sourceElement, deliveredElement);
646 
647     }
648 
649 
650     /**
651      * Checks that a put received from a remote cache notifies any registered listeners.
652      * <p/>
653      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
654      */
655     public void testRemotePutNotificationGetsToOtherListeners() throws CacheException, InterruptedException {
656 
657         if (JVMUtil.isSingleRMIRegistryPerVM()) {
658             return;
659         }
660 
661         Serializable key = new Date();
662         Serializable value = new Date();
663         Element element1 = new Element(key, value);
664 
665         //Put
666         cache1.put(new Element("1", new Date()));
667         cache1.put(new Element("2", new Date()));
668         cache1.put(new Element("3", new Date()));
669 
670         //Nonserializable and non deliverable put
671         Object nonSerializableObject = new Object();
672         cache1.put(new Element(nonSerializableObject, new Object()));
673 
674 
675         waitForProgagate();
676 
677         //local initiating cache's counting listener should have been notified
678         assertEquals(4, CountingCacheEventListener.getCacheElementsPut(cache1).size());
679         //remote receiving caches' counting listener should have been notified
680         assertEquals(3, CountingCacheEventListener.getCacheElementsPut(cache2).size());
681 
682         //Update
683         cache1.put(new Element("1", new Date()));
684         cache1.put(new Element("2", new Date()));
685         cache1.put(new Element("3", new Date()));
686 
687         //Nonserializable and non deliverable put
688         cache1.put(new Element(nonSerializableObject, new Object()));
689 
690         waitForProgagate();
691 
692         //local initiating cache's counting listener should have been notified
693         assertEquals(4, CountingCacheEventListener.getCacheElementsUpdated(cache1).size());
694         //remote receiving caches' counting listener should have been notified
695         assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache2).size());
696 
697         //Remove
698         cache1.remove("1");
699         cache1.remove("2");
700         cache1.remove("3");
701         cache1.remove(nonSerializableObject);
702 
703         waitForProgagate();
704 
705         //local initiating cache's counting listener should have been notified
706         assertEquals(4, CountingCacheEventListener.getCacheElementsRemoved(cache1).size());
707         //remote receiving caches' counting listener should have been notified
708         assertEquals(3, CountingCacheEventListener.getCacheElementsRemoved(cache2).size());
709 
710     }
711 
712 
713     /**
714      * Test various cache configurations for cache1 - explicit setting of:
715      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
716      */
717     public void testRemoveWithExplicitReplicationConfig() throws InterruptedException {
718         if (JVMUtil.isSingleRMIRegistryPerVM()) {
719             return;
720         }
721         removeTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
722     }
723 
724     /**
725      * Test various cache configurations for cache1 - explicit setting of:
726      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
727      */
728     public void testRemoveWithExplicitReplicationSynchronousConfig() throws InterruptedException {
729         if (JVMUtil.isSingleRMIRegistryPerVM()) {
730             return;
731         }
732         removeTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
733     }
734 
735 
736     /**
737      * Test put replicated for cache4 - no properties.
738      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
739      */
740     public void testRemoveWithEmptyReplicationPropertiesConfig() throws InterruptedException {
741         if (JVMUtil.isSingleRMIRegistryPerVM()) {
742             return;
743         }
744         removeTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
745     }
746 
747     /**
748      * Tests put and remove initiated from a cache to another cache in a cluster
749      * <p/>
750      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
751      */
752     public void removeTest(Cache fromCache, Cache toCache, boolean asynchronous) throws CacheException, InterruptedException {
753 
754         Serializable key = new Date();
755         Serializable value = new Date();
756         Element element1 = new Element(key, value);
757 
758         //Put
759         fromCache.put(element1);
760 
761         if (asynchronous) {
762             waitForProgagate();
763         }
764 
765         //Should have been replicated to cache2.
766         Element element2 = toCache.get(key);
767         assertEquals(element1, element2);
768 
769         //Remove
770         fromCache.remove(key);
771         if (asynchronous) {
772             waitForProgagate();
773         }
774 
775         //Should have been replicated to cache2.
776         element2 = toCache.get(key);
777         assertNull(element2);
778 
779     }
780 
781 
782     /**
783      * Test various cache configurations for cache1 - explicit setting of:
784      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
785      */
786     public void testUpdateWithExplicitReplicationConfig() throws Exception {
787         if (JVMUtil.isSingleRMIRegistryPerVM()) {
788             return;
789         }
790         updateViaCopyTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
791     }
792 
793     /**
794      * Test various cache configurations for cache1 - explicit setting of:
795      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
796      */
797     public void testUpdateWithExplicitReplicationSynchronousConfig() throws Exception {
798         if (JVMUtil.isSingleRMIRegistryPerVM()) {
799             return;
800         }
801         updateViaCopyTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
802     }
803 
804 
805     /**
806      * Test put replicated for cache4 - no properties.
807      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
808      */
809     public void testUpdateWithEmptyReplicationPropertiesConfig() throws Exception {
810         if (JVMUtil.isSingleRMIRegistryPerVM()) {
811             return;
812         }
813         updateViaCopyTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
814     }
815 
816     /**
817      * Tests put and update through copy initiated from cache1 in a cluster
818      * <p/>
819      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
820      */
821     public void updateViaCopyTest(Cache fromCache, Cache toCache, boolean asynchronous) throws Exception {
822 
823         fromCache.removeAll();
824         toCache.removeAll();
825 
826         Serializable key = new Date();
827         Serializable value = new Date();
828         Element element1 = new Element(key, value);
829 
830         //Put
831         fromCache.put(element1);
832         if (asynchronous) {
833             waitForProgagate();
834         }
835 
836         //Should have been replicated to cache2.
837         Element element2 = toCache.get(key);
838         assertEquals(element1, element2);
839 
840         //Update
841         Element updatedElement1 = new Element(key, new Date());
842 
843         fromCache.put(updatedElement1);
844         if (asynchronous) {
845             waitForProgagate();
846         }
847 
848         //Should have been replicated to cache2.
849         Element receivedUpdatedElement2 = toCache.get(key);
850         assertEquals(updatedElement1, receivedUpdatedElement2);
851 
852     }
853 
854 
855     /**
856      * Tests put and update through invalidation initiated from cache1 in a cluster
857      * <p/>
858      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
859      */
860     public void testUpdateViaInvalidate() throws CacheException, InterruptedException, IOException {
861 
862         if (JVMUtil.isSingleRMIRegistryPerVM()) {
863             return;
864         }
865 
866         cache1 = manager1.getCache("sampleCache2");
867         cache1.removeAll();
868 
869         cache2 = manager2.getCache("sampleCache2");
870         cache2.removeAll();
871 
872         Serializable key = "1";
873         Serializable value = new Date();
874         Element element1 = new Element(key, value);
875 
876         //Put
877         cache1.put(element1);
878         waitForProgagate();
879 
880         //Should have been replicated to cache2.
881         Element element2 = cache2.get(key);
882         assertEquals(element1, element2);
883 
884         //Update
885         cache1.put(element1);
886         waitForProgagate();
887 
888         //Should have been removed in cache2.
889         element2 = cache2.get(key);
890         assertNull(element2);
891 
892     }
893 
894     /**
895      * What happens when two cache instances replicate to each other and a change is initiated
896      */
897     public void testInfiniteNotificationsLoop() throws InterruptedException {
898 
899         if (JVMUtil.isSingleRMIRegistryPerVM()) {
900             return;
901         }
902 
903         Serializable key = "1";
904         Serializable value = new Date();
905         Element element = new Element(key, value);
906 
907         //Put
908         cache1.put(element);
909         waitForProgagate();
910 
911         //Should have been replicated to cache2.
912         Element element2 = cache2.get(key);
913         assertEquals(element, element2);
914 
915         //Remove
916         cache1.remove(key);
917         assertNull(cache1.get(key));
918 
919         //Should have been replicated to cache2.
920         waitForProgagate();
921         element2 = cache2.get(key);
922         assertNull(element2);
923 
924         //Put into 2
925         Element element3 = new Element("3", "ddsfds");
926         cache2.put(element3);
927         waitForProgagate();
928         Element element4 = cache2.get("3");
929         assertEquals(element3, element4);
930 
931     }
932 
933 
934     /**
935      * Need to wait for async
936      *
937      * @throws InterruptedException
938      */
939     protected void waitForProgagate() throws InterruptedException {
940         Thread.sleep(2000);
941     }
942 
943     /**
944      * Need to wait for async
945      *
946      * @throws InterruptedException
947      */
948     protected void waitForSlowProgagate() throws InterruptedException {
949         Thread.sleep(6000);
950     }
951 
952 }