001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.benchmark;
019    
020    import javax.jms.Destination;
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    import javax.jms.MessageConsumer;
024    import javax.jms.MessageListener;
025    import javax.jms.Session;
026    import javax.jms.TextMessage;
027    import javax.jms.Topic;
028    
029    /**
030     * @author James Strachan
031     * @version $Revision$
032     */
033    public class Consumer extends BenchmarkSupport implements MessageListener {
034    
035        public static void main(String[] args) {
036            Consumer tool = new Consumer();
037            if (args.length > 0) {
038                tool.setUrl(args[0]);
039            }
040            if (args.length > 1) {
041                tool.setTopic(parseBoolean(args[1]));
042            }
043            if (args.length > 2) {
044                tool.setSubject(args[2]);
045            }
046            if (args.length > 3) {
047                tool.setDurable(parseBoolean(args[3]));
048            }
049            if (args.length > 4) {
050                tool.setConnectionCount(Integer.parseInt(args[4]));
051            }
052    
053            try {
054                tool.run();
055            }
056            catch (Exception e) {
057                System.out.println("Caught: " + e);
058                e.printStackTrace();
059            }
060        }
061    
062        public Consumer() {
063        }
064    
065        public void run() throws JMSException {
066            start();
067            subscribe();
068        }
069    
070        protected void subscribe() throws JMSException {
071            for (int i = 0; i < subjects.length; i++) {
072                subscribe(subjects[i]);
073            }
074        }
075    
076        protected void subscribe(String subject) throws JMSException {
077            Session session = createSession();
078    
079            Destination destination = createDestination(session, subject);
080    
081            System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName());
082    
083            MessageConsumer consumer = null;
084            if (isDurable() && isTopic()) {
085                consumer = session.createDurableSubscriber((Topic) destination, getClass().getName());
086            }
087            else {
088                consumer = session.createConsumer(destination);
089            }
090            consumer.setMessageListener(this);
091            addResource(consumer);
092        }
093    
094        public void onMessage(Message message) {
095            try {
096                TextMessage textMessage = (TextMessage) message;
097    
098                // lets force the content to be deserialized
099                String text = textMessage.getText();
100                count(1);
101                
102                // lets count the messages
103    
104                //message.acknowledge();
105            }
106            catch (JMSException e) {
107                // TODO Auto-generated catch block
108                e.printStackTrace();
109            }
110        }
111    
112    }