001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.spring;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    
023    import javax.jms.Message;
024    import javax.jms.MessageListener;
025    
026    /**
027     * A simple consumer which is useful for testing which can be used to wait until the consumer has received
028     * a specific number of messages.
029     *
030     * @author Mike Perham
031     * @version $Revision$
032     */
033    public class TestingConsumer implements MessageListener {
034        private List messages = new ArrayList();
035        private Object semaphore;
036    
037        public TestingConsumer() {
038            this(new Object());
039        }
040    
041        public TestingConsumer(Object semaphore) {
042            this.semaphore = semaphore;
043        }
044    
045        /**
046         * @return all the messages on the list so far, clearing the buffer
047         */
048        public synchronized List flushMessages() {
049            List answer = new ArrayList(messages);
050            messages.clear();
051            return answer;
052        }
053    
054        public synchronized void onMessage(Message message) {
055            messages.add(message);
056            synchronized (semaphore) {
057                semaphore.notifyAll();
058            }
059        }
060    
061        public void waitForMessageToArrive() {
062            waitForMessagesToArrive(1);
063        }
064    
065        public void waitForMessagesToArrive(int messageCount) {
066            System.out.println("Waiting for message to arrive");
067    
068            long start = System.currentTimeMillis();
069    
070            while (System.currentTimeMillis() - start < 10000) {
071                try {
072                    if (hasReceivedMessages(messageCount)) {
073                        break;
074                    }
075                    synchronized (semaphore) {
076                        semaphore.wait(1000);
077                    }
078                }
079                catch (InterruptedException e) {
080                    System.out.println("Caught: " + e);
081                }
082            }
083            long end = System.currentTimeMillis() - start;
084    
085            System.out.println("End of wait for " + end + " millis");
086        }
087    
088        protected boolean hasReceivedMessage() {
089            return messages.isEmpty();
090        }
091    
092        protected synchronized boolean hasReceivedMessages(int messageCount) {
093            return messages.size() >= messageCount;
094        }
095    
096    
097    }