001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.io.impl; 019 020 import org.activemq.message.Packet; 021 import org.activemq.message.AbstractPacket; 022 import org.activemq.message.ActiveMQDestination; 023 import org.activemq.util.SerializationHelper; 024 import org.activemq.util.BitArray; 025 026 import java.io.IOException; 027 import java.io.DataOutput; 028 import java.io.ByteArrayOutputStream; 029 import java.io.DataOutputStream; 030 import java.io.DataInput; 031 032 /** 033 * @version $Revision: 1.1 $ 034 */ 035 public abstract class AbstractPacketMarshaller extends AbstractPacketReader implements PacketWriter { 036 protected int wireFormatVersion = DefaultWireFormat.WIRE_FORMAT_VERSION; 037 038 039 /** 040 * simple helper method to ensure null strings are catered for 041 * 042 * @param str 043 * @param dataOut 044 * @throws IOException 045 */ 046 protected void writeUTF(String str, DataOutput dataOut) throws IOException { 047 if (str == null) { 048 str = ""; 049 } 050 dataOut.writeUTF(str); 051 } 052 053 /** 054 * Reads a destination from the input stream 055 */ 056 protected ActiveMQDestination readDestination(DataInput dataIn) throws IOException { 057 return ActiveMQDestination.readFromStream(dataIn); 058 } 059 060 /** 061 * Writes the given destination to the stream 062 */ 063 protected void writeDestination(ActiveMQDestination destination, DataOutput dataOut) throws IOException { 064 ActiveMQDestination.writeToStream(destination, dataOut); 065 } 066 067 /** 068 * @param packet 069 * @return true if this PacketWriter can write this type of Packet 070 */ 071 public boolean canWrite(Packet packet) { 072 return packet.getPacketType() == this.getPacketType(); 073 } 074 075 /** 076 * Simple (but inefficent) utility method to write an object on to a stream 077 * 078 * @param object 079 * @param dataOut 080 * @throws IOException 081 */ 082 protected void writeObject(Object object, DataOutput dataOut) throws IOException { 083 if (object != null) { 084 byte[] data = SerializationHelper.writeObject(object); 085 dataOut.writeInt(data.length); 086 dataOut.write(data); 087 } 088 else { 089 dataOut.writeInt(0); 090 } 091 } 092 093 /** 094 * Serializes a Packet int a byte array 095 * 096 * @param packet 097 * @return the byte[] 098 * @throws IOException 099 */ 100 public byte[] writePacketToByteArray(Packet packet) throws IOException { 101 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 102 DataOutputStream dataOut = new DataOutputStream(bytesOut); 103 writePacket(packet, dataOut); 104 dataOut.flush(); 105 return bytesOut.toByteArray(); 106 } 107 108 /** 109 * Write a Packet instance to data output stream 110 * 111 * @param p the instance to be seralized 112 * @param dataOut the output stream 113 * @throws IOException thrown if an error occurs 114 */ 115 public void writePacket(Packet p, DataOutput dataOut) throws IOException { 116 AbstractPacket packet = (AbstractPacket)p; 117 dataOut.writeShort(packet.getId()); 118 BitArray ba = packet.getBitArray(); 119 ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, packet.isReceiptRequired()); 120 Object[] visited = packet.getBrokersVisited(); 121 boolean writeVisited = visited != null && visited.length > 0; 122 ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited); 123 ba.writeToStream(dataOut); 124 if (writeVisited){ 125 dataOut.writeShort(visited.length); 126 for(int i =0; i < visited.length; i++){ 127 dataOut.writeUTF(visited[i].toString()); 128 } 129 } 130 } 131 132 /** 133 * Set the wire format version 134 * @param version 135 */ 136 public void setWireFormatVersion(int version){ 137 this.wireFormatVersion = version; 138 } 139 140 /** 141 * @return the wire format version 142 */ 143 public int getWireFormatVersion(){ 144 return wireFormatVersion; 145 } 146 147 }