001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.benchmark; 019 020 import javax.jms.Destination; 021 import javax.jms.JMSException; 022 import javax.jms.Message; 023 import javax.jms.MessageConsumer; 024 import javax.jms.MessageListener; 025 import javax.jms.Session; 026 import javax.jms.TextMessage; 027 import javax.jms.Topic; 028 029 /** 030 * @author James Strachan 031 * @version $Revision$ 032 */ 033 public class Consumer extends BenchmarkSupport implements MessageListener { 034 035 public static void main(String[] args) { 036 Consumer tool = new Consumer(); 037 if (args.length > 0) { 038 tool.setUrl(args[0]); 039 } 040 if (args.length > 1) { 041 tool.setTopic(parseBoolean(args[1])); 042 } 043 if (args.length > 2) { 044 tool.setSubject(args[2]); 045 } 046 if (args.length > 3) { 047 tool.setDurable(parseBoolean(args[3])); 048 } 049 if (args.length > 4) { 050 tool.setConnectionCount(Integer.parseInt(args[4])); 051 } 052 053 try { 054 tool.run(); 055 } 056 catch (Exception e) { 057 System.out.println("Caught: " + e); 058 e.printStackTrace(); 059 } 060 } 061 062 public Consumer() { 063 } 064 065 public void run() throws JMSException { 066 start(); 067 subscribe(); 068 } 069 070 protected void subscribe() throws JMSException { 071 for (int i = 0; i < subjects.length; i++) { 072 subscribe(subjects[i]); 073 } 074 } 075 076 protected void subscribe(String subject) throws JMSException { 077 Session session = createSession(); 078 079 Destination destination = createDestination(session, subject); 080 081 System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName()); 082 083 MessageConsumer consumer = null; 084 if (isDurable() && isTopic()) { 085 consumer = session.createDurableSubscriber((Topic) destination, getClass().getName()); 086 } 087 else { 088 consumer = session.createConsumer(destination); 089 } 090 consumer.setMessageListener(this); 091 addResource(consumer); 092 } 093 094 public void onMessage(Message message) { 095 try { 096 TextMessage textMessage = (TextMessage) message; 097 098 // lets force the content to be deserialized 099 String text = textMessage.getText(); 100 count(1); 101 102 // lets count the messages 103 104 //message.acknowledge(); 105 } 106 catch (JMSException e) { 107 // TODO Auto-generated catch block 108 e.printStackTrace(); 109 } 110 } 111 112 }