001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.tool;
019    
020    import javax.jms.Connection;
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    import javax.jms.MessageConsumer;
024    import javax.jms.MessageListener;
025    import javax.jms.Session;
026    import javax.jms.TextMessage;
027    import javax.jms.Topic;
028    import java.io.IOException;
029    
030    /**
031     * A simple tool for consuming messages
032     *
033     * @version $Revision$
034     */
035    public class ConsumerTool extends ToolSupport implements MessageListener {
036    
037        protected int count = 0;
038        protected int dumpCount = 10;
039        protected boolean verbose = true;
040        protected int maxiumMessages = 0;
041        private boolean pauseBeforeShutdown;
042    
043    
044        public static void main(String[] args) {
045            ConsumerTool tool = new ConsumerTool();
046            if (args.length > 0) {
047                tool.url = args[0];
048            }
049            if (args.length > 1) {
050                tool.topic = args[1].equalsIgnoreCase("true");
051            }
052            if (args.length > 2) {
053                tool.subject = args[2];
054            }
055            if (args.length > 3) {
056                tool.durable = args[3].equalsIgnoreCase("true");
057            }
058            if (args.length > 4) {
059                tool.maxiumMessages = Integer.parseInt(args[4]);
060            }
061            tool.run();
062        }
063    
064        public void run() {
065            try {
066                System.out.println("Connecting to URL: " + url);
067                System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
068                System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
069    
070                Connection connection = createConnection();
071                Session session = createSession(connection);
072                MessageConsumer consumer = null;
073                if (durable && topic) {
074                    consumer = session.createDurableSubscriber((Topic) destination, consumerName);
075                }
076                else {
077                    consumer = session.createConsumer(destination);
078                }
079                if (maxiumMessages <= 0) {
080                    consumer.setMessageListener(this);
081                }
082                connection.start();
083    
084                if (maxiumMessages > 0) {
085                    consumeMessagesAndClose(connection, session, consumer);
086                }
087            }
088            catch (Exception e) {
089                System.out.println("Caught: " + e);
090                e.printStackTrace();
091            }
092        }
093    
094        public void onMessage(Message message) {
095            try {
096                if (message instanceof TextMessage) {
097                    TextMessage txtMsg = (TextMessage) message;
098                    if (verbose) {
099                            
100                            String msg = txtMsg.getText();
101                            if( msg.length() > 50 )
102                                    msg = msg.substring(0, 50)+"...";
103                            
104                        System.out.println("Received: " + msg);
105                    }
106                }
107                else {
108                    if (verbose) {
109                        System.out.println("Received: " + message);
110                    }
111                }
112                /*
113                if (++count % dumpCount == 0) {
114                    dumpStats(connection);
115                }
116                */
117            }
118            catch (JMSException e) {
119                System.out.println("Caught: " + e);
120                e.printStackTrace();
121            }
122        }
123    
124    
125        protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
126            System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
127    
128            for (int i = 0; i < maxiumMessages; i++) {
129                Message message = consumer.receive();
130                onMessage(message);
131            }
132            System.out.println("Closing connection");
133            consumer.close();
134            session.close();
135            connection.close();
136            if (pauseBeforeShutdown) {
137                System.out.println("Press return to shut down");
138                System.in.read();
139            }
140        }
141    }