001    /** 
002     * 
003     * Copyright 2005 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.advisories;
019    
020    import javax.jms.Connection;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageConsumer;
025    import javax.jms.MessageListener;
026    import javax.jms.ObjectMessage;
027    import javax.jms.Session;
028    
029    import org.activemq.message.ActiveMQDestination;
030    import org.activemq.message.ConsumerInfo;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
035    
036    /**
037     * A ProducerDemandAdvisor is used to know when a destination is in demand.
038     * 
039     * Sometimes generating messages to send to a destination is very expensive 
040     * and the application would like to avoid producing messages if there are no 
041     * active consumers for the destination.  There is a "demand" for messages
042     * when a consumer does come active.
043     * 
044     * This object uses Advisory messages to know when consumer go active and 
045     * inactive.
046     */
047    public class ProducerDemandAdvisor {
048        
049        private static final Log log = LogFactory.getLog(ProducerDemandAdvisor.class);
050    
051        private final ActiveMQDestination destination;
052        private Connection connection;
053        private Session session;
054        private SynchronizedBoolean started = new SynchronizedBoolean(false);
055        private int consumerCount;
056        private ProducerDemandListener demandListener;
057        
058        public ProducerDemandAdvisor( Connection connection, final Destination destination ) throws JMSException {
059            this.connection = connection;
060            this.destination = ActiveMQDestination.transformDestination(destination);
061        }
062         
063        /**
064         * @param destination
065         */
066        private void fireDemandEvent() {
067            demandListener.onEvent( new ProducerDemandEvent(destination, isInDemand()));
068        }
069    
070        public boolean isInDemand() {
071            return consumerCount>0;
072        }
073        
074        public ProducerDemandListener getDemandListener() {
075            return demandListener;
076        }
077    
078        synchronized public void setDemandListener(ProducerDemandListener demandListener) {
079            this.demandListener = demandListener;
080            fireDemandEvent();
081        }
082    
083        public void start() throws JMSException {
084            if (started.commit(false, true)) {
085                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
086                MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
087                consumer.setMessageListener(new MessageListener(){
088                    public void onMessage(Message msg) {
089                        process(msg);
090                    }
091                });
092            }
093        }
094    
095        public void stop() throws JMSException {
096            if (started.commit(true, false)) {
097                if (session != null) {
098                    session.close();
099                }
100            }
101        }
102        
103        protected void process(Message msg) {
104            if (msg instanceof ObjectMessage) {
105                try {
106                    ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject();
107                    ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
108                    
109                    
110                    boolean inDemand = isInDemand();
111    
112                    if ( info.isStarted() ) {
113                        consumerCount++;
114                    } else {
115                        consumerCount--;
116                    }
117                    
118                    // Notify listener if there was a change in demand.
119                    if (inDemand ^ isInDemand() && demandListener != null) {
120                        fireDemandEvent();
121                    }
122                } catch (JMSException e) {
123                    log.error("Failed to process message: " + msg);
124                }
125            }
126        }
127    
128    }