001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.http; 019 020 import java.io.BufferedReader; 021 import java.io.DataOutputStream; 022 import java.io.IOException; 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import javax.jms.JMSException; 027 import javax.servlet.ServletException; 028 import javax.servlet.http.HttpServlet; 029 import javax.servlet.http.HttpServletRequest; 030 import javax.servlet.http.HttpServletResponse; 031 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 import org.activemq.io.TextWireFormat; 035 import org.activemq.message.Packet; 036 import org.activemq.message.WireFormatInfo; 037 import org.activemq.message.PacketListener; 038 import org.activemq.transport.TransportChannelListener; 039 import org.activemq.transport.xstream.XStreamWireFormat; 040 import org.activemq.util.JMSExceptionHelper; 041 042 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 043 044 /** 045 * A servlet which handles server side HTTP transport, delegaging to the ActiveMQ broker. 046 * This servlet is designed for being embedded inside an ActiveMQ Broker using an embedded 047 * Jetty or Tomcat instance. 048 * 049 * @version $Revision$ 050 */ 051 public class HttpTunnelServlet extends HttpServlet { 052 053 private static final Log log = LogFactory.getLog(HttpTunnelServlet.class); 054 055 private TransportChannelListener listener; 056 private TextWireFormat wireFormat; 057 private Map clients = new HashMap(); 058 private long requestTimeout = 30000L; 059 060 public void init() throws ServletException { 061 super.init(); 062 listener = (TransportChannelListener) getServletContext().getAttribute("transportChannelListener"); 063 if (listener == null) { 064 throw new ServletException("No such attribute 'transportChannelListener' available in the ServletContext"); 065 } 066 wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat"); 067 if (wireFormat == null) { 068 wireFormat = createWireFormat(); 069 } 070 } 071 072 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 073 // lets return the next response 074 Packet packet = null; 075 try { 076 HttpServerTransportChannel transportChannel = getTransportChannel(request); 077 if (transportChannel == null) { 078 return; 079 } 080 packet = (Packet) transportChannel.getChannel().poll(requestTimeout); 081 } 082 catch (InterruptedException e) { 083 // ignore 084 } 085 if (packet == null) { 086 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT); 087 } 088 else { 089 try { 090 wireFormat.writePacket(packet, new DataOutputStream(response.getOutputStream())); 091 } 092 catch (JMSException e) { 093 throw JMSExceptionHelper.newIOException(e); 094 } 095 } 096 } 097 098 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 099 try { 100 Packet packet = wireFormat.fromString(readRequestBody(request)); 101 102 if( packet.getPacketType() == Packet.WIRE_FORMAT_INFO ) { 103 104 // Can we handle the requested wire format? 105 WireFormatInfo info = (WireFormatInfo) packet; 106 if (!canProcessWireFormatVersion(info.getVersion())) { 107 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion()); 108 } 109 110 } else { 111 HttpServerTransportChannel transportChannel = getTransportChannel(request); 112 if (transportChannel == null) { 113 response.setStatus(HttpServletResponse.SC_NOT_FOUND); 114 } 115 else { 116 PacketListener packetListener = transportChannel.getPacketListener(); 117 if (packetListener == null) { 118 log.error("No packetListener available to process inbound packet: " + packet); 119 } 120 else { 121 packetListener.consume(packet); 122 } 123 } 124 } 125 } 126 catch (IOException e) { 127 log.error("Caught: " + e, e); 128 } 129 catch (JMSException e) { 130 throw JMSExceptionHelper.newIOException(e); 131 } 132 } 133 134 /** 135 * @param version 136 * @return 137 */ 138 private boolean canProcessWireFormatVersion(int version) { 139 // TODO: 140 return true; 141 } 142 143 protected String readRequestBody(HttpServletRequest request) throws IOException { 144 StringBuffer buffer = new StringBuffer(); 145 BufferedReader reader = request.getReader(); 146 while (true) { 147 String line = reader.readLine(); 148 if (line == null) { 149 break; 150 } 151 else { 152 buffer.append(line); 153 buffer.append("\n"); 154 } 155 } 156 return buffer.toString(); 157 } 158 159 protected HttpServerTransportChannel getTransportChannel(HttpServletRequest request) { 160 String clientID = request.getHeader("clientID"); 161 if (clientID == null) { 162 clientID = request.getParameter("clientID"); 163 } 164 if (clientID == null) { 165 log.warn("No clientID header so ignoring request"); 166 return null; 167 } 168 synchronized (this) { 169 HttpServerTransportChannel answer = (HttpServerTransportChannel) clients.get(clientID); 170 if (answer == null) { 171 answer = createTransportChannel(); 172 clients.put(clientID, answer); 173 listener.addClient(answer); 174 } 175 else { 176 // this lookup should keep the client alive, otherwise we need to discard it 177 keepAlivePing(answer); 178 } 179 return answer; 180 } 181 } 182 183 /** 184 * Disable this channel from being auto-disconnected after a timeout period 185 */ 186 protected void keepAlivePing(HttpServerTransportChannel channel) { 187 /** TODO */ 188 } 189 190 protected HttpServerTransportChannel createTransportChannel() { 191 return new HttpServerTransportChannel(new BoundedLinkedQueue(10)); 192 } 193 194 protected TextWireFormat createWireFormat() { 195 return new XStreamWireFormat(); 196 } 197 }