001    /**
002     * 
003     * Copyright 2005 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.pool;
019    
020    import org.activemq.ActiveMQMessageProducer;
021    import org.activemq.ActiveMQQueueSender;
022    import org.activemq.ActiveMQSession;
023    import org.activemq.ActiveMQTopicPublisher;
024    import org.activemq.AlreadyClosedException;
025    import org.activemq.util.JMSExceptionHelper;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.commons.pool.ObjectPool;
029    
030    import javax.jms.*;
031    import java.io.Serializable;
032    
033    /**
034     * @version $Revision: 1.1 $
035     */
036    public class PooledSession implements TopicSession, QueueSession {
037        private static final transient Log log = LogFactory.getLog(PooledSession.class);
038    
039        private ActiveMQSession session;
040        private ObjectPool sessionPool;
041        private ActiveMQMessageProducer messageProducer;
042        private ActiveMQQueueSender queueSender;
043        private ActiveMQTopicPublisher topicPublisher;
044        private boolean transactional = true;
045    
046        public PooledSession(ActiveMQSession aSession, ObjectPool sessionPool) {
047            this.session = aSession;
048            this.sessionPool = sessionPool;
049            this.transactional = session.isTransacted();
050        }
051    
052    
053        public void close() throws JMSException {
054            // TODO a cleaner way to reset??
055    
056            // lets reset the session
057            getSession().setMessageListener(null);
058    
059            // maybe do a rollback?
060            if (transactional) {
061                try {
062                    getSession().rollback();
063                }
064                catch (JMSException e) {
065                    log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
066    
067                    // lets close the session and not put the session back into the pool
068                    try {
069                        session.close();
070                    }
071                    catch (JMSException e1) {
072                        log.trace("Ignoring exception as discarding session: " + e1, e1);
073                    }
074                    session = null;
075                    return;
076                }
077            }
078    
079            try {
080                sessionPool.returnObject(this);
081            }
082            catch (Exception e) {
083                throw JMSExceptionHelper.newJMSException("Failed to return session to pool: " + e, e);
084            }
085        }
086    
087        public void commit() throws JMSException {
088            getSession().commit();
089        }
090    
091        public BytesMessage createBytesMessage() throws JMSException {
092            return getSession().createBytesMessage();
093        }
094    
095        public MapMessage createMapMessage() throws JMSException {
096            return getSession().createMapMessage();
097        }
098    
099        public Message createMessage() throws JMSException {
100            return getSession().createMessage();
101        }
102    
103        public ObjectMessage createObjectMessage() throws JMSException {
104            return getSession().createObjectMessage();
105        }
106    
107        public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
108            return getSession().createObjectMessage(serializable);
109        }
110    
111        public Queue createQueue(String s) throws JMSException {
112            return getSession().createQueue(s);
113        }
114    
115        public StreamMessage createStreamMessage() throws JMSException {
116            return getSession().createStreamMessage();
117        }
118    
119        public TemporaryQueue createTemporaryQueue() throws JMSException {
120            return getSession().createTemporaryQueue();
121        }
122    
123        public TemporaryTopic createTemporaryTopic() throws JMSException {
124            return getSession().createTemporaryTopic();
125        }
126    
127        public void unsubscribe(String s) throws JMSException {
128            getSession().unsubscribe(s);
129        }
130    
131        public TextMessage createTextMessage() throws JMSException {
132            return getSession().createTextMessage();
133        }
134    
135        public TextMessage createTextMessage(String s) throws JMSException {
136            return getSession().createTextMessage(s);
137        }
138    
139        public Topic createTopic(String s) throws JMSException {
140            return getSession().createTopic(s);
141        }
142    
143        public int getAcknowledgeMode() throws JMSException {
144            return getSession().getAcknowledgeMode();
145        }
146    
147        public boolean getTransacted() throws JMSException {
148            return getSession().getTransacted();
149        }
150    
151        public void recover() throws JMSException {
152            getSession().recover();
153        }
154    
155        public void rollback() throws JMSException {
156            getSession().rollback();
157        }
158    
159        public void run() {
160            if (session != null) {
161                session.run();
162            }
163        }
164    
165    
166        // Consumer related methods
167        //-------------------------------------------------------------------------
168        public QueueBrowser createBrowser(Queue queue) throws JMSException {
169            return getSession().createBrowser(queue);
170        }
171    
172        public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
173            return getSession().createBrowser(queue, selector);
174        }
175    
176        public MessageConsumer createConsumer(Destination destination) throws JMSException {
177            return getSession().createConsumer(destination);
178        }
179    
180        public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
181            return getSession().createConsumer(destination, selector);
182        }
183    
184        public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
185            return getSession().createConsumer(destination, selector, noLocal);
186        }
187    
188        public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
189            return getSession().createDurableSubscriber(topic, selector);
190        }
191    
192        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
193            return getSession().createDurableSubscriber(topic, name, selector, noLocal);
194        }
195    
196        public MessageListener getMessageListener() throws JMSException {
197            return getSession().getMessageListener();
198        }
199    
200        public void setMessageListener(MessageListener messageListener) throws JMSException {
201            getSession().setMessageListener(messageListener);
202        }
203    
204        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
205            return getSession().createSubscriber(topic);
206        }
207    
208        public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
209            return getSession().createSubscriber(topic, selector, local);
210        }
211    
212        public QueueReceiver createReceiver(Queue queue) throws JMSException {
213            return getSession().createReceiver(queue);
214        }
215    
216        public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
217            return getSession().createReceiver(queue, selector);
218        }
219    
220    
221        // Producer related methods
222        //-------------------------------------------------------------------------
223        public MessageProducer createProducer(Destination destination) throws JMSException {
224            return new PooledProducer(getMessageProducer(), destination);
225        }
226    
227        public QueueSender createSender(Queue queue) throws JMSException {
228            return new PooledQueueSender(getQueueSender(), queue);
229        }
230    
231        public TopicPublisher createPublisher(Topic topic) throws JMSException {
232            return new PooledTopicPublisher(getTopicPublisher(), topic);
233        }
234    
235        // Implementation methods
236        //-------------------------------------------------------------------------
237        protected ActiveMQSession getSession() throws AlreadyClosedException {
238            if (session == null) {
239                throw new AlreadyClosedException("The session has already been closed");
240            }
241            return session;
242        }
243    
244        public ActiveMQMessageProducer getMessageProducer() throws JMSException {
245            if (messageProducer == null) {
246                messageProducer = (ActiveMQMessageProducer) getSession().createProducer(null);
247            }
248            return messageProducer;
249        }
250    
251        public ActiveMQQueueSender getQueueSender() throws JMSException {
252            if (queueSender == null) {
253                queueSender = (ActiveMQQueueSender) getSession().createSender(null);
254            }
255            return queueSender;
256        }
257    
258        public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
259            if (topicPublisher == null) {
260                topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null);
261            }
262            return topicPublisher;
263        }
264    
265    }