001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq;
020    import java.io.IOException;
021    import java.io.Serializable;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.ListIterator;
026    
027    import javax.jms.BytesMessage;
028    import javax.jms.DeliveryMode;
029    import javax.jms.Destination;
030    import javax.jms.IllegalStateException;
031    import javax.jms.InvalidDestinationException;
032    import javax.jms.InvalidSelectorException;
033    import javax.jms.JMSException;
034    import javax.jms.MapMessage;
035    import javax.jms.Message;
036    import javax.jms.MessageConsumer;
037    import javax.jms.MessageListener;
038    import javax.jms.MessageProducer;
039    import javax.jms.ObjectMessage;
040    import javax.jms.Queue;
041    import javax.jms.QueueBrowser;
042    import javax.jms.QueueReceiver;
043    import javax.jms.QueueSender;
044    import javax.jms.QueueSession;
045    import javax.jms.Session;
046    import javax.jms.StreamMessage;
047    import javax.jms.TemporaryQueue;
048    import javax.jms.TemporaryTopic;
049    import javax.jms.TextMessage;
050    import javax.jms.Topic;
051    import javax.jms.TopicPublisher;
052    import javax.jms.TopicSession;
053    import javax.jms.TopicSubscriber;
054    import javax.jms.TransactionRolledBackException;
055    
056    import org.activemq.io.util.ByteArray;
057    import org.activemq.io.util.ByteArrayCompression;
058    import org.activemq.io.util.ByteArrayFragmentation;
059    import org.activemq.management.JMSSessionStatsImpl;
060    import org.activemq.management.StatsCapable;
061    import org.activemq.management.StatsImpl;
062    import org.activemq.message.ActiveMQBytesMessage;
063    import org.activemq.message.ActiveMQDestination;
064    import org.activemq.message.ActiveMQMapMessage;
065    import org.activemq.message.ActiveMQMessage;
066    import org.activemq.message.ActiveMQObjectMessage;
067    import org.activemq.message.ActiveMQQueue;
068    import org.activemq.message.ActiveMQStreamMessage;
069    import org.activemq.message.ActiveMQTemporaryQueue;
070    import org.activemq.message.ActiveMQTemporaryTopic;
071    import org.activemq.message.ActiveMQTextMessage;
072    import org.activemq.message.ActiveMQTopic;
073    import org.activemq.message.ConsumerInfo;
074    import org.activemq.message.DurableUnsubscribe;
075    import org.activemq.message.MessageAck;
076    import org.activemq.message.MessageAcknowledge;
077    import org.activemq.message.ProducerInfo;
078    import org.activemq.service.impl.DefaultQueueList;
079    import org.activemq.util.IdGenerator;
080    import org.apache.commons.logging.Log;
081    import org.apache.commons.logging.LogFactory;
082    
083    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
084    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
085    
086    /**
087     * <P>
088     * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
089     * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
090     * <P>
091     * A session serves several purposes:
092     * <UL>
093     * <LI>It is a factory for its message producers and consumers.
094     * <LI>It supplies provider-optimized message factories.
095     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
096     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
097     * dynamically manipulate provider-specific destination names.
098     * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
099     * units.
100     * <LI>It defines a serial order for the messages it consumes and the messages it produces.
101     * <LI>It retains messages it consumes until they have been acknowledged.
102     * <LI>It serializes execution of message listeners registered with its message consumers.
103     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
104     * </UL>
105     * <P>
106     * A session can create and service multiple message producers and consumers.
107     * <P>
108     * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
 * The thread may then use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
110     * <P>
111     * If a client desires to have one thread produce messages while others consume them, the client should use a separate
112     * session for its producing thread.
113     * <P>
114     * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
115     * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
116     * constituent objects from another thread of control. The only exception to this rule is the use of the session or
117     * connection <CODE>close</CODE> method.
118     * <P>
119     * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
120     * start simply and incrementally add message processing complexity as their need for concurrency grows.
121     * <P>
122     * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
123     * being executed in another thread.
124     * <P>
125     * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
126     * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
127     * transactions organize a session's input message stream and output message stream into series of atomic units. When a
128     * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
129     * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
130     * recovered.
131     * <P>
132     * The content of a transaction's input and output units is simply those messages that have been produced and consumed
133     * within the session's current transaction.
134     * <P>
135     * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
136     * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
137     * transacted session always has a current transaction within which its work is done.
138     * <P>
139     * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
140     * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
141     * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
142     * methods in this context is prohibited.
143     * <P>
144     * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
145     * <P>
146     * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
147     * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
148     * JMS API into their application server products.
149     * 
150     * @version $Revision: 1.1.1.1 $
151     * @see javax.jms.Session
152     * @see javax.jms.QueueSession
153     * @see javax.jms.TopicSession
154     * @see javax.jms.XASession
155     */
156    public class ActiveMQSession
157            implements
158                Session,
159                QueueSession,
160                TopicSession,
161                ActiveMQMessageDispatcher,
162                MessageAcknowledge,
163                StatsCapable {
164        
165        public static interface DeliveryListener {
166            public void beforeDelivery(ActiveMQSession session, Message msg);
167            public void afterDelivery(ActiveMQSession session, Message msg);
168        }
169        
170        protected static final int CONSUMER_DISPATCH_UNSET = 1;
171        protected static final int CONSUMER_DISPATCH_ASYNC = 2;
172        protected static final int CONSUMER_DISPATCH_SYNC = 3;
173        private static final Log log = LogFactory.getLog(ActiveMQSession.class);
174        protected ActiveMQConnection connection;
175        protected int acknowledgeMode;
176        protected CopyOnWriteArrayList consumers;
177        protected CopyOnWriteArrayList producers;
178        private IdGenerator temporaryDestinationGenerator;
179        private MessageListener messageListener;
180        protected boolean closed;
181        private SynchronizedBoolean started;
182        private short sessionId;
183        private long startTime;
184        private DefaultQueueList deliveredMessages;
185        private ActiveMQSessionExecutor messageExecutor;
186        private JMSSessionStatsImpl stats;
187        private int consumerDispatchState;
188        private ByteArrayCompression compression;
189        private TransactionContext transactionContext;
190        private boolean internalSession;
191        private DeliveryListener deliveryListener;
192        
193        /**
194         * Construct the Session
195         * 
196         * @param theConnection
197         * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
198         * @throws JMSException on internal error
199         */
200        protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
201            this(theConnection, theAcknowledgeMode,theConnection.isOptimizedMessageDispatch());
202        }
203    
204        /**
205         * Construct the Session
206         * 
207         * @param theConnection
208         * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
209         * @param optimizedDispatch
210         * @throws JMSException on internal error
211         */
212        protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode,boolean optimizedDispatch) throws JMSException {
213            this.connection = theConnection;
214            this.acknowledgeMode = theAcknowledgeMode;
215            setTransactionContext(new TransactionContext(theConnection));
216            this.consumers = new CopyOnWriteArrayList();
217            this.producers = new CopyOnWriteArrayList();
218            this.temporaryDestinationGenerator = new IdGenerator();
219            this.started = new SynchronizedBoolean(false);
220            this.sessionId = connection.generateSessionId();
221            this.startTime = System.currentTimeMillis();
222            this.deliveredMessages = new DefaultQueueList();
223            this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue("Session("
224                    + sessionId + ")"));
225            this.messageExecutor.setOptimizedMessageDispatch(optimizedDispatch);
226            connection.addSession(this);
227            stats = new JMSSessionStatsImpl(producers, consumers);
228            this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
229            this.compression = new ByteArrayCompression();
230            this.compression.setCompressionLevel(theConnection.getMessageCompressionLevel());
231            this.compression.setCompressionStrategy(theConnection.getMessageCompressionStrategy());
232            this.compression.setCompressionLimit(theConnection.getMessageCompressionLimit());
233            
234            this.internalSession = theConnection.isInternalConnection();
235        }
236    
237        public void setTransactionContext(TransactionContext transactionContext) {
238            if( this.transactionContext!=null ) {
239                this.transactionContext.removeSession(this);
240            }        
241            this.transactionContext = transactionContext;
242            this.transactionContext.addSession(this);
243        }
244        
245        public TransactionContext getTransactionContext() {
246            return transactionContext;
247        }
248    
249        public StatsImpl getStats() {
250            return stats;
251        }
252    
253        public JMSSessionStatsImpl getSessionStats() {
254            return stats;
255        }
256    
257        /**
258         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
259         * containing a stream of uninterpreted bytes.
260         * 
261         * @return the an ActiveMQBytesMessage
262         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
263         */
264        public BytesMessage createBytesMessage() throws JMSException {
265            checkClosed();
266            return new ActiveMQBytesMessage();
267        }
268    
269        /**
270         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
271         * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
272         * Java programming language.
273         * 
274         * @return an ActiveMQMapMessage
275         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
276         */
277        public MapMessage createMapMessage() throws JMSException {
278            checkClosed();
279            return new ActiveMQMapMessage();
280        }
281    
282        /**
283         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
284         * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
285         * a message containing only header information is sufficient.
286         * 
287         * @return an ActiveMQMessage
288         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
289         */
290        public Message createMessage() throws JMSException {
291            checkClosed();
292            return new ActiveMQMessage();
293        }
294    
295        /**
296         * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
297         * that contains a serializable Java object.
298         * 
299         * @return an ActiveMQObjectMessage
300         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
301         */
302        public ObjectMessage createObjectMessage() throws JMSException {
303            checkClosed();
304            return new ActiveMQObjectMessage();
305        }
306    
307        /**
308         * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
309         * send a message that contains a serializable Java object.
310         * 
311         * @param object the object to use to initialize this message
312         * @return an ActiveMQObjectMessage
313         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
314         */
315        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
316            checkClosed();
317            ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
318            msg.setObject(object);
319            return msg;
320        }
321    
322        /**
323         * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
324         * self-defining stream of primitive values in the Java programming language.
325         * 
326         * @return an ActiveMQStreamMessage
327         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
328         */
329        public StreamMessage createStreamMessage() throws JMSException {
330            checkClosed();
331            return new ActiveMQStreamMessage();
332        }
333    
334        /**
335         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
336         * containing a <CODE>String</CODE> object.
337         * 
338         * @return an ActiveMQTextMessage
339         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
340         */
341        public TextMessage createTextMessage() throws JMSException {
342            checkClosed();
343            return new ActiveMQTextMessage();
344        }
345    
346        /**
347         * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
348         * message containing a <CODE>String</CODE>.
349         * 
350         * @param text the string used to initialize this message
351         * @return an ActiveMQTextMessage
352         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
353         */
354        public TextMessage createTextMessage(String text) throws JMSException {
355            checkClosed();
356            ActiveMQTextMessage msg = new ActiveMQTextMessage();
357            msg.setText(text);
358            return msg;
359        }
360    
361        /**
362         * Indicates whether the session is in transacted mode.
363         * 
364         * @return true if the session is in transacted mode
365         * @throws JMSException if there is some internal error.
366         */
367        public boolean getTransacted() throws JMSException {
368            checkClosed();
369            return this.acknowledgeMode == Session.SESSION_TRANSACTED || transactionContext.isInXATransaction();
370        }
371    
372        /**
373         * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
374         * created. If the session is transacted, the acknowledgement mode is ignored.
375         * 
376         * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
377         * session is transacted, returns SESSION_TRANSACTED.
378         * @throws JMSException
379         * @see javax.jms.Connection#createSession(boolean,int)
380         * @since 1.1 exception JMSException if there is some internal error.
381         */
382        public int getAcknowledgeMode() throws JMSException {
383            checkClosed();
384            return this.acknowledgeMode;
385        }
386    
387        /**
388         * Commits all messages done in this transaction and releases any locks currently held.
389         * 
390         * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
391         * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
392         * commit.
393         * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
394         */
395        public void commit() throws JMSException {
396            checkClosed();
397            if (!getTransacted()) {
398                throw new javax.jms.IllegalStateException("Not a transacted session");
399            }
400            transactionContext.commit();
401        }
402    
403        /**
404         * Rolls back any messages done in this transaction and releases any locks currently held.
405         * 
406         * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
407         * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
408         */
409        public void rollback() throws JMSException {
410            checkClosed();
411            if (!getTransacted()) {
412                throw new javax.jms.IllegalStateException("Not a transacted session");
413            }
414            transactionContext.rollback();
415        }
416    
417        public void clearDeliveredMessages() {
418            deliveredMessages.clear();        
419        }
420        
421        /**
422         * Closes the session.
423         * <P>
424         * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
425         * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
426         * be timely enough.
427         * <P>
428         * There is no need to close the producers and consumers of a closed session.
429         * <P>
430         * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
431         * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
432         * <P>
433         * Closing a transacted session must roll back the transaction in progress.
434         * <P>
435         * This method is the only <CODE>Session</CODE> method that can be called concurrently.
436         * <P>
437         * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
438         * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
439         * 
440         * @throws JMSException if the JMS provider fails to close the session due to some internal error.
441         */
442        public void close() throws JMSException {
443            if (!this.closed) {
444                if (getTransactionContext().isInLocalTransaction()) {
445                    rollback();
446                }
447                doClose();
448                closed = true;
449            }
450        }
451    
452        protected void doClose() throws JMSException {
453            doAcknowledge(true);
454            deliveredMessages.clear();
455            for (Iterator i = consumers.iterator();i.hasNext();) {
456                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
457                consumer.close();
458            }
459            for (Iterator i = producers.iterator();i.hasNext();) {
460                ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
461                producer.close();
462            }
463            consumers.clear();
464            producers.clear();
465            this.connection.removeSession(this);
466            this.transactionContext.removeSession(this);
467            messageExecutor.close();
468        }
469    
470        /**
471         * @throws IllegalStateException if the Session is closed
472         */
473        protected void checkClosed() throws IllegalStateException {
474            if (this.closed) {
475                throw new IllegalStateException("The Session is closed");
476            }
477        }
478    
479        /**
480         * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
481         * <P>
482         * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
483         * messages that have been delivered to the client.
484         * <P>
485         * Restarting a session causes it to take the following actions:
486         * <UL>
487         * <LI>Stop message delivery
488         * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
489         * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
490         * Redelivered messages do not have to be delivered in exactly their original delivery order.
491         * </UL>
492         * 
493         * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
494         * @throws IllegalStateException if the method is called by a transacted session.
495         */
496        public void recover() throws JMSException {
497            checkClosed();
498            if (getTransacted()) {
499                throw new IllegalStateException("This session is transacted");
500            }
501            redeliverUnacknowledgedMessages();
502        }
503    
504        /**
505         * Returns the session's distinguished message listener (optional).
506         * 
507         * @return the message listener associated with this session
508         * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
509         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
510         * @see javax.jms.ServerSessionPool
511         * @see javax.jms.ServerSession
512         */
513        public MessageListener getMessageListener() throws JMSException {
514            checkClosed();
515            return this.messageListener;
516        }
517    
518        /**
519         * Sets the session's distinguished message listener (optional).
520         * <P>
521         * When the distinguished message listener is set, no other form of message receipt in the session can be used;
522         * however, all forms of sending messages are still supported.
523         * <P>
524         * This is an expert facility not used by regular JMS clients.
525         * 
526         * @param listener the message listener to associate with this session
527         * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
528         * @see javax.jms.Session#getMessageListener()
529         * @see javax.jms.ServerSessionPool
530         * @see javax.jms.ServerSession
531         */
532        public void setMessageListener(MessageListener listener) throws JMSException {
533            checkClosed();
534            this.messageListener = listener;
535            if (listener != null) {
536                messageExecutor.setDispatchedBySessionPool(true);
537            }
538        }
539    
540        /**
541         * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
542         * 
543         * @see javax.jms.ServerSession
544         */
545        public void run() {
546            ActiveMQMessage message;
547            while ((message = messageExecutor.dequeueNoWait()) != null) {
548                if( deliveryListener!=null )
549                    deliveryListener.beforeDelivery(this, message);
550                beforeMessageDelivered(message);
551                deliver(message);
552                if( deliveryListener!=null )
553                    deliveryListener.afterDelivery(this, message);
554            }
555        }
556    
557        /**
558         * Delivers a message to the messageListern
559         * @param message The message to deliver
560         */ 
561        private void deliver(ActiveMQMessage message) {
562            if (!message.isExpired() && this.messageListener != null) {
563                try {
564                    
565                    if( log.isDebugEnabled() ) {
566                        log.debug("Message delivered to session message listener: "+message);
567                    }
568                    
569                    this.messageListener.onMessage(message);
570                    this.afterMessageDelivered(true, message, true, false, true);
571                }
572                catch (Throwable t) {
573                    log.info("Caught :" + t, t);
574                    this.afterMessageDelivered(true, message, false, false, true);
575                }
576            }
577            else {
578                this.afterMessageDelivered(true, message, false, message.isExpired(), true);
579            }
580        }
581    
582        /**
583         * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
584         * <P>
585         * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
586         * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
587         * destination parameter to create a <CODE>MessageProducer</CODE> object.
588         * 
589         * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
590         * specified destination.
591         * @return the MessageProducer
592         * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
593         * @throws InvalidDestinationException if an invalid destination is specified.
594         * @since 1.1
595         */
596        public MessageProducer createProducer(Destination destination) throws JMSException {
597            checkClosed();
598            return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
599        }
600    
601        /**
602         * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
603         * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
604         * create a <CODE>MessageConsumer</CODE>.
605         * 
606         * @param destination the <CODE>Destination</CODE> to access.
607         * @return the MessageConsumer
608         * @throws JMSException if the session fails to create a consumer due to some internal error.
609         * @throws InvalidDestinationException if an invalid destination is specified.
610         * @since 1.1
611         */
612        public MessageConsumer createConsumer(Destination destination) throws JMSException {
613            checkClosed();
614            int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
615                    .getPrefetchPolicy().getQueuePrefetch();
616            return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
617                    "", this.connection.getNextConsumerNumber(), prefetch, false, false);
618        }
619    
620        /**
621         * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
622         * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
623         * destination parameter to create a <CODE>MessageConsumer</CODE>.
624         * <P>
625         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
626         * 
627         * @param destination the <CODE>Destination</CODE> to access
628         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
629         * value of null or an empty string indicates that there is no message selector for the message consumer.
630         * @return the MessageConsumer
631         * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
632         * @throws InvalidDestinationException if an invalid destination is specified.
633         * @throws InvalidSelectorException if the message selector is invalid.
634         * @since 1.1
635         */
636        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
637            checkClosed();
638            int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
639                    .getPrefetchPolicy().getQueuePrefetch();
640            return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
641                    messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
642        }
643    
644        /**
645         * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
646         * specify whether messages published by its own connection should be delivered to it, if the destination is a
647         * topic.
648         * <P>
649         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
650         * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
651         * <P>
652         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
653         * destination.
654         * <P>
655         * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
656         * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
657         * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
658         * topics.
659         * 
660         * @param destination the <CODE>Destination</CODE> to access
661         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
662         * value of null or an empty string indicates that there is no message selector for the message consumer.
663         * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
664         * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
665         * @return the MessageConsumer
666         * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
667         * @throws InvalidDestinationException if an invalid destination is specified.
668         * @throws InvalidSelectorException if the message selector is invalid.
669         * @since 1.1
670         */
671        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
672                throws JMSException {
673            checkClosed();
674            int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
675            return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
676                    messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
677        }
678    
679        /**
680         * Creates a queue identity given a <CODE>Queue</CODE> name.
681         * <P>
682         * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
683         * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
684         * not portable.
685         * <P>
686         * Note that this method is not for creating the physical queue. The physical creation of queues is an
687         * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
688         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
689         * 
690         * @param queueName the name of this <CODE>Queue</CODE>
691         * @return a <CODE>Queue</CODE> with the given name
692         * @throws JMSException if the session fails to create a queue due to some internal error.
693         * @since 1.1
694         */
695        public Queue createQueue(String queueName) throws JMSException {
696            checkClosed();
697            return new ActiveMQQueue(queueName);
698        }
699    
700        /**
701         * Creates a topic identity given a <CODE>Topic</CODE> name.
702         * <P>
703         * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
704         * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
705         * not portable.
706         * <P>
707         * Note that this method is not for creating the physical topic. The physical creation of topics is an
708         * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
709         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
710         * 
711         * @param topicName the name of this <CODE>Topic</CODE>
712         * @return a <CODE>Topic</CODE> with the given name
713         * @throws JMSException if the session fails to create a topic due to some internal error.
714         * @since 1.1
715         */
716        public Topic createTopic(String topicName) throws JMSException {
717            checkClosed();
718            return new ActiveMQTopic(topicName);
719        }
720    
721        /**
722         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
723         * 
724         * @param queue the <CODE>queue</CODE> to access
725         * @exception InvalidDestinationException if an invalid destination is specified
726         * @since 1.1
727         */
728        /**
729         * Creates a durable subscriber to the specified topic.
730         * <P>
731         * If a client needs to receive all the messages published on a topic, including the ones published while the
732         * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
733         * this durable subscription and insures that all messages from the topic's publishers are retained until they are
734         * acknowledged by this durable subscriber or they have expired.
735         * <P>
736         * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
737         * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
738         * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
739         * <P>
740         * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
741         * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
742         * unsubscribing (deleting) the old one and creating a new one.
743         * <P>
744         * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
745         * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
746         * value for this attribute is false.
747         * 
748         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
749         * @param name the name used to identify this subscription
750         * @return the TopicSubscriber
751         * @throws JMSException if the session fails to create a subscriber due to some internal error.
752         * @throws InvalidDestinationException if an invalid topic is specified.
753         * @since 1.1
754         */
755        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
756            checkClosed();
757            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
758                    this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
759                    false, false);
760        }
761    
762        /**
763         * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
764         * published by its own connection should be delivered to it.
765         * <P>
766         * If a client needs to receive all the messages published on a topic, including the ones published while the
767         * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
768         * this durable subscription and insures that all messages from the topic's publishers are retained until they are
769         * acknowledged by this durable subscriber or they have expired.
770         * <P>
771         * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
772         * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
773         * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
774         * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
775         * <P>
776         * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
777         * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
778         * unsubscribing (deleting) the old one and creating a new one.
779         * 
780         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
781         * @param name the name used to identify this subscription
782         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
783         * value of null or an empty string indicates that there is no message selector for the message consumer.
784         * @param noLocal if set, inhibits the delivery of messages published by its own connection
785         * @return the Queue Browser
786         * @throws JMSException if the session fails to create a subscriber due to some internal error.
787         * @throws InvalidDestinationException if an invalid topic is specified.
788         * @throws InvalidSelectorException if the message selector is invalid.
789         * @since 1.1
790         */
791        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
792                throws JMSException {
793            checkClosed();
794            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
795                    messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
796                            .getDurableTopicPrefetch(), noLocal, false);
797        }
798    
799        /**
800         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
801         * 
802         * @param queue the <CODE>queue</CODE> to access
803         * @return the Queue Browser
804         * @throws JMSException if the session fails to create a browser due to some internal error.
805         * @throws InvalidDestinationException if an invalid destination is specified
806         * @since 1.1
807         */
808        public QueueBrowser createBrowser(Queue queue) throws JMSException {
809            checkClosed();
810            return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), "",
811                    this.connection.getNextConsumerNumber());
812        }
813    
814        /**
815         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
816         * selector.
817         * 
818         * @param queue the <CODE>queue</CODE> to access
819         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
820         * value of null or an empty string indicates that there is no message selector for the message consumer.
821         * @return the Queue Browser
822         * @throws JMSException if the session fails to create a browser due to some internal error.
823         * @throws InvalidDestinationException if an invalid destination is specified
824         * @throws InvalidSelectorException if the message selector is invalid.
825         * @since 1.1
826         */
827        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
828            checkClosed();
829            return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue),
830                    messageSelector, this.connection.getNextConsumerNumber());
831        }
832    
833        /**
834         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
835         * it is deleted earlier.
836         * 
837         * @return a temporary queue identity
838         * @throws JMSException if the session fails to create a temporary queue due to some internal error.
839         * @since 1.1
840         */
841        public TemporaryQueue createTemporaryQueue() throws JMSException {
842            checkClosed();
843            String tempQueueName = "TemporaryQueue-"
844                    + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
845            tempQueueName += this.temporaryDestinationGenerator.generateId();
846            ActiveMQTemporaryQueue tempQueue =  new ActiveMQTemporaryQueue(tempQueueName);
847           tempQueue.setSessionCreatedBy(this);
848           this.connection.startTemporaryDestination(tempQueue);
849           return tempQueue;
850        }
851    
852        /**
853         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
854         * it is deleted earlier.
855         * 
856         * @return a temporary topic identity
857         * @throws JMSException if the session fails to create a temporary topic due to some internal error.
858         * @since 1.1
859         */
860        public TemporaryTopic createTemporaryTopic() throws JMSException {
861            checkClosed();
862            String tempTopicName = "TemporaryTopic-"
863                    + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
864            tempTopicName += this.temporaryDestinationGenerator.generateId();
865            ActiveMQTemporaryTopic tempTopic =  new ActiveMQTemporaryTopic(tempTopicName);
866            tempTopic.setSessionCreatedBy(this);
867            this.connection.startTemporaryDestination(tempTopic);
868            return tempTopic;
869        }
870    
871        /**
872         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
873         * 
874         * @param queue the <CODE>Queue</CODE> to access
875         * @return @throws JMSException if the session fails to create a receiver due to some internal error.
876         * @throws JMSException
877         * @throws InvalidDestinationException if an invalid queue is specified.
878         */
879        public QueueReceiver createReceiver(Queue queue) throws JMSException {
880            checkClosed();
881            return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
882                    .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
883        }
884    
885        /**
886         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
887         * selector.
888         * 
889         * @param queue the <CODE>Queue</CODE> to access
890         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
891         * value of null or an empty string indicates that there is no message selector for the message consumer.
892         * @return QueueReceiver
893         * @throws JMSException if the session fails to create a receiver due to some internal error.
894         * @throws InvalidDestinationException if an invalid queue is specified.
895         * @throws InvalidSelectorException if the message selector is invalid.
896         */
897        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
898            checkClosed();
899            return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
900                    messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
901                            .getQueuePrefetch());
902        }
903    
904        /**
905         * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
906         * 
907         * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
908         * @return QueueSender
909         * @throws JMSException if the session fails to create a sender due to some internal error.
910         * @throws InvalidDestinationException if an invalid queue is specified.
911         */
912        public QueueSender createSender(Queue queue) throws JMSException {
913            checkClosed();
914            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
915        }
916    
917        /**
918         * Creates a nondurable subscriber to the specified topic. <p/>
919         * <P>
920         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
921         * <p/>
922         * <P>
923         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
924         * while they are active. <p/>
925         * <P>
926         * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
927         * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
928         * value for this attribute is false.
929         * 
930         * @param topic the <CODE>Topic</CODE> to subscribe to
931         * @return TopicSubscriber
932         * @throws JMSException if the session fails to create a subscriber due to some internal error.
933         * @throws InvalidDestinationException if an invalid topic is specified.
934         */
935        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
936            checkClosed();
937            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
938                    this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
939                    false);
940        }
941    
942        /**
943         * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
944         * published by its own connection should be delivered to it. <p/>
945         * <P>
946         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
947         * <p/>
948         * <P>
949         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
950         * while they are active. <p/>
951         * <P>
952         * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
953         * subscriber's perspective, they do not exist. <p/>
954         * <P>
955         * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
956         * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
957         * value for this attribute is false.
958         * 
959         * @param topic the <CODE>Topic</CODE> to subscribe to
960         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
961         * value of null or an empty string indicates that there is no message selector for the message consumer.
962         * @param noLocal if set, inhibits the delivery of messages published by its own connection
963         * @return TopicSubscriber
964         * @throws JMSException if the session fails to create a subscriber due to some internal error.
965         * @throws InvalidDestinationException if an invalid topic is specified.
966         * @throws InvalidSelectorException if the message selector is invalid.
967         */
968        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
969            checkClosed();
970            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
971                    messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
972                            .getTopicPrefetch(), noLocal, false);
973        }
974    
975        /**
976         * Creates a publisher for the specified topic. <p/>
977         * <P>
978         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
979         * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
980         * relationship with the messages it has previously sent.
981         * 
982         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
983         * @return TopicPublisher
984         * @throws JMSException if the session fails to create a publisher due to some internal error.
985         * @throws InvalidDestinationException if an invalid topic is specified.
986         */
987        public TopicPublisher createPublisher(Topic topic) throws JMSException {
988            checkClosed();
989            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
990        }
991    
992        /**
993         * Unsubscribes a durable subscription that has been created by a client.
994         * <P>
995         * This method deletes the state being maintained on behalf of the subscriber by its provider.
996         * <P>
997         * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
998         * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
999         * transaction or has not been acknowledged in the session.
1000         * 
1001         * @param name the name used to identify this subscription
1002         * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
1003         * @throws InvalidDestinationException if an invalid subscription name is specified.
1004         * @since 1.1
1005         */
1006        public void unsubscribe(String name) throws JMSException {
1007            checkClosed();
1008            DurableUnsubscribe ds = new DurableUnsubscribe();
1009            ds.setClientId(this.connection.getClientID());
1010            ds.setSubscriberName(name);
1011            this.connection.syncSendPacket(ds);
1012        }
1013    
1014        /**
1015         * Tests to see if the Message Dispatcher is a target for this message
1016         * 
1017         * @param message the message to test
1018         * @return true if the Message Dispatcher can dispatch the message
1019         */
1020        public boolean isTarget(ActiveMQMessage message) {
1021            for (Iterator i = this.consumers.iterator();i.hasNext();) {
1022                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1023                if (message.isConsumerTarget(consumer.getConsumerNumber())) {
1024                    return true;
1025                }
1026            }
1027            return false;
1028        }
1029    
1030        /**
1031         * Dispatch an ActiveMQMessage
1032         * 
1033         * @param message
1034         */
1035        public void dispatch(ActiveMQMessage message) {
1036            message.setMessageAcknowledge(this);
1037            messageExecutor.execute(message);
1038        }
1039    
1040        /**
1041         * Acknowledges all consumed messages of the session of this consumed message.
1042         * <P>
1043         * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
1044         * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
1045         * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
1046         * <P>
1047         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1048         * implicit acknowledgement modes.
1049         * <P>
1050         * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
1051         * an application-defined group (which is done by calling acknowledge on the last received message of the group,
1052         * thereby acknowledging all messages consumed by the session.)
1053         * <P>
1054         * Messages that have been received but not acknowledged may be redelivered.
1055         * @param caller - the message calling acknowledge on the session
1056         * 
1057         * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1058         * @throws javax.jms.IllegalStateException if this method is called on a closed session.
1059         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1060         */
1061        public void acknowledge(ActiveMQMessage caller) throws JMSException {
1062            checkClosed();
1063            /**
1064             * Find the caller and ensure it is marked as consumed
1065             * This is to ensure acknowledge called by a 
1066             * MessageListener works correctly
1067             */
1068            ActiveMQMessage msg = (ActiveMQMessage)deliveredMessages.get(caller);
1069            if (msg != null){
1070                msg.setMessageConsumed(true);
1071            }
1072           
1073            doAcknowledge(false);
1074        }
1075    
1076        protected void doAcknowledge(boolean isClosing) throws JMSException {
1077            if (!closed) {
1078                if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1079                    ActiveMQMessage msg = null;
1080                    while((msg = (ActiveMQMessage)deliveredMessages.removeFirst())!=null){
1081                        boolean messageConsumed = isClosing ? false : msg.isMessageConsumed();
1082                        if (!msg.isTransientConsumed()){
1083                            sendMessageAck(msg, messageConsumed, false);
1084                        }else {
1085                            if (!messageConsumed){
1086                                connection.addToTransientConsumedRedeliverCache(msg);
1087                            }
1088                        }
1089                    }
1090                    deliveredMessages.clear();
1091                }
1092            }
1093        }
1094    
1095        protected void beforeMessageDelivered(ActiveMQMessage message) {
1096            if (message != null && !closed) {
1097                deliveredMessages.add(message);
1098            }
1099        }
1100    
1101        protected void afterMessageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed,
1102                boolean messageExpired, boolean beforeCalled) {
1103            if (message != null && !closed) {
1104                if ((isClientAcknowledge() && !messageExpired) || (isTransacted() && message.isTransientConsumed())) {
1105                    message.setMessageConsumed(messageConsumed);
1106                    if (!beforeCalled) {
1107                        deliveredMessages.add(message);
1108                    }
1109                }
1110                else {
1111                    if (beforeCalled) {
1112                        deliveredMessages.remove(message);
1113                    }
1114                }
1115                //don't send acks for expired messages unless sendAcknowledge is set
1116                //the sendAcknowledge flag is set for all messages expect those destined
1117                //for transient Topic subscribers
1118                if (sendAcknowledge && !isClientAcknowledge()) {
1119                    try {
1120                        doStartTransaction();
1121                        sendMessageAck(message,messageConsumed,messageExpired);
1122                    }
1123                    catch (JMSException e) {
1124                        log.warn("failed to notify Broker that message is delivered", e);
1125                    }
1126                }
1127            }
1128        }
1129        
1130        /**
1131         * remove a temporary destination
1132         * @param destination
1133         * @throws JMSException if active subscribers already exist
1134         */
1135        public void removeTemporaryDestination(ActiveMQDestination destination) throws JMSException{
1136            this.connection.stopTemporaryDestination(destination);
1137        }
1138        
1139        private void sendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1140                throws JMSException {
1141            if (message.isMessagePart()) {
1142                ActiveMQMessage[] parts = (ActiveMQMessage[]) connection.getAssemblies().remove(message.getParentMessageID());
1143                if (parts != null) {
1144                    for (int i = 0;i < parts.length;i++) {
1145                        parts[i].setConsumerIdentifer(message.getConsumerIdentifer());
1146                        doSendMessageAck(parts[i], messageConsumed, messageExpired);
1147                    }
1148                }
1149                else {
1150                    JMSException jmsEx = new JMSException("Could not find parts for fragemented message: " + message);
1151                    connection.onException(jmsEx);
1152                }
1153            }
1154            else {
1155                doSendMessageAck(message, messageConsumed, messageExpired);
1156            }
1157        }
1158        
1159        private void doSendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1160                throws JMSException {
1161            if (message != null && !message.isAdvisory()) {
1162                MessageAck ack = new MessageAck();
1163                ack.setConsumerId(message.getConsumerIdentifer());
1164                ack.setTransactionId(transactionContext.getTransactionId());
1165                ack.setExternalMessageId(message.isExternalMessageId());
1166                ack.setMessageID(message.getJMSMessageID());
1167                ack.setSequenceNumber(message.getSequenceNumber());
1168                ack.setProducerKey(message.getProducerKey());
1169                ack.setMessageRead(messageConsumed);
1170                ack.setDestination(message.getJMSActiveMQDestination());
1171                ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1172                ack.setExpired(messageExpired);
1173                ack.setSessionId(getSessionId());
1174                this.connection.asyncSendPacket(ack);
1175            }
1176        }
1177    
1178        /**
1179         * @param consumer
1180         * @throws JMSException
1181         */
1182        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1183            // ensure that the connection info is sent to the broker
1184            connection.sendConnectionInfoToBroker();
1185            // lets add the stat
1186            if (consumer.isDurableSubscriber()) {
1187                stats.onCreateDurableSubscriber();
1188            }
1189            ConsumerInfo info = createConsumerInfo(consumer);
1190            info.setStarted(true);
1191            //we add before notifying the server - as messages could
1192            //start to be dispatched before receipt from syncSend()
1193            //is returned
1194            this.consumers.add(consumer);
1195            if (started.get()){
1196                connection.replayTransientConsumedRedeliveredMessages(this,consumer);
1197            }
1198            try {
1199                this.connection.syncSendPacket(info);
1200            }
1201            catch (JMSException jmsEx) {
1202                this.consumers.remove(consumer);
1203                throw jmsEx;
1204            }
1205        }
1206    
1207        /**
1208         * @param consumer
1209         * @throws JMSException
1210         */
1211        protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1212            this.consumers.remove(consumer);
1213            // lets remove the stat
1214            if (consumer.isDurableSubscriber()) {
1215                stats.onRemoveDurableSubscriber();
1216            }
1217            if (!closed) {
1218                ConsumerInfo info = createConsumerInfo(consumer);
1219                info.setStarted(false);
1220                this.connection.asyncSendPacket(info, false);
1221            }
1222        }
1223    
1224        protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1225            ConsumerInfo info = new ConsumerInfo();
1226            info.setConsumerId(consumer.consumerIdentifier);
1227            info.setClientId(connection.clientID);
1228            info.setSessionId(this.sessionId);
1229            info.setConsumerNo(consumer.consumerNumber);
1230            info.setPrefetchNumber(consumer.prefetchNumber);
1231            info.setDestination(consumer.destination);
1232            info.setNoLocal(consumer.noLocal);
1233            info.setBrowser(consumer.browser);
1234            info.setSelector(consumer.messageSelector);
1235            info.setStartTime(consumer.startTime);
1236            info.setConsumerName(consumer.consumerName);
1237            return info;
1238        }
1239    
1240        /**
1241         * @param producer
1242         * @throws JMSException
1243         */
1244        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1245            // ensure that the connection info is sent to the broker
1246            connection.sendConnectionInfoToBroker();
1247            //start listening for advisories if the destination is temporary
1248            this.connection.startAdvisoryForTempDestination(producer.defaultDestination);
1249            producer.setProducerId(connection.handleIdGenerator.getNextShortSequence());
1250            ProducerInfo info = createProducerInfo(producer);
1251            info.setStarted(true);
1252            this.connection.asyncSendPacket(info);
1253            this.producers.add(producer);
1254        }
1255    
1256        /**
1257         * @param producer
1258         * @throws JMSException
1259         */
1260        protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1261            this.producers.remove(producer);
1262            if (!closed) {
1263                this.connection.stopAdvisoryForTempDestination(producer.defaultDestination);
1264                ProducerInfo info = createProducerInfo(producer);
1265                info.setStarted(false);
1266                this.connection.asyncSendPacket(info, false);
1267            }
1268        }
1269    
1270        protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1271            ProducerInfo info = new ProducerInfo();
1272            info.setProducerId(producer.getProducerId());
1273            info.setClientId(connection.clientID);
1274            info.setSessionId(this.sessionId);
1275            info.setDestination(producer.defaultDestination);
1276            info.setStartTime(producer.getStartTime());
1277            return info;
1278        }
1279    
1280        /**
1281         * Start this Session
1282         * @throws JMSException
1283         */
1284        protected void start() throws JMSException {
1285            started.set(true);
1286            for (Iterator i = consumers.iterator(); i.hasNext();){
1287                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
1288                connection.replayTransientConsumedRedeliveredMessages(this,consumer);
1289            }
1290            messageExecutor.start();
1291        }
1292    
1293        /**
1294         * Stop this Session
1295         */
1296        protected void stop() {
1297            started.set(false);
1298            messageExecutor.stop();
1299        }
1300    
1301        /**
1302         * @return Returns the sessionId.
1303         */
1304        protected short getSessionId() {
1305            return sessionId;
1306        }
1307    
1308        /**
1309         * @param sessionId The sessionId to set.
1310         */
1311        protected void setSessionId(short sessionId) {
1312            this.sessionId = sessionId;
1313        }
1314    
1315        /**
1316         * @return Returns the startTime.
1317         */
1318        protected long getStartTime() {
1319            return startTime;
1320        }
1321    
1322        /**
1323         * @param startTime The startTime to set.
1324         */
1325        protected void setStartTime(long startTime) {
1326            this.startTime = startTime;
1327        }
1328    
1329        /**
1330         * send the message for dispatch by the broker
1331         * 
1332         * @param producer
1333         * @param destination
1334         * @param message
1335         * @param deliveryMode
1336         * @param priority
1337         * @param timeToLive
1338         * @throws JMSException
1339         */
1340        protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1341                int priority, long timeToLive, boolean reuseMessageId) throws JMSException {
1342            checkClosed();
1343            // ensure that the connection info is sent to the broker
1344            connection.sendConnectionInfoToBroker();
1345            // tell the Broker we are about to start a new transaction
1346            doStartTransaction();
1347            message.setJMSDestination(destination);
1348            message.setJMSDeliveryMode(deliveryMode);
1349            message.setJMSPriority(priority);
1350            long expiration = 0L;
1351            if (!producer.getDisableMessageTimestamp()) {
1352                long timeStamp = System.currentTimeMillis();
1353                message.setJMSTimestamp(timeStamp);
1354                if (timeToLive > 0) {
1355                    expiration = timeToLive + timeStamp;
1356                }
1357            }
1358            message.setJMSExpiration(expiration);
1359            String id = message.getJMSMessageID();
1360            String producerKey = producer.getProducerMessageKey();
1361            long sequenceNumber = producer.getIdGenerator().getNextSequence();
1362            
1363            if ((id == null || id.length() == 0) || !producer.getDisableMessageID() && !reuseMessageId) {
1364                message.setJMSMessageID(producerKey + sequenceNumber);
1365            }
1366            //transform to our own message format here
1367            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1368            if (connection.isCopyMessageOnSend()){
1369                msg = msg.shallowCopy();
1370            }
1371            //clear identity - incase forwared on
1372            msg.setJMSMessageIdentity(null);
1373            msg.setExternalMessageId(id != null);
1374            msg.setSequenceNumber(sequenceNumber);
1375            msg.setProducerKey(producerKey);
1376            msg.setTransactionId(transactionContext.getTransactionId());
1377            msg.setJMSClientID(this.connection.clientID);
1378            msg.setMesssageHandle(producer.getProducerId());
1379            //reset state as could be forwarded on
1380            msg.setJMSRedelivered(false);
1381            if (!connection.isInternalConnection()){
1382                msg.clearBrokersVisited();
1383                connection.validateDestination(msg.getJMSActiveMQDestination());
1384            }
1385            
1386            if (this.connection.isPrepareMessageBodyOnSend()){
1387                msg.prepareMessageBody();
1388            }
1389            //do message payload compression
1390            if (connection.isDoMessageCompression()){
1391                try {
1392                    msg.getBodyAsBytes(compression);
1393                }
1394                catch (IOException e) {
1395                    JMSException jmsEx = new JMSException("Failed to compress message payload");
1396                    jmsEx.setLinkedException(e);
1397                    throw jmsEx;
1398                }
1399            }
1400            boolean fragmentedMessage = connection.isDoMessageFragmentation();
1401            if (fragmentedMessage && !msg.isMessagePart()){
1402                try {
1403                    ByteArrayFragmentation fragmentation = connection.getFragmentation();
1404                    fragmentedMessage = fragmentation.doFragmentation(msg.getBodyAsBytes());
1405                    if (fragmentedMessage){
1406                        ByteArray[] array = fragmentation.fragment(msg.getBodyAsBytes());
1407                        String parentMessageId = msg.getJMSMessageID();
1408                        for (int i = 0; i < array.length; i++){
1409                            ActiveMQMessage fragment = msg.shallowCopy();
1410                            fragment.setJMSMessageID(null);
1411                            fragment.setMessagePart(true);
1412                            fragment.setParentMessageID(parentMessageId);
1413                            fragment.setNumberOfParts((short)array.length);
1414                            fragment.setPartNumber((short)i);
1415                            if (i != 0){
1416                                fragment.setSequenceNumber(producer.getIdGenerator().getNextSequence());
1417                            }
1418                            fragment.setBodyAsBytes(array[i]);
1419                            if (this.connection.isUseAsyncSend()) {
1420                                this.connection.asyncSendPacket(fragment);
1421                            }
1422                            else {
1423                                this.connection.syncSendPacket(fragment);
1424                            }
1425                            
1426                        }
1427                    }
1428                }catch (IOException e) {
1429                    JMSException jmsEx = new JMSException("Failed to fragment message payload");
1430                    jmsEx.setLinkedException(e);
1431                    throw jmsEx;
1432                }
1433            }
1434            if (log.isDebugEnabled()) {
1435                log.debug("Sending message: " + msg);
1436            }
1437            
1438            if (!fragmentedMessage){
1439                if (this.connection.isUseAsyncSend()) {
1440                    this.connection.asyncSendPacket(msg);
1441                }
1442                else {
1443                    this.connection.syncSendPacket(msg);
1444                }
1445            }
1446        }
1447    
1448        /**
1449         * Send TransactionInfo to indicate transaction has started
1450         * 
1451         * @throws JMSException if some internal error occurs
1452         */
1453        protected void doStartTransaction() throws JMSException {
1454            if (getTransacted() && !transactionContext.isInXATransaction()) {
1455                transactionContext.begin();
1456            }
1457        }
1458    
1459        protected void setSessionConsumerDispatchState(int value) throws JMSException {
1460            if (consumerDispatchState != ActiveMQSession.CONSUMER_DISPATCH_UNSET && value != consumerDispatchState) {
1461                String errorStr = "Cannot mix consumer dispatching on a session - already: ";
1462                if (value == ActiveMQSession.CONSUMER_DISPATCH_SYNC) {
1463                    errorStr += "synchronous";
1464                }
1465                else {
1466                    errorStr += "asynchronous";
1467                }
1468                throw new IllegalStateException(errorStr);
1469            }
1470            consumerDispatchState = value;
1471        }
1472    
1473        protected void redeliverUnacknowledgedMessages() {
1474            redeliverUnacknowledgedMessages(false);
1475        }
1476    
1477        protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) {
1478            messageExecutor.stop();
1479            LinkedList replay = new LinkedList();
1480            Object obj = null;
1481            while ((obj = deliveredMessages.removeFirst()) != null) {
1482                replay.add(obj);
1483            }
1484            
1485            deliveredMessages.clear();
1486            if (!replay.isEmpty()) {
1487                for (ListIterator i = replay.listIterator(replay.size());i.hasPrevious();) {
1488                    ActiveMQMessage msg = (ActiveMQMessage) i.previous();
1489                    if (!onlyDeliverTransientConsumed || msg.isTransientConsumed()) {
1490                        msg.setJMSRedelivered(true);
1491                        msg.incrementDeliveryCount();
1492                        messageExecutor.executeFirst(msg);
1493                    }
1494                }
1495            }
1496            replay.clear();
1497            messageExecutor.start();
1498        }
1499    
1500        protected void clearMessagesInProgress() {
1501            messageExecutor.clearMessagesInProgress();
1502            for (Iterator i = consumers.iterator();i.hasNext();) {
1503                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1504                consumer.clearMessagesInProgress();
1505            }
1506        }
1507        
1508        public boolean hasUncomsumedMessages() {
1509            return messageExecutor.hasUncomsumedMessages();
1510        }
1511        
1512        public List getUnconsumedMessages() {
1513            return messageExecutor.getUnconsumedMessages();
1514        }
1515        
1516        public boolean isTransacted() {
1517            return this.acknowledgeMode == Session.SESSION_TRANSACTED;
1518        }
1519    
1520        protected boolean isClientAcknowledge() {
1521            return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
1522        }
1523        
1524            /**
1525         * @return Returns the internalSession.
1526         */
1527        public boolean isInternalSession() {
1528            return internalSession;
1529        }
1530        /**
1531         * @param internalSession The internalSession to set.
1532         */
1533        public void setInternalSession(boolean internalSession) {
1534            this.internalSession = internalSession;
1535        }
1536        
1537        public DeliveryListener getDeliveryListener() {
1538            return deliveryListener;
1539        }
1540        
1541    
1542        public void setDeliveryListener(DeliveryListener deliveryListener) {
1543            this.deliveryListener = deliveryListener;
1544        }
1545        
1546    }