001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.advisories;
020    import java.util.HashMap;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.Set;
026    
027    import javax.jms.Connection;
028    import javax.jms.Destination;
029    import javax.jms.JMSException;
030    import javax.jms.Message;
031    import javax.jms.MessageConsumer;
032    import javax.jms.MessageListener;
033    import javax.jms.ObjectMessage;
034    import javax.jms.Session;
035    
036    import org.activemq.message.ActiveMQDestination;
037    import org.activemq.message.ActiveMQTopic;
038    import org.activemq.message.ConnectionInfo;
039    import org.apache.commons.logging.Log;
040    import org.apache.commons.logging.LogFactory;
041    
042    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
043    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
044    
045    /**
046     * A helper class for listening for MessageConnection advisories *
047     * 
048     * @version $Revision: 1.1.1.1 $
049     */
050    public class ConnectionAdvisor implements MessageListener {
051        private static final Log log = LogFactory.getLog(ConnectionAdvisor.class);
052        private Connection connection;
053        private Session session;
054        private List listeners = new CopyOnWriteArrayList();
055        private Map activeConnections = new HashMap();
056        private SynchronizedBoolean started = new SynchronizedBoolean(false);
057        private Object lock = new Object();
058    
059        /**
060         * Construct a ConnectionAdvisor
061         * 
062         * @param connection
063         * @throws JMSException
064         */
065        public ConnectionAdvisor(Connection connection) throws JMSException {
066            this.connection = connection;
067        }
068    
069        /**
070         * start listening for advisories
071         * 
072         * @throws JMSException
073         */
074        public void start() throws JMSException {
075            if (started.commit(false, true)) {
076                
077                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
078                
079                String advisoryName = ActiveMQDestination.CONNECTION_ADVISORY_PREFIX;
080                Destination advisoryDestination = new ActiveMQTopic(advisoryName);
081                MessageConsumer consumer = session.createConsumer(advisoryDestination);
082                consumer.setMessageListener(this);
083            }
084        }
085    
086        /**
087         * stop listening for advisories
088         * 
089         * @throws JMSException
090         */
091        public void stop() throws JMSException {
092            if (started.commit(true, false)) {
093                if (session != null) {
094                    session.close();
095                }
096                synchronized (lock) {
097                    lock.notifyAll();
098                }
099            }
100        }
101    
102        /**
103         * Add a listener
104         * 
105         * @param l
106         */
107        public void addListener(ConnectionAdvisoryEventListener l) {
108            listeners.add(l);
109        }
110    
111        /**
112         * Remove a listener
113         * 
114         * @param l
115         */
116        public void removeListener(ConnectionAdvisoryEventListener l) {
117            listeners.remove(l);
118        }
119    
120        /**
121         * returns true if the connection is active
122         * 
123         * @param clientId for the connection
124         * @return true if the connection is active
125         */
126        public boolean isActive(String clientId) {
127            return activeConnections.containsKey(clientId);
128        }
129    
130        /**
131         * Retrive all current Connections
132         * 
133         * @return
134         */
135        public Set getConnections() {
136            Set set = new HashSet();
137            set.addAll(activeConnections.values());
138            return set;
139        }
140    
141        /**
142         * Waits until the number of active connections is equivalent to the number supplied, or the timeout is exceeded
143         * 
144         * @param number
145         * @param timeout
146         * @return the number of activeConnections
147         */
148        public int waitForActiveConnections(int number, long timeout) {
149            int result = 0;
150            // if timeInMillis is less than zero assume nowait
151            long waitTime = timeout;
152            long start = (timeout <= 0) ? 0 : System.currentTimeMillis();
153            synchronized (lock) {
154                while (started.get()) {
155                    result = numberOfActiveConnections();
156                    if (result == number || waitTime <= 0) {
157                        break;
158                    }
159                    else {
160                        try {
161                            lock.wait(waitTime);
162                        }
163                        catch (Throwable e) {
164                            log.debug("Interrupted", e);
165                            e.printStackTrace();
166                        }
167                        waitTime = timeout - (System.currentTimeMillis() - start);
168                    }
169                }
170            }
171            return result;
172        }
173    
174        /**
175         * return the current number of active connections
176         * 
177         * @return
178         */
179        public int numberOfActiveConnections() {
180            return activeConnections.size();
181        }
182    
183        /**
184         * OnMessage() implementation
185         * 
186         * @param msg
187         */
188        public void onMessage(Message msg) {
189            if (msg instanceof ObjectMessage) {
190                try {
191                    ConnectionInfo info = (ConnectionInfo) ((ObjectMessage) msg).getObject();
192                    
193                    ConnectionAdvisoryEvent event = new ConnectionAdvisoryEvent(info);
194                    if (!event.getInfo().isClosed()) {
195                        activeConnections.put(event.getInfo().getClientId(), event.getInfo());
196                    }
197                    else {
198                        activeConnections.remove(event.getInfo().getClientId());
199                    }
200                    synchronized (lock) {
201                        lock.notify();
202                    }
203                    fireEvent(event);
204                }
205                catch (Throwable e) {
206                    log.error("Failed to process message: " + msg);
207                }
208            }
209        }
210    
211        private void fireEvent(ConnectionAdvisoryEvent event) {
212            for (Iterator i = listeners.iterator();i.hasNext();) {
213                ConnectionAdvisoryEventListener l = (ConnectionAdvisoryEventListener) i.next();
214                l.onEvent(event);
215            }
216        }
217    }