001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.http;
019    
020    import org.apache.commons.httpclient.HttpClient;
021    import org.apache.commons.httpclient.HttpMethod;
022    import org.apache.commons.httpclient.HttpStatus;
023    import org.apache.commons.httpclient.methods.GetMethod;
024    import org.apache.commons.httpclient.methods.PostMethod;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.activemq.io.TextWireFormat;
028    import org.activemq.message.Packet;
029    import org.activemq.util.JMSExceptionHelper;
030    
031    import javax.jms.JMSException;
032    import java.io.DataInputStream;
033    import java.io.IOException;
034    
035    /**
036     * A HTTP {@link org.activemq.transport.TransportChannel} which uses the
037     * <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a> library
038     *
039     * @version $Revision$
040     */
041    public class HttpClientTransportChannel extends HttpTransportChannelSupport {
042        private static final Log log = LogFactory.getLog(HttpClientTransportChannel.class);
043    
044        private HttpClient sendHttpClient;
045        private HttpClient receiveHttpClient;
046    
047        public HttpClientTransportChannel(TextWireFormat wireFormat, String remoteUrl) {
048            super(wireFormat, remoteUrl);
049        }
050    
051        public void asyncSend(Packet packet) throws JMSException {
052            PostMethod httpMethod = new PostMethod(getRemoteUrl());
053            configureMethod(httpMethod);
054            httpMethod.setRequestBody(getTextWireFormat().toString(packet));
055            try {
056                int answer = getSendHttpClient().executeMethod(httpMethod);
057                if (answer != HttpStatus.SC_OK) {
058                    throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
059                }
060            }
061            catch (IOException e) {
062                throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
063            }
064        }
065    
066        public boolean isMulticast() {
067            return false;
068        }
069    
070        public void run() {
071            log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
072            HttpClient httpClient = getReceiveHttpClient();
073            String remoteUrl = getRemoteUrl();
074            while (!getClosed().get()) {
075                GetMethod httpMethod = new GetMethod(remoteUrl);
076                configureMethod(httpMethod);
077                try {
078                    int answer = httpClient.executeMethod(httpMethod);
079                    if (answer != HttpStatus.SC_OK) {
080                        if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
081                            log.info("GET timed out");
082                        }
083                        else {
084                            log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
085                        }
086                    }
087                    else {
088                        Packet packet = getWireFormat().readPacket(new DataInputStream(httpMethod.getResponseBodyAsStream()));
089                        if (packet == null) {
090                            log.warn("Received null packet from url: " + remoteUrl);
091                        }
092                        else {
093                            doConsumePacket(packet);
094                        }
095                    }
096                }
097                catch (IOException e) {
098                    log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
099                }
100            }
101        }
102    
103        // Properties
104        //-------------------------------------------------------------------------
105        public HttpClient getSendHttpClient() {
106            if (sendHttpClient == null) {
107                sendHttpClient = createHttpClient();
108            }
109            return sendHttpClient;
110        }
111    
112        public void setSendHttpClient(HttpClient sendHttpClient) {
113            this.sendHttpClient = sendHttpClient;
114        }
115    
116        public HttpClient getReceiveHttpClient() {
117            if (receiveHttpClient == null) {
118                receiveHttpClient = createHttpClient();
119            }
120            return receiveHttpClient;
121        }
122    
123        public void setReceiveHttpClient(HttpClient receiveHttpClient) {
124            this.receiveHttpClient = receiveHttpClient;
125        }
126    
127        // Implementation methods
128        //-------------------------------------------------------------------------
129        protected HttpClient createHttpClient() {
130            return new HttpClient();
131        }
132    
133        protected void configureMethod(HttpMethod method) {
134            String clientID = getClientID();
135            if (clientID != null) {
136                method.setRequestHeader("clientID", clientID);
137            }
138        }
139        
140            public void forceDisconnect() {
141                // TODO: implement me.
142                    throw new RuntimeException("Not yet Implemented.");
143            }
144        
145    }