001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.ra;
019    
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    import org.activemq.ActiveMQSession;
023    import org.activemq.message.ActiveMQMessage;
024    
025    import javax.jms.JMSException;
026    import javax.jms.MessageListener;
027    import javax.jms.MessageProducer;
028    import javax.jms.Session;
029    import javax.resource.ResourceException;
030    import javax.resource.spi.endpoint.MessageEndpoint;
031    import javax.resource.spi.work.Work;
032    
033    /**
034     * @version $Revision: 1.1.1.1 $
035     */
036    public class InboundEndpointWork implements SessionAndProducer, Work {
037    
038        private static final Log log = LogFactory.getLog(InboundEndpointWork.class);
039    
040        private final ActiveMQSession session;
041        private final MessageEndpoint endpoint;
042        private final CircularQueue workers;
043        private MessageProducer messageProducer;
044        private ActiveMQMessage message;
045    
046    
047        /**
048         * @param session
049         * @param endpoint
050         * @param workers
051         * @throws javax.jms.JMSException
052         */
053        public InboundEndpointWork(ActiveMQSession session, MessageEndpoint endpoint, CircularQueue workers) throws JMSException {
054            this.session = session;
055            this.endpoint = endpoint;
056            this.workers = workers;
057            session.setMessageListener((MessageListener) endpoint);
058        }
059    
060        public Session getSession() {
061            return session;
062        }
063    
064        public MessageProducer getMessageProducer() throws JMSException {
065            if (messageProducer == null) {
066                messageProducer = getSession().createProducer(null);
067            }
068            return messageProducer;
069        }
070    
071        public ActiveMQMessage getMessage() {
072            return message;
073        }
074    
075        public void setMessage(ActiveMQMessage message) {
076            this.message = message;
077        }
078    
079        public void release() {
080        }
081    
082        /**
083         * @see java.lang.Runnable#run()
084         */
085        public void run() {
086            try {
087    
088                SessionAndProducerHelper.register(this);
089                endpoint.beforeDelivery(ActiveMQBaseEndpointWorker.ON_MESSAGE_METHOD);
090                try {
091                    session.dispatch(message);
092                    session.run();
093                }
094                finally {
095                    endpoint.afterDelivery();
096                    SessionAndProducerHelper.unregister(this);
097                }
098    
099            }
100            catch (NoSuchMethodException e) {
101                log.info("worker: ", e);
102            }
103            catch (ResourceException e) {
104                log.info("worker: ", e);
105            }
106            finally {
107                workers.returnObject(this);
108            }
109        }
110    
111    }