001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io; 020 import java.io.ByteArrayInputStream; 021 import java.io.ByteArrayOutputStream; 022 import java.io.DataInput; 023 import java.io.DataInputStream; 024 import java.io.DataOutputStream; 025 import java.io.IOException; 026 import java.net.DatagramPacket; 027 028 import javax.jms.JMSException; 029 030 import org.activemq.message.Packet; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 /** 035 * Represents a strategy of encoding packets on the wire or on disk using some kind of serialization or wire format. 036 * <p/>We use a default efficient format for Java to Java communication but other formats to other systems can be used, 037 * such as using simple text strings when talking to JavaScript or coming up with other formats for talking to C / C# 038 * languages or proprietary messaging systems we wish to interface with at the wire level etc. 039 * 040 * @version $Revision: 1.1.1.1 $ 041 */ 042 public abstract class AbstractWireFormat implements WireFormat { 043 private static final Log log = LogFactory.getLog(AbstractWireFormat.class); 044 protected DataOutputStream transportDataOut; 045 protected DataInputStream transportDataIn; 046 protected boolean cachingEnabled; 047 048 /** 049 * Read a packet from a Datagram packet from the given channelID. If the packet is from the same channel ID as it 050 * was sent then we have a loop-back so discard the packet 051 * 052 * @param channelID is the unique channel ID 053 * @param dpacket 054 * @return the packet read from the datagram or null if it should be discarded 055 * @throws IOException 056 */ 057 public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException { 058 DataInput in = new DataInputStream(new ByteArrayInputStream(dpacket.getData(), dpacket.getOffset(), dpacket 059 .getLength())); 060 String id = in.readUTF(); 061 if (channelID == null) { 062 log 063 .trace("We do not have a channelID which is probably caused by a synchronization issue, we're receiving messages before we're fully initialised"); 064 } 065 else if (channelID.equals(id)) { 066 if (log.isTraceEnabled()) { 067 log.trace("Discarding packet from id: " + id); 068 } 069 return null; 070 } 071 int type = in.readByte(); 072 Packet packet = readPacket(type, in); 073 // if (packet instanceof ActiveMQMessage) { 074 // System.out.println("##### read packet from channel: " + id + " in channel: " + channelID + " message: " + 075 // packet); 076 // } 077 // 078 return packet; 079 } 080 081 /** 082 * Writes the given package to a new datagram 083 * 084 * @param channelID is the unique channel ID 085 * @param packet is the packet to write 086 * @return @throws IOException 087 * @throws JMSException 088 */ 089 public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException { 090 ByteArrayOutputStream out = new ByteArrayOutputStream(); 091 DataOutputStream dataOut = new DataOutputStream(out); 092 channelID = channelID != null ? channelID : ""; 093 dataOut.writeUTF(channelID); 094 writePacket(packet, dataOut); 095 dataOut.close(); 096 byte[] data = out.toByteArray(); 097 return new DatagramPacket(data, data.length); 098 } 099 100 /** 101 * Reads the packet from the given byte[] 102 * 103 * @param bytes 104 * @param offset 105 * @param length 106 * @return @throws IOException 107 */ 108 public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException { 109 DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, offset, length)); 110 return readPacket(in); 111 } 112 113 /** 114 * Reads the packet from the given byte[] 115 * 116 * @param bytes 117 * @return @throws IOException 118 */ 119 public Packet fromBytes(byte[] bytes) throws IOException { 120 DataInput in = new DataInputStream(new ByteArrayInputStream(bytes)); 121 return readPacket(in); 122 } 123 124 /** 125 * A helper method which converts a packet into a byte array 126 * 127 * @param packet 128 * @return a byte array representing the packet using some wire protocol 129 * @throws IOException 130 * @throws JMSException 131 */ 132 public byte[] toBytes(Packet packet) throws IOException, JMSException { 133 ByteArrayOutputStream out = new ByteArrayOutputStream(); 134 DataOutputStream dataOut = new DataOutputStream(out); 135 writePacket(packet, dataOut); 136 dataOut.close(); 137 return out.toByteArray(); 138 } 139 140 /** 141 * some transports may register their streams (e.g. Tcp) 142 * 143 * @param dataOut 144 * @param dataIn 145 */ 146 public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) { 147 transportDataOut = dataOut; 148 transportDataIn = dataIn; 149 } 150 151 /** 152 * Some wire formats require a handshake at start-up 153 * 154 * @throws IOException 155 */ 156 public void initiateClientSideProtocol() throws IOException { 157 } 158 159 /** 160 * Some wire formats require a handshake at start-up 161 * 162 * @throws IOException 163 */ 164 public void initiateServerSideProtocol() throws IOException { 165 } 166 167 168 /** 169 * @return Returns the enableCaching. 170 */ 171 public boolean isCachingEnabled() { 172 return cachingEnabled; 173 } 174 175 /** 176 * @param enableCaching The enableCaching to set. 177 */ 178 public void setCachingEnabled(boolean enableCaching) { 179 this.cachingEnabled = enableCaching; 180 } 181 182 /** 183 * some wire formats will implement their own fragementation 184 * @return true unless a wire format supports it's own fragmentation 185 */ 186 public boolean doesSupportMessageFragmentation(){ 187 return true; 188 } 189 190 191 /** 192 * Some wire formats will not be able to understand compressed messages 193 * @return true unless a wire format cannot understand compression 194 */ 195 public boolean doesSupportMessageCompression(){ 196 return true; 197 } 198 /** 199 * @return Returns the transportDataOut. 200 */ 201 public DataOutputStream getTransportDataOut() { 202 return transportDataOut; 203 } 204 /** 205 * @param transportDataOut The transportDataOut to set. 206 */ 207 public void setTransportDataOut(DataOutputStream transportDataOut) { 208 this.transportDataOut = transportDataOut; 209 } 210 /** 211 * @return Returns the transportDataIn. 212 */ 213 public DataInputStream getTransportDataIn() { 214 return transportDataIn; 215 } 216 /** 217 * @param transportDataIn The transportDataIn to set. 218 */ 219 public void setTransportDataIn(DataInputStream transportDataIn) { 220 this.transportDataIn = transportDataIn; 221 } 222 223 /** 224 * @param dataIn 225 * @return 226 * @throws java.io.IOException 227 */ 228 public Packet readPacket(DataInput dataIn) throws IOException { 229 int type = -1; 230 while ((type = dataIn.readByte()) == 0); 231 232 if (type == -1){ 233 throw new IOException("InputStream now closed"); 234 } 235 return readPacket(type, dataIn); 236 } 237 }