001 /** 002 * 003 * Copyright 2005 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.pool; 019 020 import org.activemq.ActiveMQMessageProducer; 021 import org.activemq.ActiveMQQueueSender; 022 import org.activemq.ActiveMQSession; 023 import org.activemq.ActiveMQTopicPublisher; 024 import org.activemq.AlreadyClosedException; 025 import org.activemq.util.JMSExceptionHelper; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.commons.pool.ObjectPool; 029 030 import javax.jms.*; 031 import java.io.Serializable; 032 033 /** 034 * @version $Revision: 1.1 $ 035 */ 036 public class PooledSession implements TopicSession, QueueSession { 037 private static final transient Log log = LogFactory.getLog(PooledSession.class); 038 039 private ActiveMQSession session; 040 private ObjectPool sessionPool; 041 private ActiveMQMessageProducer messageProducer; 042 private ActiveMQQueueSender queueSender; 043 private ActiveMQTopicPublisher topicPublisher; 044 private boolean transactional = true; 045 046 public PooledSession(ActiveMQSession aSession, ObjectPool sessionPool) { 047 this.session = aSession; 048 this.sessionPool = sessionPool; 049 this.transactional = session.isTransacted(); 050 } 051 052 053 public void close() throws JMSException { 054 // TODO a cleaner way to reset?? 055 056 // lets reset the session 057 getSession().setMessageListener(null); 058 059 // maybe do a rollback? 060 if (transactional) { 061 try { 062 getSession().rollback(); 063 } 064 catch (JMSException e) { 065 log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); 066 067 // lets close the session and not put the session back into the pool 068 try { 069 session.close(); 070 } 071 catch (JMSException e1) { 072 log.trace("Ignoring exception as discarding session: " + e1, e1); 073 } 074 session = null; 075 return; 076 } 077 } 078 079 try { 080 sessionPool.returnObject(this); 081 } 082 catch (Exception e) { 083 throw JMSExceptionHelper.newJMSException("Failed to return session to pool: " + e, e); 084 } 085 } 086 087 public void commit() throws JMSException { 088 getSession().commit(); 089 } 090 091 public BytesMessage createBytesMessage() throws JMSException { 092 return getSession().createBytesMessage(); 093 } 094 095 public MapMessage createMapMessage() throws JMSException { 096 return getSession().createMapMessage(); 097 } 098 099 public Message createMessage() throws JMSException { 100 return getSession().createMessage(); 101 } 102 103 public ObjectMessage createObjectMessage() throws JMSException { 104 return getSession().createObjectMessage(); 105 } 106 107 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { 108 return getSession().createObjectMessage(serializable); 109 } 110 111 public Queue createQueue(String s) throws JMSException { 112 return getSession().createQueue(s); 113 } 114 115 public StreamMessage createStreamMessage() throws JMSException { 116 return getSession().createStreamMessage(); 117 } 118 119 public TemporaryQueue createTemporaryQueue() throws JMSException { 120 return getSession().createTemporaryQueue(); 121 } 122 123 public TemporaryTopic createTemporaryTopic() throws JMSException { 124 return getSession().createTemporaryTopic(); 125 } 126 127 public void unsubscribe(String s) throws JMSException { 128 getSession().unsubscribe(s); 129 } 130 131 public TextMessage createTextMessage() throws JMSException { 132 return getSession().createTextMessage(); 133 } 134 135 public TextMessage createTextMessage(String s) throws JMSException { 136 return getSession().createTextMessage(s); 137 } 138 139 public Topic createTopic(String s) throws JMSException { 140 return getSession().createTopic(s); 141 } 142 143 public int getAcknowledgeMode() throws JMSException { 144 return getSession().getAcknowledgeMode(); 145 } 146 147 public boolean getTransacted() throws JMSException { 148 return getSession().getTransacted(); 149 } 150 151 public void recover() throws JMSException { 152 getSession().recover(); 153 } 154 155 public void rollback() throws JMSException { 156 getSession().rollback(); 157 } 158 159 public void run() { 160 if (session != null) { 161 session.run(); 162 } 163 } 164 165 166 // Consumer related methods 167 //------------------------------------------------------------------------- 168 public QueueBrowser createBrowser(Queue queue) throws JMSException { 169 return getSession().createBrowser(queue); 170 } 171 172 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { 173 return getSession().createBrowser(queue, selector); 174 } 175 176 public MessageConsumer createConsumer(Destination destination) throws JMSException { 177 return getSession().createConsumer(destination); 178 } 179 180 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { 181 return getSession().createConsumer(destination, selector); 182 } 183 184 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { 185 return getSession().createConsumer(destination, selector, noLocal); 186 } 187 188 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { 189 return getSession().createDurableSubscriber(topic, selector); 190 } 191 192 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { 193 return getSession().createDurableSubscriber(topic, name, selector, noLocal); 194 } 195 196 public MessageListener getMessageListener() throws JMSException { 197 return getSession().getMessageListener(); 198 } 199 200 public void setMessageListener(MessageListener messageListener) throws JMSException { 201 getSession().setMessageListener(messageListener); 202 } 203 204 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 205 return getSession().createSubscriber(topic); 206 } 207 208 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { 209 return getSession().createSubscriber(topic, selector, local); 210 } 211 212 public QueueReceiver createReceiver(Queue queue) throws JMSException { 213 return getSession().createReceiver(queue); 214 } 215 216 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { 217 return getSession().createReceiver(queue, selector); 218 } 219 220 221 // Producer related methods 222 //------------------------------------------------------------------------- 223 public MessageProducer createProducer(Destination destination) throws JMSException { 224 return new PooledProducer(getMessageProducer(), destination); 225 } 226 227 public QueueSender createSender(Queue queue) throws JMSException { 228 return new PooledQueueSender(getQueueSender(), queue); 229 } 230 231 public TopicPublisher createPublisher(Topic topic) throws JMSException { 232 return new PooledTopicPublisher(getTopicPublisher(), topic); 233 } 234 235 // Implementation methods 236 //------------------------------------------------------------------------- 237 protected ActiveMQSession getSession() throws AlreadyClosedException { 238 if (session == null) { 239 throw new AlreadyClosedException("The session has already been closed"); 240 } 241 return session; 242 } 243 244 public ActiveMQMessageProducer getMessageProducer() throws JMSException { 245 if (messageProducer == null) { 246 messageProducer = (ActiveMQMessageProducer) getSession().createProducer(null); 247 } 248 return messageProducer; 249 } 250 251 public ActiveMQQueueSender getQueueSender() throws JMSException { 252 if (queueSender == null) { 253 queueSender = (ActiveMQQueueSender) getSession().createSender(null); 254 } 255 return queueSender; 256 } 257 258 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException { 259 if (topicPublisher == null) { 260 topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null); 261 } 262 return topicPublisher; 263 } 264 265 }