001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.benchmark;
019    
020    import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
021    import org.activemq.ActiveMQConnectionFactory;
022    import org.activemq.util.IdGenerator;
023    
024    import javax.jms.Connection;
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.Session;
028    import java.text.NumberFormat;
029    import java.util.ArrayList;
030    import java.util.List;
031    
032    /**
033     * Abstract base class for some simple benchmark tools
034     *
035     * @author James Strachan
036     * @version $Revision$
037     */
038    public class BenchmarkSupport {
039    
040        protected int connectionCount = 1;
041        protected int batch = 1000;
042        protected Destination destination;
043        protected boolean embeddedBroker = false;
044        private boolean topic = true;
045        private boolean durable = false;
046    
047        private ActiveMQConnectionFactory factory;
048        private String url;
049        protected String[] subjects;
050        private long time = System.currentTimeMillis();
051        private int counter;
052        private List resources = new ArrayList();
053        private NumberFormat formatter = NumberFormat.getInstance();
054        private SynchronizedInt connectionCounter = new SynchronizedInt(0);
055        private IdGenerator idGenerator = new IdGenerator();
056    
057        public BenchmarkSupport() {
058        }
059    
060        public void start() {
061            System.out.println("Using: " + connectionCount + " connection(s)");
062            subjects = new String[connectionCount];
063            for (int i = 0; i < connectionCount; i++) {
064                subjects[i] = "BENCHMARK.FEED" + i;
065            }
066            if (useTimerLoop()) {
067                Thread timer = new Thread() {
068                    public void run() {
069                        timerLoop();
070                    }
071                };
072                timer.start();
073            }
074        }
075    
076        public String getUrl() {
077            return url;
078        }
079    
080        public void setUrl(String url) {
081            this.url = url;
082        }
083    
084        public boolean isTopic() {
085            return topic;
086        }
087    
088        public void setTopic(boolean topic) {
089            this.topic = topic;
090        }
091    
092        public ActiveMQConnectionFactory getFactory() {
093            return factory;
094        }
095    
096        public void setFactory(ActiveMQConnectionFactory factory) {
097            this.factory = factory;
098        }
099    
100        public void setSubject(String subject) {
101            connectionCount = 1;
102            subjects = new String[]{subject};
103        }
104    
105        public boolean isDurable() {
106            return durable;
107        }
108    
109        public void setDurable(boolean durable) {
110            this.durable = durable;
111        }
112    
113        public boolean isEmbeddedBroker() {
114            return embeddedBroker;
115        }
116    
117        public void setEmbeddedBroker(boolean embeddedBroker) {
118            this.embeddedBroker = embeddedBroker;
119        }
120    
121        public int getConnectionCount() {
122            return connectionCount;
123        }
124    
125        public void setConnectionCount(int connectionCount) {
126            this.connectionCount = connectionCount;
127        }
128    
129        protected Session createSession() throws JMSException {
130            if (factory == null) {
131                factory = createFactory();
132            }
133            Connection connection = factory.createConnection();
134            int value = connectionCounter.increment();
135            System.out.println("Created connection: " + value + " = " + connection);
136            if (durable) {
137                connection.setClientID(idGenerator.generateId());
138            }
139            addResource(connection);
140            connection.start();
141    
142            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
143            addResource(session);
144            return session;
145        }
146    
147        protected ActiveMQConnectionFactory createFactory() {
148            ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(getUrl());
149            if (embeddedBroker) {
150                answer.setUseEmbeddedBroker(true);
151            }
152            return answer;
153        }
154    
155        protected synchronized void count(int count) {
156            counter += count;
157            /*
158            if (counter > batch) {
159                counter = 0;
160                long current = System.currentTimeMillis();
161                double end = current - time;
162                end /= 1000;
163                time = current;
164    
165                System.out.println("Processed " + batch + " messages in " + end + " (secs)");
166            }
167            */
168        }
169    
170        protected synchronized int resetCount() {
171            int answer = counter;
172            counter = 0;
173            return answer;
174        }
175    
176    
177        protected void timerLoop() {
178            int times = 0;
179            int total = 0;
180            int dumpVmStatsFrequency = 10;
181            Runtime runtime = Runtime.getRuntime();
182    
183            while (true) {
184                try {
185                    Thread.sleep(1000);
186                }
187                catch (InterruptedException e) {
188                    e.printStackTrace();
189                }
190                int processed = resetCount();
191                double average = 0;
192                if (processed > 0) {
193                    total += processed;
194                    times++;
195                }
196                if (times > 0) {
197                    average = total / times;
198                }
199    
200                long oldtime = time;
201                time = System.currentTimeMillis();
202    
203                double diff = time - oldtime;
204    
205                System.out.println(getClass().getName() + " Processed: " + processed + " messages this second. Average: " + average);
206    
207                if ((times % dumpVmStatsFrequency) == 0 && times != 0) {
208                    System.out.println("Used memory: " + asMemoryString(runtime.totalMemory() - runtime.freeMemory())
209                            + " Free memory: " + asMemoryString(runtime.freeMemory())
210                            + " Total memory: " + asMemoryString(runtime.totalMemory())
211                            + " Max memory: " + asMemoryString(runtime.maxMemory()));
212                }
213    
214            }
215        }
216    
217        protected String asMemoryString(long value) {
218            return formatter.format(value / 1024) + " K";
219        }
220    
221        protected boolean useTimerLoop() {
222            return true;
223        }
224    
225        protected Destination createDestination(Session session, String subject) throws JMSException {
226            if (topic) {
227                return session.createTopic(subject);
228            }
229            else {
230                return session.createQueue(subject);
231            }
232        }
233    
234        protected void addResource(Object resource) {
235            resources.add(resource);
236        }
237    
238        protected static boolean parseBoolean(String text) {
239            return text.equalsIgnoreCase("true");
240        }
241    }