001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service.boundedvm; 020 import java.util.HashMap; 021 import java.util.List; 022 import java.util.Map; 023 024 import javax.jms.JMSException; 025 026 import org.activemq.broker.BrokerClient; 027 import org.activemq.filter.Filter; 028 import org.activemq.io.util.MemoryBoundedQueue; 029 import org.activemq.io.util.MemoryBoundedQueueManager; 030 import org.activemq.io.util.MemoryManageable; 031 import org.activemq.message.ActiveMQDestination; 032 import org.activemq.message.ActiveMQMessage; 033 import org.activemq.message.ConsumerInfo; 034 import org.activemq.message.MessageAck; 035 import org.activemq.service.DeadLetterPolicy; 036 import org.activemq.service.MessageContainer; 037 import org.activemq.service.MessageContainerAdmin; 038 import org.activemq.service.MessageIdentity; 039 import org.activemq.service.QueueListEntry; 040 import org.activemq.service.RedeliveryPolicy; 041 import org.activemq.service.Service; 042 import org.activemq.service.impl.DefaultQueueList; 043 import org.apache.commons.logging.Log; 044 import org.apache.commons.logging.LogFactory; 045 046 import EDU.oswego.cs.dl.util.concurrent.Executor; 047 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 048 049 /** 050 * A MessageContainer for transient queues 051 * 052 * @version $Revision: 1.1.1.1 $ 053 */ 054 public class TransientQueueBoundedMessageContainer implements MessageContainer, Service, Runnable, MessageContainerAdmin { 055 private MemoryBoundedQueueManager queueManager; 056 private ActiveMQDestination destination; 057 private SynchronizedBoolean started; 058 private SynchronizedBoolean running; 059 private MemoryBoundedQueue queue; 060 private DefaultQueueList subscriptions; 061 private Executor threadPool; 062 private Log log; 063 private long idleTimestamp; //length of time (ms) there have been no active subscribers 064 private DeadLetterPolicy deadLetterPolicy; 065 private final Object dispatchMutex = new Object(); 066 private final Object subscriptionMutex = new Object(); 067 068 /** 069 * Construct this beast 070 * 071 * @param threadPool 072 * @param queueManager 073 * @param destination 074 * @param redeliveryPolicy 075 * @param deadLetterPolicy 076 */ 077 public TransientQueueBoundedMessageContainer(Executor threadPool, MemoryBoundedQueueManager queueManager, 078 ActiveMQDestination destination,RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) { 079 this.threadPool = threadPool; 080 this.queueManager = queueManager; 081 this.destination = destination; 082 this.deadLetterPolicy = deadLetterPolicy; 083 this.queue = queueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE:-" + destination.getPhysicalName()); 084 this.started = new SynchronizedBoolean(false); 085 this.running = new SynchronizedBoolean(false); 086 this.subscriptions = new DefaultQueueList(); 087 this.log = LogFactory.getLog("TransientQueueBoundedMessageContainer:- " + destination); 088 } 089 090 091 /** 092 * @return true if there are subscribers waiting for messages 093 */ 094 public boolean isActive(){ 095 return !subscriptions.isEmpty(); 096 } 097 098 /** 099 * @return true if no messages are enqueued 100 */ 101 public boolean isEmpty(){ 102 return queue.isEmpty(); 103 } 104 105 /** 106 * @return the timestamp (ms) from the when the last active subscriber stopped 107 */ 108 public long getIdleTimestamp(){ 109 return idleTimestamp; 110 } 111 112 113 114 /** 115 * Add a consumer to dispatch messages to 116 * 117 * @param filter 118 * @param info 119 * @param client 120 * @return TransientQueueSubscription 121 * @throws JMSException 122 */ 123 public TransientQueueSubscription addConsumer(Filter filter,ConsumerInfo info, BrokerClient client) throws JMSException { 124 synchronized (subscriptionMutex) { 125 TransientQueueSubscription ts = findMatch(info); 126 if (ts == null) { 127 MemoryBoundedQueue queue = queueManager 128 .getMemoryBoundedQueue("TRANSIENT_QUEUE_SUB:-" 129 + info.getConsumerId()); 130 MemoryBoundedQueue ackQueue = queueManager 131 .getMemoryBoundedQueue("TRANSIENT_QUEUE_SUB_ACKED:-" 132 + info.getConsumerId()); 133 ts = new TransientQueueSubscription(client, queue, ackQueue, 134 filter, info); 135 136 idleTimestamp = 0; 137 subscriptions.add(ts); 138 if (started.get()) { 139 synchronized (running) { 140 if (running.commit(false, true)) { 141 try { 142 threadPool.execute(this); 143 } catch (InterruptedException e) { 144 JMSException jmsEx = new JMSException( 145 toString() 146 + " Failed to start running dispatch thread"); 147 jmsEx.setLinkedException(e); 148 throw jmsEx; 149 } 150 } 151 } 152 } 153 154 } 155 return ts; 156 } 157 } 158 159 /** 160 * Remove a consumer 161 * 162 * @param info 163 * @throws JMSException 164 */ 165 public void removeConsumer(ConsumerInfo info) throws JMSException { 166 synchronized (subscriptionMutex) { 167 TransientQueueSubscription ts = findMatch(info); 168 if (ts != null) { 169 170 subscriptions.remove(ts); 171 if (subscriptions.isEmpty()) { 172 running.commit(true, false); 173 idleTimestamp = System.currentTimeMillis(); 174 } 175 176 //get unacknowledged messages and re-enqueue them 177 List list = ts.getUndeliveredMessages(); 178 for (int i = list.size() - 1; i >= 0; i--) { 179 queue.enqueueFirstNoBlock((MemoryManageable) list.get(i)); 180 } 181 182 // If it is a queue browser, then re-enqueue the browsed 183 // messages. 184 if (ts.isBrowser()) { 185 list = ts.listAckedMessages(); 186 for (int i = list.size() - 1; i >= 0; i--) { 187 queue.enqueueFirstNoBlock((MemoryManageable) list 188 .get(i)); 189 } 190 ts.removeAllAckedMessages(); 191 } 192 193 ts.close(); 194 } 195 } 196 } 197 198 199 /** 200 * start working 201 * 202 * @throws JMSException 203 */ 204 public void start() throws JMSException { 205 if (started.commit(false, true)) { 206 if (!subscriptions.isEmpty()) { 207 synchronized (running) { 208 if (running.commit(false, true)) { 209 try { 210 threadPool.execute(this); 211 } 212 catch (InterruptedException e) { 213 JMSException jmsEx = new JMSException(toString() + " Failed to start"); 214 jmsEx.setLinkedException(e); 215 throw jmsEx; 216 } 217 } 218 } 219 } 220 } 221 } 222 223 /** 224 * enqueue a message for dispatching 225 * 226 * @param message 227 */ 228 public void enqueue(ActiveMQMessage message) { 229 if (message.isAdvisory()) { 230 doAdvisoryDispatchMessage(message); 231 } 232 else { 233 queue.enqueue(message); 234 startRunning(); 235 } 236 } 237 238 /** 239 * re-enqueue a message for dispatching 240 * 241 * @param message 242 */ 243 public void redeliver(ActiveMQMessage message) { 244 queue.enqueueFirstNoBlock(message); 245 startRunning(); 246 } 247 248 public void redeliver(List messages) { 249 queue.enqueueAllFirstNoBlock(messages); 250 startRunning(); 251 } 252 253 /** 254 * stop working 255 */ 256 public void stop() { 257 started.set(false); 258 running.set(false); 259 queue.clear(); 260 } 261 262 /** 263 * close down this container 264 * 265 * @throws JMSException 266 */ 267 public void close() throws JMSException { 268 if (started.get()) { 269 stop(); 270 } 271 queue.close(); 272 synchronized (subscriptionMutex) { 273 QueueListEntry entry = subscriptions.getFirstEntry(); 274 while (entry != null) { 275 TransientQueueSubscription ts = (TransientQueueSubscription) entry 276 .getElement(); 277 ts.close(); 278 entry = subscriptions.getNextEntry(entry); 279 } 280 subscriptions.clear(); 281 } 282 } 283 284 /** 285 * do some dispatching 286 */ 287 public void run() { 288 // Only allow one thread at a time to dispatch. 289 synchronized (dispatchMutex) { 290 boolean dispatched = false; 291 boolean targeted = false; 292 ActiveMQMessage message = null; 293 int notDispatchedCount = 0; 294 int sleepTime = 250; 295 int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10 296 // seconds 297 Map messageParts = new HashMap(); 298 try { 299 while (started.get() && running.get()) { 300 dispatched = false; 301 targeted = false; 302 synchronized (subscriptionMutex) { 303 if (!subscriptions.isEmpty()) { 304 message = (ActiveMQMessage) queue 305 .dequeue(sleepTime); 306 if (message != null) { 307 if (!message.isExpired()) { 308 QueueListEntry entry = subscriptions.getFirstEntry(); 309 while (entry != null) { 310 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement(); 311 if (ts.isTarget(message)) { 312 targeted = true; 313 if (message.isMessagePart()) { 314 TransientQueueSubscription sameTarget = (TransientQueueSubscription) messageParts 315 .get(message.getParentMessageID()); 316 if (sameTarget == null) { 317 sameTarget = ts; 318 messageParts.put(message.getParentMessageID(),sameTarget); 319 } 320 sameTarget.doDispatch(message); 321 if (message.isLastMessagePart()) { 322 messageParts.remove(message.getParentMessageID()); 323 } 324 message = null; 325 dispatched = true; 326 notDispatchedCount = 0; 327 break; 328 } else if (ts.canAcceptMessages()) { 329 ts.doDispatch(message); 330 message = null; 331 dispatched = true; 332 notDispatchedCount = 0; 333 subscriptions.rotate(); 334 break; 335 } 336 } 337 entry = subscriptions 338 .getNextEntry(entry); 339 } 340 } else { 341 // expire message 342 if (log.isDebugEnabled()) { 343 log.debug("expired message: "+ message); 344 } 345 deadLetterPolicy.sendToDeadLetter(message); 346 message = null; 347 } 348 } 349 } 350 } 351 if (!dispatched) { 352 if (message != null) { 353 if (targeted) { 354 queue.enqueueFirstNoBlock(message); 355 } else { 356 //no matching subscribers - dump to end and hope one shows up ... 357 queue.enqueueNoBlock(message); 358 359 } 360 } 361 if (running.get()) { 362 if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping 363 && queue.isEmpty()) { 364 synchronized (running) { 365 running.commit(true, false); 366 } 367 } else { 368 Thread.sleep(sleepTime); 369 } 370 } 371 } 372 } 373 } catch (InterruptedException ie) { 374 //someone is stopping us from another thread 375 } catch (Throwable e) { 376 log.warn("stop dispatching", e); 377 stop(); 378 } 379 } 380 } 381 382 private TransientQueueSubscription findMatch(ConsumerInfo info) throws JMSException { 383 TransientQueueSubscription result = null; 384 synchronized(subscriptionMutex){ 385 QueueListEntry entry = subscriptions.getFirstEntry(); 386 while (entry != null) { 387 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement(); 388 if (ts.getConsumerInfo().equals(info)) { 389 result = ts; 390 break; 391 } 392 entry = subscriptions.getNextEntry(entry); 393 } 394 } 395 return result; 396 } 397 398 /** 399 * @return the destination associated with this container 400 */ 401 public ActiveMQDestination getDestination() { 402 return destination; 403 } 404 405 /** 406 * @return the destination name 407 */ 408 public String getDestinationName() { 409 return destination.getPhysicalName(); 410 } 411 412 /** 413 * @param msg 414 * @return @throws JMSException 415 */ 416 public void addMessage(ActiveMQMessage msg) throws JMSException { 417 } 418 419 /** 420 * @param messageIdentity 421 * @param ack 422 * @throws JMSException 423 */ 424 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException { 425 } 426 427 /** 428 * @param messageIdentity 429 * @return @throws JMSException 430 */ 431 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException { 432 return null; 433 } 434 435 /** 436 * @param messageIdentity 437 * @throws JMSException 438 */ 439 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException { 440 } 441 442 /** 443 * @param messageIdentity 444 * @param ack 445 * @throws JMSException 446 */ 447 public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException { 448 } 449 450 /** 451 * @param messageIdentity 452 * @return @throws JMSException 453 */ 454 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException { 455 return false; 456 } 457 458 459 protected void clear() { 460 queue.clear(); 461 } 462 463 protected void removeExpiredMessages() { 464 long currentTime = System.currentTimeMillis(); 465 List list = queue.getContents(); 466 for (int i = 0;i < list.size();i++) { 467 ActiveMQMessage msg = (ActiveMQMessage) list.get(i); 468 if (msg.isExpired(currentTime)) { 469 queue.remove(msg); 470 if (log.isDebugEnabled()) { 471 log.debug("expired message: " + msg); 472 } 473 } 474 } 475 } 476 477 protected void startRunning(){ 478 if (!running.get() && started.get() && !subscriptions.isEmpty()) { 479 synchronized (running) { 480 if (running.commit(false, true)) { 481 try { 482 threadPool.execute(this); 483 } 484 catch (InterruptedException e) { 485 log.error(this + " Couldn't start executing ",e); 486 } 487 } 488 } 489 } 490 } 491 492 493 /** 494 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin() 495 */ 496 public MessageContainerAdmin getMessageContainerAdmin() { 497 return this; 498 } 499 500 /** 501 * @see org.activemq.service.MessageContainerAdmin#empty() 502 */ 503 public void empty() throws JMSException { 504 // TODO implement me. 505 } 506 507 508 /** 509 * @see org.activemq.service.MessageContainer#isDeadLetterQueue() 510 */ 511 public boolean isDeadLetterQueue() { 512 return false; 513 } 514 515 /** 516 * Dispatch an Advisory Message 517 * @param message 518 */ 519 private synchronized void doAdvisoryDispatchMessage(ActiveMQMessage message) { 520 try { 521 if (message != null && message.isAdvisory() && !message.isExpired()) { 522 synchronized (subscriptionMutex) { 523 QueueListEntry entry = subscriptions.getFirstEntry(); 524 while (entry != null) { 525 TransientQueueSubscription ts = (TransientQueueSubscription) entry.getElement(); 526 if (ts.isTarget(message)) { 527 ts.doDispatch(message); 528 break; 529 } 530 entry = subscriptions.getNextEntry(entry); 531 } 532 } 533 } 534 } catch (JMSException jmsEx) { 535 log.warn("Failed to dispatch advisory", jmsEx); 536 } 537 } 538 539 }