001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import org.apache.commons.logging.Log; 021 import org.apache.commons.logging.LogFactory; 022 import org.activemq.ActiveMQSession; 023 import org.activemq.message.ActiveMQMessage; 024 025 import javax.jms.JMSException; 026 import javax.jms.MessageListener; 027 import javax.jms.MessageProducer; 028 import javax.jms.Session; 029 import javax.resource.ResourceException; 030 import javax.resource.spi.endpoint.MessageEndpoint; 031 import javax.resource.spi.work.Work; 032 033 /** 034 * @version $Revision: 1.1.1.1 $ 035 */ 036 public class InboundEndpointWork implements SessionAndProducer, Work { 037 038 private static final Log log = LogFactory.getLog(InboundEndpointWork.class); 039 040 private final ActiveMQSession session; 041 private final MessageEndpoint endpoint; 042 private final CircularQueue workers; 043 private MessageProducer messageProducer; 044 private ActiveMQMessage message; 045 046 047 /** 048 * @param session 049 * @param endpoint 050 * @param workers 051 * @throws javax.jms.JMSException 052 */ 053 public InboundEndpointWork(ActiveMQSession session, MessageEndpoint endpoint, CircularQueue workers) throws JMSException { 054 this.session = session; 055 this.endpoint = endpoint; 056 this.workers = workers; 057 session.setMessageListener((MessageListener) endpoint); 058 } 059 060 public Session getSession() { 061 return session; 062 } 063 064 public MessageProducer getMessageProducer() throws JMSException { 065 if (messageProducer == null) { 066 messageProducer = getSession().createProducer(null); 067 } 068 return messageProducer; 069 } 070 071 public ActiveMQMessage getMessage() { 072 return message; 073 } 074 075 public void setMessage(ActiveMQMessage message) { 076 this.message = message; 077 } 078 079 public void release() { 080 } 081 082 /** 083 * @see java.lang.Runnable#run() 084 */ 085 public void run() { 086 try { 087 088 SessionAndProducerHelper.register(this); 089 endpoint.beforeDelivery(ActiveMQBaseEndpointWorker.ON_MESSAGE_METHOD); 090 try { 091 session.dispatch(message); 092 session.run(); 093 } 094 finally { 095 endpoint.afterDelivery(); 096 SessionAndProducerHelper.unregister(this); 097 } 098 099 } 100 catch (NoSuchMethodException e) { 101 log.info("worker: ", e); 102 } 103 catch (ResourceException e) { 104 log.info("worker: ", e); 105 } 106 finally { 107 workers.returnObject(this); 108 } 109 } 110 111 }