001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.http; 019 020 import org.apache.commons.httpclient.HttpClient; 021 import org.apache.commons.httpclient.HttpMethod; 022 import org.apache.commons.httpclient.HttpStatus; 023 import org.apache.commons.httpclient.methods.GetMethod; 024 import org.apache.commons.httpclient.methods.PostMethod; 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.activemq.io.TextWireFormat; 028 import org.activemq.message.Packet; 029 import org.activemq.util.JMSExceptionHelper; 030 031 import javax.jms.JMSException; 032 import java.io.DataInputStream; 033 import java.io.IOException; 034 035 /** 036 * A HTTP {@link org.activemq.transport.TransportChannel} which uses the 037 * <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a> library 038 * 039 * @version $Revision$ 040 */ 041 public class HttpClientTransportChannel extends HttpTransportChannelSupport { 042 private static final Log log = LogFactory.getLog(HttpClientTransportChannel.class); 043 044 private HttpClient sendHttpClient; 045 private HttpClient receiveHttpClient; 046 047 public HttpClientTransportChannel(TextWireFormat wireFormat, String remoteUrl) { 048 super(wireFormat, remoteUrl); 049 } 050 051 public void asyncSend(Packet packet) throws JMSException { 052 PostMethod httpMethod = new PostMethod(getRemoteUrl()); 053 configureMethod(httpMethod); 054 httpMethod.setRequestBody(getTextWireFormat().toString(packet)); 055 try { 056 int answer = getSendHttpClient().executeMethod(httpMethod); 057 if (answer != HttpStatus.SC_OK) { 058 throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer); 059 } 060 } 061 catch (IOException e) { 062 throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e); 063 } 064 } 065 066 public boolean isMulticast() { 067 return false; 068 } 069 070 public void run() { 071 log.trace("HTTP GET consumer thread starting for clientID: " + getClientID()); 072 HttpClient httpClient = getReceiveHttpClient(); 073 String remoteUrl = getRemoteUrl(); 074 while (!getClosed().get()) { 075 GetMethod httpMethod = new GetMethod(remoteUrl); 076 configureMethod(httpMethod); 077 try { 078 int answer = httpClient.executeMethod(httpMethod); 079 if (answer != HttpStatus.SC_OK) { 080 if (answer == HttpStatus.SC_REQUEST_TIMEOUT) { 081 log.info("GET timed out"); 082 } 083 else { 084 log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); 085 } 086 } 087 else { 088 Packet packet = getWireFormat().readPacket(new DataInputStream(httpMethod.getResponseBodyAsStream())); 089 if (packet == null) { 090 log.warn("Received null packet from url: " + remoteUrl); 091 } 092 else { 093 doConsumePacket(packet); 094 } 095 } 096 } 097 catch (IOException e) { 098 log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e); 099 } 100 } 101 } 102 103 // Properties 104 //------------------------------------------------------------------------- 105 public HttpClient getSendHttpClient() { 106 if (sendHttpClient == null) { 107 sendHttpClient = createHttpClient(); 108 } 109 return sendHttpClient; 110 } 111 112 public void setSendHttpClient(HttpClient sendHttpClient) { 113 this.sendHttpClient = sendHttpClient; 114 } 115 116 public HttpClient getReceiveHttpClient() { 117 if (receiveHttpClient == null) { 118 receiveHttpClient = createHttpClient(); 119 } 120 return receiveHttpClient; 121 } 122 123 public void setReceiveHttpClient(HttpClient receiveHttpClient) { 124 this.receiveHttpClient = receiveHttpClient; 125 } 126 127 // Implementation methods 128 //------------------------------------------------------------------------- 129 protected HttpClient createHttpClient() { 130 return new HttpClient(); 131 } 132 133 protected void configureMethod(HttpMethod method) { 134 String clientID = getClientID(); 135 if (clientID != null) { 136 method.setRequestHeader("clientID", clientID); 137 } 138 } 139 140 public void forceDisconnect() { 141 // TODO: implement me. 142 throw new RuntimeException("Not yet Implemented."); 143 } 144 145 }