001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.ra;
019    
020    import java.lang.reflect.Method;
021    
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageListener;
025    import javax.jms.MessageProducer;
026    import javax.jms.ServerSession;
027    import javax.jms.Session;
028    import javax.resource.spi.endpoint.MessageEndpoint;
029    import javax.resource.spi.work.Work;
030    import javax.resource.spi.work.WorkEvent;
031    import javax.resource.spi.work.WorkException;
032    import javax.resource.spi.work.WorkListener;
033    import javax.resource.spi.work.WorkManager;
034    
035    import org.activemq.ActiveMQSession;
036    import org.activemq.ActiveMQSession.DeliveryListener;
037    import org.activemq.util.JMSExceptionHelper;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    
041    /**
042     * @version $Revision: 1.1.1.1 $
043     */
044    public class ServerSessionImpl implements ServerSession, SessionAndProducer, Work, DeliveryListener {
045    
046        public static final Method ON_MESSAGE_METHOD;
047    
048        static {
049            try {
050                ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[]{Message.class});
051            }
052            catch (Exception e) {
053                throw new ExceptionInInitializerError(e);
054            }
055        }
056    
057        private static int nextLogId=0;
058        synchronized static private int getNextLogId() {
059            return nextLogId++;
060        }
061    
062        private int serverSessionId = getNextLogId();
063        private final Log log = LogFactory.getLog( ServerSessionImpl.class.getName()+":"+serverSessionId );
064        
065        private ActiveMQSession session;
066        private WorkManager workManager;
067        private MessageEndpoint endpoint;
068        private MessageProducer messageProducer;
069        private final ServerSessionPoolImpl pool;
070    
071        private Object runControlMutex = new Object();
072        private boolean runningFlag = false;
073        /** 
074         * True if an error was detected that cause this session to be stale.  When a session 
075         * is stale, it should not be used again for proccessing.
076         */
077        private boolean stale;
078        /**
079         * Does the TX commit need to be managed by the RA?
080         */
081        private final boolean useRAManagedTx;
082        /**
083         * The maximum number of messages to batch
084         */
085        private final int batchSize;
086        /**
087         * The current number of messages in the batch
088         */
089        private int currentBatchSize;
090    
091        public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
092            this.pool = pool;
093            this.session = session;
094            this.workManager = workManager;
095            this.endpoint = endpoint;
096            this.useRAManagedTx = useRAManagedTx;
097            this.session.setMessageListener((MessageListener) endpoint);
098            this.session.setDeliveryListener(this);
099            this.batchSize = batchSize;
100        }
101    
102        public Session getSession() throws JMSException {
103            return session;
104        }
105    
106        public MessageProducer getMessageProducer() throws JMSException {
107            if (messageProducer == null) {
108                messageProducer = getSession().createProducer(null);
109            }
110            return messageProducer;
111        }
112    
113        /**
114         * @see javax.jms.ServerSession#start()
115         */
116        public void start() throws JMSException {
117    
118            synchronized (runControlMutex) {
119                if (runningFlag) {
120                    log.debug("Start request ignored, allready running.");
121                    return;
122                }
123                runningFlag = true;
124            }
125    
126            // We get here because we need to start a async worker.
127            log.debug("Starting run.");
128            try {
129                workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
130                        new WorkListener() {
131                            //The work listener is useful only for debugging...
132                            public void workAccepted(WorkEvent event) {
133                                log.debug("Work accepted: " + event);
134                            }
135    
136                            public void workRejected(WorkEvent event) {
137                                log.debug("Work rejected: " + event);
138                            }
139    
140                            public void workStarted(WorkEvent event) {
141                                log.debug("Work started: " + event);
142                            }
143    
144                            public void workCompleted(WorkEvent event) {
145                                log.debug("Work completed: " + event);
146                            }
147    
148                        });
149            }
150            catch (WorkException e) {
151                throw (JMSException) new JMSException("Start failed: " + e).initCause(e);
152            }
153        }
154    
155        /**
156         * @see java.lang.Runnable#run()
157         */
158        synchronized public void run() {
159            log.debug("Running"); 
160            while (true) {
161                log.debug("run loop start");            
162                try {
163                    SessionAndProducerHelper.register(this);                
164                    currentBatchSize = 0;
165                    session.run();
166                }
167                catch (Throwable e) {
168                    stale=true;
169                    log.debug("Endpoint failed to process message.", e);
170                    log.info("Endpoint failed to process message. Reason: " + e);
171                }            
172                finally {
173                    SessionAndProducerHelper.unregister(this);                
174                    log.debug("run loop end");            
175                    synchronized (runControlMutex) {
176                        // This endpoint may have gone stale due to error
177                        if( stale) {
178                            runningFlag = false;
179                            pool.removeFromPool(this);
180                            break;
181                        }
182                        if( !session.hasUncomsumedMessages() ) {
183                            runningFlag = false;
184                            pool.returnToPool(this);
185                            break;
186                        }                
187                    }
188                }
189            }
190            log.debug("Run finished");
191        }
192    
193    
194        /**
195         * The ActiveMQSession's run method will call back to this method before 
196         * dispactching a message to the MessageListener.
197         */
198        public void beforeDelivery(ActiveMQSession session, Message msg) {
199            if (currentBatchSize == 0) {
200                    try {
201                            endpoint.beforeDelivery(ON_MESSAGE_METHOD);
202                    } catch (Throwable e) {
203                    throw new RuntimeException("Endpoint before delivery notification failure", e);
204                    }
205            }
206        }
207    
208        /**
209         * The ActiveMQSession's run method will call back to this method after 
210         * dispactching a message to the MessageListener.
211         */
212        public void afterDelivery(ActiveMQSession session, Message msg) {
213            if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
214                    currentBatchSize = 0;
215                    try {
216                            endpoint.afterDelivery();
217                    } catch (Throwable e) {
218                    throw new RuntimeException("Endpoint after delivery notification failure", e);
219                    } finally {
220                        if( session.getTransactionContext().isInLocalTransaction() ) {
221                            if( !useRAManagedTx ) {
222                                // Sanitiy Check: If the local transaction has not been commited..
223                                // Commit it now.
224                                log.warn("Local transaction had not been commited.  Commiting now.");
225                            }
226                            try {
227                                session.commit();
228                            } catch (JMSException e) {
229                                log.info("Commit failed:", e);
230                            }
231                        }
232                    }
233            }
234        }
235    
236        /**
237         * @see javax.resource.spi.work.Work#release()
238         */
239        public void release() {
240            log.debug("release called");
241        }
242    
243        /**
244         * @see java.lang.Object#toString()
245         */
246        public String toString() {
247            return "ServerSessionImpl:"+serverSessionId;
248        }
249    
250        public void close() {
251            try {
252                endpoint.release();
253            } catch (Throwable e) {
254                log.debug("Endpoint did not release properly: "+e,e);
255            }
256            try {
257                session.close();
258            } catch (Throwable e) {
259                log.debug("Session did not close properly: "+e,e);
260            }
261        }
262    
263    }