001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.io.impl; 019 import java.io.ByteArrayOutputStream; 020 import java.io.DataOutput; 021 import java.io.DataOutputStream; 022 import java.io.IOException; 023 024 import org.activemq.message.AbstractPacket; 025 import org.activemq.message.Packet; 026 import org.activemq.util.BitArray; 027 import org.activemq.util.SerializationHelper; 028 029 /** 030 * Allows instances implementing Packet interface to be serailized/deserailized 031 */ 032 public abstract class AbstractPacketWriter implements PacketWriter { 033 protected int wireFormatVersion = DefaultWireFormat.WIRE_FORMAT_VERSION; 034 035 /** 036 * simple helper method to ensure null strings are catered for 037 * 038 * @param str 039 * @param dataOut 040 * @throws IOException 041 */ 042 protected void writeUTF(String str, DataOutput dataOut) throws IOException { 043 if (str == null) { 044 str = ""; 045 } 046 dataOut.writeUTF(str); 047 } 048 049 /** 050 * @param packet 051 * @return true if this PacketWriter can write this type of Packet 052 */ 053 public boolean canWrite(Packet packet) { 054 return packet.getPacketType() == this.getPacketType(); 055 } 056 057 /** 058 * Simple (but inefficent) utility method to write an object on to a stream 059 * 060 * @param object 061 * @param dataOut 062 * @throws IOException 063 */ 064 protected void writeObject(Object object, DataOutput dataOut) throws IOException { 065 if (object != null) { 066 byte[] data = SerializationHelper.writeObject(object); 067 dataOut.writeInt(data.length); 068 dataOut.write(data); 069 } 070 else { 071 dataOut.writeInt(0); 072 } 073 } 074 075 /** 076 * Serializes a Packet int a byte array 077 * 078 * @param packet 079 * @return the byte[] 080 * @throws IOException 081 */ 082 public byte[] writePacketToByteArray(Packet packet) throws IOException { 083 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 084 DataOutputStream dataOut = new DataOutputStream(bytesOut); 085 writePacket(packet, dataOut); 086 dataOut.flush(); 087 return bytesOut.toByteArray(); 088 } 089 090 /** 091 * Write a Packet instance to data output stream 092 * 093 * @param p the instance to be seralized 094 * @param dataOut the output stream 095 * @throws IOException thrown if an error occurs 096 */ 097 public void writePacket(Packet p, DataOutput dataOut) throws IOException { 098 AbstractPacket packet = (AbstractPacket)p; 099 dataOut.writeShort(packet.getId()); 100 BitArray ba = packet.getBitArray(); 101 ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, packet.isReceiptRequired()); 102 Object[] visited = packet.getBrokersVisited(); 103 boolean writeVisited = visited != null && visited.length > 0; 104 ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited); 105 ba.writeToStream(dataOut); 106 if (writeVisited){ 107 dataOut.writeShort(visited.length); 108 for(int i =0; i < visited.length; i++){ 109 dataOut.writeUTF(visited[i].toString()); 110 } 111 } 112 } 113 114 /** 115 * Set the wire format version 116 * @param version 117 */ 118 public void setWireFormatVersion(int version){ 119 this.wireFormatVersion = version; 120 } 121 122 /** 123 * @return the wire format version 124 */ 125 public int getWireFormatVersion(){ 126 return wireFormatVersion; 127 } 128 129 }