001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import org.apache.commons.logging.Log; 021 import org.apache.commons.logging.LogFactory; 022 import org.activemq.broker.BrokerClient; 023 import org.activemq.filter.FilterFactory; 024 import org.activemq.filter.FilterFactoryImpl; 025 import org.activemq.message.ActiveMQDestination; 026 import org.activemq.message.ActiveMQMessage; 027 import org.activemq.message.ConsumerInfo; 028 import org.activemq.service.Dispatcher; 029 import org.activemq.service.MessageContainer; 030 import org.activemq.service.Subscription; 031 import org.activemq.service.SubscriptionContainer; 032 import org.activemq.service.RedeliveryPolicy; 033 import org.activemq.service.DeadLetterPolicy; 034 import org.activemq.service.TransactionManager; 035 import org.activemq.service.TransactionTask; 036 import org.activemq.store.PersistenceAdapter; 037 038 import javax.jms.DeliveryMode; 039 import javax.jms.JMSException; 040 import java.util.Iterator; 041 import java.util.Set; 042 043 /** 044 * A default implementation of a Broker of Topic messages for transient consumers 045 * 046 * @version $Revision: 1.1.1.1 $ 047 */ 048 public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager { 049 private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class); 050 051 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) { 052 this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy(), new DeadLetterPolicy()), new FilterFactoryImpl(), new DispatcherImpl()); 053 } 054 055 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) { 056 super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher); 057 } 058 059 /** 060 * @param client 061 * @param info 062 * @throws javax.jms.JMSException 063 */ 064 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 065 if (info.getDestination().isTopic()) { 066 doAddMessageConsumer(client, info); 067 } 068 } 069 070 071 /** 072 * @param client 073 * @param info 074 * @throws javax.jms.JMSException 075 */ 076 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 077 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId()); 078 if (sub != null) { 079 sub.setActive(false); 080 dispatcher.removeActiveSubscription(client, sub); 081 subscriptionContainer.removeSubscription(info.getConsumerId()); 082 sub.clear(); 083 } 084 } 085 086 087 /** 088 * @param client 089 * @param message 090 * @throws javax.jms.JMSException 091 */ 092 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException { 093 final ActiveMQDestination destination = message.getJMSActiveMQDestination(); 094 if (destination == null || !destination.isTopic()) { 095 return; 096 } 097 098 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 099 public void execute() throws Throwable { 100 doSendMessage(client, message, destination); 101 } 102 }); 103 104 } 105 106 /** 107 * @param client 108 * @param message 109 * @param destination 110 * @throws JMSException 111 */ 112 private void doSendMessage(BrokerClient client, ActiveMQMessage message, ActiveMQDestination destination) throws JMSException { 113 MessageContainer container = null; 114 if (log.isDebugEnabled()) { 115 log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message); 116 } 117 Set subscriptions = subscriptionContainer.getSubscriptions(destination); 118 for (Iterator i = subscriptions.iterator(); i.hasNext();) { 119 Subscription sub = (Subscription) i.next(); 120 if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) { 121 if (container == null) { 122 container = getContainer(message.getJMSDestination().toString()); 123 container.addMessage(message); 124 } 125 sub.addMessage(container, message); 126 } 127 } 128 updateSendStats(client, message); 129 } 130 131 /** 132 * Delete a durable subscriber 133 * 134 * @param clientId 135 * @param subscriberName 136 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active 137 */ 138 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 139 } 140 }