/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.advisories;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ProducerInfo;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * A helper class for listening for MessageProducer advisories.
 * <p>
 * Once started, the advisor consumes messages from the producer-advisory topic
 * of its destination. Every {@link ObjectMessage} received is treated as a
 * {@link ProducerInfo}: it updates the per-destination bookkeeping of started
 * producers and is then forwarded to all registered
 * {@link ProducerAdvisoryEventListener}s as a {@link ProducerAdvisoryEvent}.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class ProducerAdvisor implements MessageListener {
    private static final Log log = LogFactory.getLog(ProducerAdvisor.class);
    private Connection connection;
    private ActiveMQDestination destination;
    private Session session;
    private List listeners = new CopyOnWriteArrayList();
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** Maps a producer's destination to the Set of ProducerInfo currently started on it. */
    private Map activeProducers = new ConcurrentHashMap();

    /**
     * Construct a ProducerAdvisor
     *
     * @param connection the connection used to create the advisory session in {@link #start()}
     * @param destination the destination to listen for Producer events
     * @throws JMSException
     */
    public ProducerAdvisor(Connection connection, Destination destination) throws JMSException {
        this.connection = connection;
        this.destination = ActiveMQDestination.transformDestination(destination);
    }

    /**
     * Start listening for advisories. Calling this while already started is a no-op.
     * <p>
     * If the session or consumer cannot be created, the advisor is returned to the
     * stopped state (and any half-created session is closed) so that a later call
     * can retry; the original exception is rethrown.
     *
     * @throws JMSException if the session or the advisory consumer cannot be created
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(destination.getTopicForProducerAdvisory());
                consumer.setMessageListener(this);
            }
            catch (JMSException e) {
                // Roll back: without this the flag would stay true and every
                // subsequent start() would silently do nothing.
                Session failedSession = session;
                session = null;
                if (failedSession != null) {
                    try {
                        failedSession.close();
                    }
                    catch (JMSException closeError) {
                        log.warn("Failed to close session after unsuccessful start", closeError);
                    }
                }
                started.set(false);
                throw e;
            }
        }
    }

    /**
     * Stop listening for advisories. Calling this while not started is a no-op.
     *
     * @throws JMSException if closing the session fails
     */
    public void stop() throws JMSException {
        if (started.commit(true, false)) {
            if (session != null) {
                session.close();
            }
        }
    }

    /**
     * Add a listener
     *
     * @param l the listener to notify of producer advisory events
     */
    public void addListener(ProducerAdvisoryEventListener l) {
        listeners.add(l);
    }

    /**
     * Remove a listener
     *
     * @param l the listener to stop notifying
     */
    public void removeListener(ProducerAdvisoryEventListener l) {
        listeners.remove(l);
    }

    /**
     * Returns true if there is an active producer for the destination.
     *
     * @param destination the destination to look up
     * @return true if at least one started producer is recorded for the destination
     */
    public boolean isActive(Destination destination) {
        return activeProducers.containsKey(destination);
    }

    /**
     * Return a set of active ProducerInfo's for a particular destination.
     * <p>
     * When producers are recorded for the destination the internally tracked set
     * is returned as-is (not a copy); otherwise a new empty set is returned.
     *
     * @param destination the destination to look up
     * @return the set of ProducerInfo objects currently active, never null
     */
    public Set activeProducers(Destination destination) {
        Set set = (Set) activeProducers.get(destination);
        return set != null ? set : new CopyOnWriteArraySet();
    }

    /**
     * OnMessage() implementation. Messages that are not ObjectMessages are ignored.
     *
     * @param msg the advisory message; its payload is expected to be a ProducerInfo
     */
    public void onMessage(Message msg) {
        if (msg instanceof ObjectMessage) {
            try {
                ProducerInfo info = (ProducerInfo) ((ObjectMessage) msg).getObject();
                updateActiveProducers(info);
                ProducerAdvisoryEvent event = new ProducerAdvisoryEvent(info);
                fireEvent(event);
            }
            catch (JMSException e) {
                // Pass the exception along so the cause and stack trace are not lost.
                log.error("Failed to process message: " + msg, e);
            }
        }
    }

    /**
     * Deliver the event to every registered listener, in registration order.
     */
    private void fireEvent(ProducerAdvisoryEvent event) {
        for (Iterator i = listeners.iterator(); i.hasNext();) {
            ProducerAdvisoryEventListener l = (ProducerAdvisoryEventListener) i.next();
            l.onEvent(event);
        }
    }

    /**
     * Record a started producer, or forget a stopped one, for the producer's destination.
     * When the last producer of a destination stops, the destination's entry is dropped
     * so that {@link #isActive(Destination)} reports false again.
     */
    private void updateActiveProducers(ProducerInfo info) {
        Object key = info.getDestination();
        Set set = (Set) activeProducers.get(key);
        if (info.isStarted()) {
            if (set == null) {
                set = new CopyOnWriteArraySet();
                activeProducers.put(key, set);
            }
            set.add(info);
        }
        else {
            if (set != null) {
                set.remove(info);
                if (set.isEmpty()) {
                    // Remove by key (the destination); removing by the set itself
                    // never matched an entry and left stale destinations behind.
                    activeProducers.remove(key);
                }
            }
        }
    }

}