/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.service.boundedvm;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.AndFilter;
import org.activemq.filter.DestinationMap;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.impl.AutoCommitTransaction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * A MessageContainerManager for transient topics.
 * <p>
 * Unlike a destination-oriented manager, this class keeps one
 * {@link TransientTopicBoundedMessageContainer} per {@link BrokerClient}
 * (see {@link #addMessageConsumer}) and routes each published message to
 * every container registered under a matching destination in its
 * {@link DestinationMap}.
 * <p>
 * Consumer registration and removal are serialized by synchronizing on this
 * instance; message dispatch is not synchronized.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class TransientTopicBoundedMessageManager implements MessageContainerManager {
    private static final Log log = LogFactory.getLog(TransientTopicBoundedMessageManager.class);

    private final MemoryBoundedQueueManager queueManager;
    /** BrokerClient -> TransientTopicBoundedMessageContainer. */
    private final ConcurrentHashMap containers;
    /** consumer id -> TransientTopicSubscription. */
    private final ConcurrentHashMap subscriptions;
    /** destination -> containers that should see messages for it. */
    private final DestinationMap destinationMap;
    private final FilterFactory filterFactory;
    private final SynchronizedBoolean started;
    /** physical name -> destination, for non-durable consumers only. */
    private final Map destinations;
    /**
     * container -> (physical name -> destination) for every destination the
     * container has been registered under in {@link #destinationMap}.
     * Only touched from the synchronized add/remove consumer methods.
     */
    private final Map mappedDestinations;
    private DeadLetterPolicy deadLetterPolicy;
    private boolean decoupledDispatch = false;

    /**
     * Constructor for TransientTopicBoundedMessageManager
     *
     * @param mgr supplies the memory bounded queue backing each client's container
     */
    public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
        this.queueManager = mgr;
        this.containers = new ConcurrentHashMap();
        this.subscriptions = new ConcurrentHashMap();
        this.destinationMap = new DestinationMap();
        this.destinations = new ConcurrentHashMap();
        this.mappedDestinations = new HashMap();
        this.filterFactory = new FilterFactoryImpl();
        this.started = new SynchronizedBoolean(false);
    }

    /**
     * Start the manager and every container it currently holds. Does nothing
     * if the started flag could not be switched from false to true.
     *
     * @throws JMSException if a container fails to start
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            for (Iterator i = containers.values().iterator(); i.hasNext();) {
                TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
                container.start();
            }
        }
    }

    /**
     * Stop the manager and every container it currently holds. Does nothing
     * if the started flag could not be switched from true to false.
     *
     * @throws JMSException if a container fails to stop
     */
    public void stop() throws JMSException {
        if (started.commit(true, false)) {
            for (Iterator i = containers.values().iterator(); i.hasNext();) {
                TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
                container.stop();
            }
        }
    }

    /**
     * Add a consumer if appropriate. Only consumers of topic destinations
     * are handled; anything else is ignored.
     * <p>
     * The client's container is created on first use (and started if this
     * manager is already started), then registered under the consumer's
     * destination so that {@link #sendMessage} can find it.
     *
     * @param client the client that owns the consumer
     * @param info describes the consumer being added
     * @throws JMSException
     */
    public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        ActiveMQDestination destination = info.getDestination();
        if (destination.isTopic()) {
            TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
                    .get(client);
            if (container == null) {
                MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
                container = new TransientTopicBoundedMessageContainer(this, client, queue);
                containers.put(client, container);
                if (started.get()) {
                    container.start();
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("Adding consumer: " + info);
            }

            TransientTopicSubscription ts = container.addConsumer(createFilter(info), info);
            if (ts != null) {
                subscriptions.put(info.getConsumerId(), ts);
            }

            destinationMap.put(destination, container);
            // Remember the registration so it can be undone when the container is closed.
            recordMapping(container, destination);

            String name = destination.getPhysicalName();
            //As the destinations are used for generating
            //subscriptions for NetworkConnectors etc.,
            //we should not generate duplicates by adding in
            //durable topic subscribers
            if (!info.isDurableTopic() && !destinations.containsKey(name)) {
                destinations.put(name, destination);
            }
        }
    }

    /**
     * Remove a consumer. Only consumers of topic destinations are handled.
     * <p>
     * When the removal leaves the client's container inactive, the container
     * is dropped, closed, and unregistered from <em>every</em> destination it
     * was mapped under, so no message is routed to a closed container.
     *
     * @param client the client that owns the consumer
     * @param info describes the consumer being removed
     * @throws JMSException
     */
    public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        ActiveMQDestination destination = info.getDestination();
        if (destination.isTopic()) {
            TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
                    .get(client);
            if (container != null) {
                container.removeConsumer(info);
                if (container.isInactive()) {
                    containers.remove(client);
                    container.close();
                    destinationMap.remove(destination, container);
                    // The container may also be registered under destinations whose
                    // consumers were removed earlier, while it was still active;
                    // without this those entries would keep pointing at the closed
                    // container indefinitely.
                    purgeMappings(container, destination);
                }

                // lets check if we've no more consumers for this destination
                //As the destinations are used for generating
                //subscriptions for NetworkConnectors etc.,
                //we should not count durable topic subscribers
                if (!info.isDurableTopic() && !hasConsumerFor(destination)) {
                    destinations.remove(destination.getPhysicalName());
                }
            }
            subscriptions.remove(info.getConsumerId());
        }
    }

    /**
     * Delete a durable subscriber. This implementation does nothing.
     *
     * @param clientId
     * @param subscriberName
     * @throws JMSException declared by the interface; not thrown by this implementation
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
    }

    /**
     * Route a message to the containers registered for its destination.
     * In the auto-commit context the message is dispatched immediately;
     * otherwise dispatch is registered as a post-commit task of the current
     * context transaction.
     *
     * @param client the client that sent the message
     * @param message the message to dispatch
     * @throws JMSException
     */
    public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
        if (TransactionManager.getContexTransaction() == AutoCommitTransaction.AUTO_COMMIT_TRANSACTION) {
            // Auto-commit context: dispatch directly.
            doSendMessage(client, message);
        }
        else {
            // Inside a transaction: defer the dispatch until after commit.
            TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
                public void execute() throws Throwable {
                    doSendMessage(client, message);
                }
            });
        }
    }

    /**
     * Hand the message to every container registered under a destination
     * matching the message's destination.
     *
     * @param client the client that sent the message
     * @param message the message to dispatch
     * @throws JMSException
     */
    private void doSendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        Set matching = destinationMap.get(message.getJMSActiveMQDestination());
        for (Iterator i = matching.iterator(); i.hasNext();) {
            TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
            container.targetAndDispatch(client, message);
        }
    }

    /**
     * Acknowledge a message. This implementation does nothing.
     *
     * @param client
     * @param ack
     * @throws JMSException declared by the interface; not thrown by this implementation
     */
    public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
    }

    /**
     * Poll for messages. This implementation does nothing.
     *
     * @throws JMSException declared by the interface; not thrown by this implementation
     */
    public void poll() throws JMSException {
    }

    /**
     * For Transient topics - a MessageContainer maps on to the messages
     * to be dispatched through a BrokerClient, not a destination
     *
     * @param physicalName
     * @return the MessageContainer used for dispatching - always returns null
     * @throws JMSException
     */
    public MessageContainer getContainer(String physicalName) throws JMSException {
        return null;
    }

    /**
     * @return an unmodifiable view, keyed by physical name, of the destinations
     *         that have been registered by non-durable consumers
     */
    public Map getDestinations() {
        return Collections.unmodifiableMap(destinations);
    }

    /**
     * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
     * objects used by non-broker consumers directly connected to this container
     *
     * @return a snapshot, keyed by physical name, of the destinations of all
     *         subscriptions that are local and not durable topic subscriptions
     */
    public Map getLocalDestinations() {
        Map localDestinations = new HashMap();
        for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
            TransientTopicSubscription sub = (TransientTopicSubscription) iter.next();
            if (sub.isLocalSubscription() && !sub.isDurableTopic()) {
                final ActiveMQDestination dest = sub.getDestination();
                localDestinations.put(dest.getPhysicalName(), dest);
            }
        }
        return Collections.unmodifiableMap(localDestinations);
    }

    /**
     * @return the DeadLetterPolicy for this Container Manager
     */
    public DeadLetterPolicy getDeadLetterPolicy() {
        return deadLetterPolicy;
    }

    /**
     * Set the DeadLetterPolicy for this Container Manager
     *
     * @param policy
     */
    public void setDeadLetterPolicy(DeadLetterPolicy policy) {
        this.deadLetterPolicy = policy;
    }

    /**
     * @return Returns the decoupledDispatch.
     */
    public boolean isDecoupledDispatch() {
        return decoupledDispatch;
    }

    /**
     * @param decoupledDispatch The decoupledDispatch to set.
     */
    public void setDecoupledDispatch(boolean decoupledDispatch) {
        this.decoupledDispatch = decoupledDispatch;
    }

    /**
     * Create filter for a Consumer. The filter is built from the consumer's
     * destination and selector; for a noLocal consumer it is additionally
     * combined with a {@link NoLocalFilter} on the consumer's client id.
     *
     * @param info
     * @return the Filter
     * @throws javax.jms.JMSException
     */
    protected Filter createFilter(ConsumerInfo info) throws JMSException {
        Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
        if (info.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
        }
        return filter;
    }

    /**
     * @param destination
     * @return true if any container currently held by this manager reports a
     *         consumer for the destination
     */
    protected boolean hasConsumerFor(ActiveMQDestination destination) {
        for (Iterator i = containers.values().iterator(); i.hasNext();) {
            TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
            if (container.hasConsumerFor(destination)) {
                return true;
            }
        }
        return false;
    }

    /**
     * This implementation does nothing.
     *
     * @see org.activemq.service.MessageContainerManager#createMessageContainer(org.activemq.message.ActiveMQDestination)
     */
    public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
    }

    /**
     * Removes every container registration for the destination.
     *
     * @see org.activemq.service.MessageContainerManager#destroyMessageContainer(org.activemq.message.ActiveMQDestination)
     */
    public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
        // NOTE(review): containers is keyed by BrokerClient (see addMessageConsumer),
        // so removing by destination looks like it can never match an entry.
        // Left in place because the intended behaviour is unclear - confirm.
        containers.remove(dest);
        destinationMap.removeAll(dest);
    }

    /**
     * @return always an empty map
     * @see org.activemq.service.MessageContainerManager#getMessageContainerAdmins()
     */
    public Map getMessageContainerAdmins() throws JMSException {
        return Collections.EMPTY_MAP;
    }

    /**
     * Note that the container has been registered in the destination map under
     * the given destination. Callers must hold this instance's monitor.
     */
    private void recordMapping(TransientTopicBoundedMessageContainer container, ActiveMQDestination destination) {
        Map byName = (Map) mappedDestinations.get(container);
        if (byName == null) {
            byName = new HashMap();
            mappedDestinations.put(container, byName);
        }
        byName.put(destination.getPhysicalName(), destination);
    }

    /**
     * Unregister a closed container from every recorded destination other than
     * the one the caller has already removed. Callers must hold this
     * instance's monitor.
     */
    private void purgeMappings(TransientTopicBoundedMessageContainer container, ActiveMQDestination alreadyRemoved) {
        Map byName = (Map) mappedDestinations.remove(container);
        if (byName == null) {
            return;
        }
        byName.remove(alreadyRemoved.getPhysicalName());
        for (Iterator i = byName.values().iterator(); i.hasNext();) {
            destinationMap.remove((ActiveMQDestination) i.next(), container);
        }
    }

}