001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.web;
020    
021    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    import org.activemq.ActiveMQConnection;
025    import org.activemq.ActiveMQConnectionFactory;
026    import org.activemq.ActiveMQSession;
027    
028    import javax.jms.ConnectionFactory;
029    import javax.jms.DeliveryMode;
030    import javax.jms.Destination;
031    import javax.jms.JMSException;
032    import javax.jms.Message;
033    import javax.jms.MessageConsumer;
034    import javax.jms.MessageProducer;
035    import javax.jms.Session;
036    import javax.jms.Topic;
037    import javax.servlet.ServletContext;
038    import javax.servlet.http.HttpSession;
039    import javax.servlet.http.HttpSessionActivationListener;
040    import javax.servlet.http.HttpSessionEvent;
041    import java.io.Externalizable;
042    import java.io.IOException;
043    import java.io.ObjectInput;
044    import java.io.ObjectOutput;
045    import java.util.HashMap;
046    import java.util.Map;
047    
048    /**
049     * Represents a messaging client used from inside a web container
050     * typically stored inside a HttpSession
051     *
052     * @version $Revision: 1.1.1.1 $
053     */
054    public class WebClient implements HttpSessionActivationListener, Externalizable {
055        public static final String webClientAttribute = "org.activemq.webclient";
056        public static final String connectionFactoryAttribute = "org.activemq.connectionFactory";
057        public static final String queueConsumersAttribute = "org.activemq.queueConsumers";
058        public static final String brokerUrlInitParam = "org.activemq.brokerURL";
059        public static final String embeddedBrokerInitParam = "org.activemq.embeddedBroker";
060    
061        private static final Log log = LogFactory.getLog(WebClient.class);
062    
063        private static transient ConnectionFactory factory;
064        private static transient Map queueConsumers;
065    
066        private transient ServletContext context;
067        private transient ActiveMQConnection connection;
068        private transient ActiveMQSession session;
069        private transient MessageProducer producer;
070        private transient Map topicConsumers = new ConcurrentHashMap();
071        private int deliveryMode = DeliveryMode.NON_PERSISTENT;
072    
073    
074        /**
075         * @return the web client for the current HTTP session or null if there is not a web client created yet
076         */
077        public static WebClient getWebClient(HttpSession session) {
078            return (WebClient) session.getAttribute(webClientAttribute);
079        }
080    
081    
082        public static void initContext(ServletContext context) {
083            factory = initConnectionFactory(context);
084            if (factory == null) {
085                log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
086                factory = new ActiveMQConnectionFactory("vm://localhost");
087                context.setAttribute(connectionFactoryAttribute, factory);
088            }
089            queueConsumers = initQueueConsumers(context);
090        }
091    
092        /**
093         * Only called by serialization
094         */
095        public WebClient() {
096        }
097    
098        public WebClient(ServletContext context) {
099            this.context = context;
100            initContext(context);
101        }
102    
103        
104        public int getDeliveryMode() {
105            return deliveryMode;
106        }
107    
108    
109        public void setDeliveryMode(int deliveryMode) {
110            this.deliveryMode = deliveryMode;
111        }
112    
113    
114        public void start() throws JMSException {
115        }
116    
117        public void stop() throws JMSException {
118            System.out.println("Closing the WebClient!!! " + this);
119            
120            try {
121                connection.close();
122            }
123            finally {
124                producer = null;
125                session = null;
126                connection = null;
127                topicConsumers.clear();
128            }
129        }
130    
131        public void writeExternal(ObjectOutput out) throws IOException {
132        }
133    
134        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
135            topicConsumers = new HashMap();
136        }
137    
138        public void send(Destination destination, Message message) throws JMSException {
139            if (producer == null) {
140                producer = getSession().createProducer(null);
141                producer.setDeliveryMode(deliveryMode );
142            }
143            log.info("Sending to destination: " + destination);
144            producer.send(destination, message);
145            log.info("Sent! message: " + message);
146        }
147    
148        public Session getSession() throws JMSException {
149            if (session == null) {
150                session = createSession();
151            }
152            return session;
153        }
154    
155        public ActiveMQConnection getConnection() throws JMSException {
156            if (connection == null) {
157                connection = (ActiveMQConnection) factory.createConnection();
158                connection.start();
159            }
160            return connection;
161        }
162    
163        public void sessionWillPassivate(HttpSessionEvent event) {
164            try {
165                stop();
166            }
167            catch (JMSException e) {
168                log.warn("Could not close connection: " + e, e);
169            }
170        }
171    
172        public void sessionDidActivate(HttpSessionEvent event) {
173            // lets update the connection factory from the servlet context
174            context = event.getSession().getServletContext();
175            initContext(context);
176        }
177    
178        public static Map initQueueConsumers(ServletContext context) {
179            Map answer = (Map) context.getAttribute(queueConsumersAttribute);
180            if (answer == null) {
181                answer = new HashMap();
182                context.setAttribute(queueConsumersAttribute, answer);
183            }
184            return answer;
185        }
186    
187    
188        public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
189            ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
190            if (connectionFactory == null) {
191                String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
192    
193                servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
194    
195                if (brokerURL == null) {
196                    brokerURL = "vm://localhost";
197                }
198    
199                boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
200                servletContext.log("Use embedded broker: " + embeddedBroker);
201    
202                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
203                factory.setUseEmbeddedBroker(embeddedBroker);
204    
205                connectionFactory = factory;
206                servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
207            }
208            return connectionFactory;
209        }
210    
211        public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
212            if (destination instanceof Topic) {
213                MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
214                if (consumer == null) {
215                    consumer = getSession().createConsumer(destination);
216                    topicConsumers.put(destination, consumer);
217                }
218                return consumer;
219            }
220            else {
221                synchronized (queueConsumers) {
222                    SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
223                    if (pair == null) {
224                        pair = createSessionConsumerPair(destination);
225                        queueConsumers.put(destination, pair);
226                    }
227                    return pair.consumer;
228                }
229            }
230        }
231    
232        protected ActiveMQSession createSession() throws JMSException {
233            return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
234        }
235    
236        protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
237            SessionConsumerPair answer = new SessionConsumerPair();
238            answer.session = createSession();
239            answer.consumer = answer.session.createConsumer(destination);
240            return answer;
241        }
242    
243        protected static class SessionConsumerPair {
244            public Session session;
245            public MessageConsumer consumer;
246        }
247    }