001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * Copyright 2004 Hiram Chirino 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 package org.activemq.io.impl; 020 021 import java.io.ByteArrayOutputStream; 022 import java.io.DataInput; 023 import java.io.DataInputStream; 024 import java.io.DataOutput; 025 import java.io.DataOutputStream; 026 import java.io.IOException; 027 import java.io.ObjectStreamException; 028 029 import org.activeio.PacketData; 030 import org.activeio.adapter.PacketByteArrayOutputStream; 031 import org.activeio.adapter.PacketInputStream; 032 import org.activemq.io.WireFormat; 033 import org.activemq.message.CachedValue; 034 import org.activemq.message.Packet; 035 036 /** 037 * Provides a stateless implementation of AbstractDefaultWireFormat. Safe for use by multiple threads and incurs no locking overhead. 038 * 039 * @version $Revision: 1.1.1.1 $ 040 */ 041 public class StatelessDefaultWireFormat extends AbstractDefaultWireFormat { 042 043 private static final long serialVersionUID = -2648674156081593006L; 044 045 public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException { 046 047 PacketWriter writer = getWriter(packet); 048 if (writer != null) { 049 050 PacketByteArrayOutputStream internalBytesOut = new PacketByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) ); 051 DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut); 052 writer.writePacket(packet, internalDataOut); 053 internalDataOut.close(); 054 055 org.activeio.Packet p = internalBytesOut.getPacket(); 056 int count = p.remaining(); 057 058 dataOut.writeByte(packet.getPacketType()); 059 dataOut.writeInt(count); 060 packet.setMemoryUsage(count); 061 p.writeTo(dataOut); 062 063 } 064 return null; 065 } 066 067 /** 068 * Write a Packet to a PacketByteArrayOutputStream 069 * 070 * @param packet 071 * @param dataOut 072 * @return a response packet - or null 073 * @throws IOException 074 */ 075 public org.activeio.Packet writePacket(Packet packet, PacketByteArrayOutputStream paos) throws IOException { 076 PacketWriter writer = getWriter(packet); 077 if (writer != null) { 078 079 // We may not be writing to the start of the PAOS. 080 int startPosition = paos.position(); 081 // Skip space for the headers. 082 paos.skip(5); 083 // Stream the data. 084 DataOutputStream data = new DataOutputStream(paos); 085 writer.writePacket(packet, data); 086 data.close(); 087 org.activeio.Packet rc = paos.getPacket(); 088 089 int count = rc.remaining()-(startPosition+5); 090 packet.setMemoryUsage(count); 091 092 // Now write the headers to the packet. 093 094 rc.position(startPosition); 095 PacketData pd = new PacketData(rc); 096 pd.writeByte(packet.getPacketType()); 097 pd.writeInt(count); 098 rc.rewind(); 099 return rc; 100 } 101 return null; 102 } 103 104 /** 105 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal 106 * BytesOutputStream 107 * 108 * @param packet 109 * @return a byte array representing the packet using some wire protocol 110 * @throws IOException 111 */ 112 public byte[] toBytes(Packet packet) throws IOException { 113 114 byte[] data = null; 115 PacketWriter writer = getWriter(packet); 116 if (writer != null) { 117 118 // Try to guess the right size. 119 ByteArrayOutputStream internalBytesOut = new ByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) ); 120 DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut); 121 122 internalBytesOut.reset(); 123 internalDataOut.writeByte(packet.getPacketType()); 124 internalDataOut.writeInt(-1);//the length 125 writer.writePacket(packet, internalDataOut); 126 internalDataOut.flush(); 127 data = internalBytesOut.toByteArray(); 128 // lets subtract the header offset from the length 129 int length = data.length - 5; 130 packet.setMemoryUsage(length); 131 //write in the length to the data 132 data[1] = (byte) ((length >>> 24) & 0xFF); 133 data[2] = (byte) ((length >>> 16) & 0xFF); 134 data[3] = (byte) ((length >>> 8) & 0xFF); 135 data[4] = (byte) ((length >>> 0) & 0xFF); 136 } 137 return data; 138 } 139 140 protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException { 141 Packet packet = reader.createPacket(); 142 int length = dataIn.readInt(); 143 packet.setMemoryUsage(length); 144 reader.buildPacket(packet, dataIn); 145 return packet; 146 } 147 148 /** 149 * @param dataIn 150 * @return 151 * @throws IOException 152 */ 153 public Packet readPacket(org.activeio.Packet dataIn) throws IOException { 154 return readPacket(new DataInputStream(new PacketInputStream(dataIn))); 155 } 156 157 protected void handleCachedValue(CachedValue cv) { 158 throw new IllegalStateException("Value caching is not supported."); 159 } 160 161 public Object getValueFromReadCache(short key) { 162 throw new IllegalStateException("Value caching is not supported."); 163 } 164 165 short getWriteCachedKey(Object value) { 166 throw new IllegalStateException("Value caching is not supported."); 167 } 168 169 public boolean isCachingEnabled() { 170 return false; 171 } 172 173 public WireFormat copy() { 174 return new StatelessDefaultWireFormat(); 175 } 176 177 private Object readResolve() throws ObjectStreamException { 178 return new DefaultWireFormat(); 179 } 180 181 }