001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service.boundedvm; 020 import java.util.ArrayList; 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.Set; 024 025 import javax.jms.JMSException; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.broker.BrokerClient; 030 import org.activemq.filter.DestinationMap; 031 import org.activemq.filter.Filter; 032 import org.activemq.io.util.MemoryBoundedQueue; 033 import org.activemq.message.ActiveMQDestination; 034 import org.activemq.message.ActiveMQMessage; 035 import org.activemq.message.ConsumerInfo; 036 import org.activemq.message.MessageAck; 037 import org.activemq.service.MessageContainer; 038 import org.activemq.service.MessageContainerAdmin; 039 import org.activemq.service.MessageIdentity; 040 import org.activemq.service.Service; 041 042 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 043 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 045 046 /** 047 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic 048 * messages 049 * 050 * @version $Revision: 1.1.1.1 $ 051 */ 052 public class TransientTopicBoundedMessageContainer 053 implements 054 MessageContainer, 055 Service, 056 Runnable, 057 MessageContainerAdmin { 058 private SynchronizedBoolean started; 059 private TransientTopicBoundedMessageManager manager; 060 private BrokerClient client; 061 private MemoryBoundedQueue queue; 062 private Thread worker; 063 private CopyOnWriteArrayList subscriptions; 064 private DestinationMap accel; 065 private ConcurrentHashMap subMap; 066 private Log log; 067 068 /** 069 * Construct this beast 070 * 071 * @param manager 072 * @param client 073 * @param queue 074 */ 075 public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client, 076 MemoryBoundedQueue queue) { 077 this.manager = manager; 078 this.client = client; 079 this.queue = queue; 080 this.started = new SynchronizedBoolean(false); 081 this.subscriptions = new CopyOnWriteArrayList(); 082 this.accel = new DestinationMap(); 083 this.subMap = new ConcurrentHashMap(100,0.25f); 084 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client); 085 } 086 087 /** 088 * @return true if this Container has no active subscriptions 089 */ 090 public boolean isInactive() { 091 return subscriptions.isEmpty(); 092 } 093 094 /** 095 * @return the BrokerClient this Container is dispatching to 096 */ 097 public BrokerClient getBrokerClient() { 098 return client; 099 } 100 101 /** 102 * Add a consumer to dispatch messages to 103 * 104 * @param filter 105 * @param info 106 */ 107 public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) { 108 TransientTopicSubscription ts = findMatch(info); 109 if (ts == null) { 110 ts = new TransientTopicSubscription(filter, info, client); 111 subscriptions.add(ts); 112 accel.put(info.getDestination(),ts); 113 subMap.put(info,ts); 114 } 115 return ts; 116 } 117 118 /** 119 * Remove a consumer 120 * 121 * @param info 122 */ 123 public void removeConsumer(ConsumerInfo info) { 124 TransientTopicSubscription ts = findMatch(info); 125 if (ts != null) { 126 subscriptions.remove(ts); 127 accel.remove(info.getDestination(),ts); 128 subMap.remove(info); 129 } 130 } 131 132 /** 133 * start working 134 */ 135 public void start() { 136 if (started.commit(false, true)) { 137 if (manager.isDecoupledDispatch()) { 138 worker = new Thread(this, "TransientTopicDispatcher"); 139 worker.setPriority(Thread.NORM_PRIORITY + 2); 140 worker.start(); 141 } 142 } 143 } 144 145 /** 146 * See if this container should get this message and dispatch it 147 * 148 * @param sender the BrokerClient the message came from 149 * @param message 150 * @return true if it is a valid container 151 * @throws JMSException 152 */ 153 public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException { 154 boolean result = false; 155 if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) { 156 List tmpList = null; 157 158 Set set = accel.get(message.getJMSActiveMQDestination()); 159 if (!set.isEmpty()) { 160 for (Iterator i = set.iterator(); i.hasNext();) { 161 TransientTopicSubscription ts = (TransientTopicSubscription) i.next(); 162 if (ts.isTarget(message)) { 163 if (tmpList == null) { 164 tmpList = new ArrayList(); 165 } 166 tmpList.add(ts); 167 } 168 } 169 } 170 dispatchToQueue(message, tmpList); 171 result = tmpList != null; 172 } 173 return result; 174 } 175 176 /** 177 * stop working 178 */ 179 public void stop() { 180 started.set(false); 181 queue.clear(); 182 } 183 184 /** 185 * close down this container 186 */ 187 public void close() { 188 if (started.get()) { 189 stop(); 190 } 191 queue.close(); 192 } 193 194 195 /** 196 * do some dispatching 197 */ 198 public void run() { 199 int count = 0; 200 ActiveMQMessage message = null; 201 while (started.get()) { 202 try { 203 message = (ActiveMQMessage) queue.dequeue(2000); 204 if (message != null) { 205 if (!message.isExpired()) { 206 client.dispatch(message); 207 if (++count == 250) { 208 count = 0; 209 Thread.yield(); 210 } 211 }else { 212 if (log.isDebugEnabled()){ 213 log.debug("Message: " + message + " has expired"); 214 } 215 } 216 } 217 } 218 catch (Exception e) { 219 stop(); 220 log.warn("stop dispatching", e); 221 } 222 } 223 } 224 225 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException { 226 if (list != null && !list.isEmpty()) { 227 int[] ids = new int[list.size()]; 228 for (int i = 0;i < list.size();i++) { 229 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i); 230 ids[i] = ts.getConsumerInfo().getConsumerNo(); 231 } 232 message = message.shallowCopy(); 233 message.setConsumerNos(ids); 234 if (manager.isDecoupledDispatch()) { 235 queue.enqueue(message); 236 } 237 else { 238 client.dispatch(message); 239 } 240 } 241 } 242 243 private TransientTopicSubscription findMatch(ConsumerInfo info) { 244 return (TransientTopicSubscription) subMap.get(info); 245 } 246 247 /** 248 * @param destination 249 * @return true if a 250 */ 251 public boolean hasConsumerFor(ActiveMQDestination destination) { 252 for (Iterator i = subscriptions.iterator();i.hasNext();) { 253 TransientTopicSubscription ts = (TransientTopicSubscription) i.next(); 254 ConsumerInfo info = ts.getConsumerInfo(); 255 if (info.getDestination().matches(destination)) { 256 return true; 257 } 258 } 259 return false; 260 } 261 262 /** 263 * @return the destination name 264 */ 265 public String getDestinationName() { 266 return ""; 267 } 268 269 /** 270 * @param msg 271 * @return @throws JMSException 272 */ 273 public void addMessage(ActiveMQMessage msg) throws JMSException { 274 } 275 276 /** 277 * @param messageIdentity 278 * @param ack 279 * @throws JMSException 280 */ 281 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException { 282 } 283 284 /** 285 * @param messageIdentity 286 * @return @throws JMSException 287 */ 288 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException { 289 return null; 290 } 291 292 /** 293 * @param messageIdentity 294 * @throws JMSException 295 */ 296 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException { 297 } 298 299 /** 300 * @param messageIdentity 301 * @param ack 302 * @throws JMSException 303 */ 304 public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException { 305 } 306 307 /** 308 * @param messageIdentity 309 * @return @throws JMSException 310 */ 311 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException { 312 return false; 313 } 314 315 /** 316 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin() 317 */ 318 public MessageContainerAdmin getMessageContainerAdmin() { 319 return this; 320 } 321 322 /** 323 * @see org.activemq.service.MessageContainerAdmin#empty() 324 */ 325 public void empty() throws JMSException { 326 // TODO implement me 327 } 328 329 /** 330 * @see org.activemq.service.MessageContainer#isDeadLetterQueue() 331 */ 332 public boolean isDeadLetterQueue() { 333 return false; 334 } 335 }