001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq; 020 021 import java.util.Iterator; 022 import java.util.List; 023 024 import javax.jms.JMSException; 025 026 import org.activemq.io.util.MemoryBoundedQueue; 027 import org.activemq.message.ActiveMQMessage; 028 029 /** 030 * A utility class used by the Session for dispatching messages asynchronously to consumers 031 * 032 * @version $Revision: 1.1.1.1 $ 033 * @see javax.jms.Session 034 */ 035 public class ActiveMQSessionExecutor implements Runnable { 036 private ActiveMQSession session; 037 private MemoryBoundedQueue messageQueue; 038 private boolean closed; 039 private Thread runner; 040 private boolean dispatchedBySessionPool; 041 private boolean optimizedMessageDispatch; 042 043 ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) { 044 this.session = session; 045 this.messageQueue = queue; 046 } 047 048 void setDispatchedBySessionPool(boolean value) { 049 dispatchedBySessionPool = value; 050 } 051 052 /** 053 * @return Returns the optimizedMessageDispatch. 054 */ 055 boolean isOptimizedMessageDispatch() { 056 return optimizedMessageDispatch; 057 } 058 /** 059 * @param optimizedMessageDispatch The optimizedMessageDispatch to set. 060 */ 061 void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 062 this.optimizedMessageDispatch = optimizedMessageDispatch; 063 } 064 065 void execute(ActiveMQMessage message) { 066 if (optimizedMessageDispatch && !dispatchedBySessionPool){ 067 dispatch(message); 068 }else { 069 messageQueue.enqueue(message); 070 } 071 072 } 073 074 void executeFirst(ActiveMQMessage message) { 075 messageQueue.enqueueFirstNoBlock(message); 076 } 077 078 boolean hasUncomsumedMessages() { 079 return !messageQueue.isEmpty(); 080 } 081 082 List getUnconsumedMessages() { 083 return messageQueue.getContents(); 084 } 085 086 /** 087 * implementation of Runnable 088 */ 089 public void run() { 090 while (!closed && !dispatchedBySessionPool) { 091 ActiveMQMessage message = null; 092 try { 093 message = (ActiveMQMessage) messageQueue.dequeue(100); 094 } 095 catch (InterruptedException ie) { 096 } 097 if (!closed) { 098 if (message != null) { 099 if (!dispatchedBySessionPool) { 100 dispatch(message); 101 } 102 else { 103 messageQueue.enqueueFirstNoBlock(message); 104 } 105 } 106 } 107 } 108 } 109 110 void dispatch(ActiveMQMessage message){ 111 for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { 112 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 113 if (message.isConsumerTarget(consumer.getConsumerNumber())) { 114 try { 115 consumer.processMessage(message.shallowCopy()); 116 } 117 catch (JMSException e) { 118 this.session.connection.handleAsyncException(e); 119 } 120 } 121 } 122 } 123 124 synchronized void start() { 125 messageQueue.start(); 126 if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) { 127 runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId()); 128 runner.setPriority(Thread.MAX_PRIORITY); 129 //runner.setDaemon(true); 130 runner.start(); 131 } 132 } 133 134 synchronized void stop() { 135 messageQueue.stop(); 136 } 137 138 synchronized void close() { 139 closed = true; 140 messageQueue.close(); 141 } 142 143 void clear() { 144 messageQueue.clear(); 145 } 146 147 ActiveMQMessage dequeueNoWait() { 148 try { 149 return (ActiveMQMessage) messageQueue.dequeueNoWait(); 150 } 151 catch (InterruptedException ie) { 152 return null; 153 } 154 } 155 156 protected void clearMessagesInProgress(){ 157 messageQueue.clear(); 158 } 159 160 161 }