001 /** 002 * 003 * Copyright 2005 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.stomp; 020 021 import EDU.oswego.cs.dl.util.concurrent.Executor; 022 023 import org.activemq.message.Packet; 024 import org.activemq.transport.tcp.TcpTransportChannel; 025 import org.activemq.transport.tcp.TcpTransportServerChannel; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 029 import javax.jms.ExceptionListener; 030 import javax.jms.JMSException; 031 import java.net.Socket; 032 import java.net.URI; 033 import java.io.IOException; 034 035 /** 036 * A transport for using Stomp to talk to ActiveMQ 037 * 038 * @version $Revision: 1.1 $ 039 */ 040 public class StompTransportChannel extends TcpTransportChannel { 041 private static final Log log = LogFactory.getLog(StompTransportChannel.class); 042 043 public StompTransportChannel() { 044 super(new StompWireFormat()); 045 } 046 047 public StompTransportChannel(URI remoteLocation) throws JMSException { 048 super(new StompWireFormat(), remoteLocation); 049 } 050 051 public StompTransportChannel(URI remoteLocation, URI localLocation) throws JMSException { 052 super(new StompWireFormat(), remoteLocation, localLocation); 053 } 054 055 public StompTransportChannel(TcpTransportServerChannel serverChannel, Socket socket, Executor executor) 056 throws JMSException { 057 super(serverChannel, new StompWireFormat(), socket, executor); 058 } 059 060 public StompTransportChannel(Socket socket, Executor executor) throws JMSException { 061 super(new StompWireFormat(), socket, executor); 062 } 063 064 public StompWireFormat getTTMPWireFormat() { 065 return (StompWireFormat) getWireFormat(); 066 } 067 068 protected void readWireFormat() throws JMSException, IOException { 069 // no need to read wire format from wire 070 } 071 072 protected void doConsumePacket(Packet packet) { 073 if( packet == FlushPacket.PACKET ) { 074 try { 075 doAsyncSend(null); 076 } catch (JMSException e) { 077 ExceptionListener listener = getExceptionListener(); 078 if (listener != null) { 079 listener.onException(e); 080 } 081 else { 082 log.warn("No listener to report error consuming packet: " + e, e); 083 } 084 } 085 } else { 086 super.doConsumePacket(packet); 087 } 088 } 089 090 }