001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.impl; 020 import java.io.DataInput; 021 import java.io.DataInputStream; 022 import java.io.DataOutput; 023 import java.io.DataOutputStream; 024 import java.io.IOException; 025 import java.io.ObjectStreamException; 026 import java.io.Serializable; 027 import java.util.Arrays; 028 import java.util.HashMap; 029 import java.util.Map; 030 031 import org.activemq.io.WireFormat; 032 import org.activemq.io.util.WireByteArrayInputStream; 033 import org.activemq.io.util.WireByteArrayOutputStream; 034 import org.activemq.message.CachedValue; 035 import org.activemq.message.Packet; 036 037 /** 038 * This is a stateful AbstractDefaultWireFormat which implements value caching. Not optimal for use by 039 * many concurrent threads. One DefaultWireFormat is typically allocated per client connection. 040 * 041 * @version $Revision: 1.1.1.1 $ 042 */ 043 public class DefaultWireFormat extends AbstractDefaultWireFormat implements Serializable { 044 045 private static final long serialVersionUID = -1454851936411678612L; 046 047 private static final int MAX_CACHE_SIZE = Short.MAX_VALUE/2; //needs to be a lot less than Short.MAX_VALUE 048 049 static final short NULL_VALUE = -1; 050 static final short CLEAR_CACHE = -2; 051 052 // 053 // Fields used during a write. 054 // 055 protected transient final Object writeMutex = new Object(); 056 protected transient WireByteArrayOutputStream internalBytesOut; 057 protected transient DataOutputStream internalDataOut; 058 protected transient WireByteArrayOutputStream cachedBytesOut; 059 protected transient DataOutputStream cachedDataOut; 060 private Map writeValueCache = new HashMap(); 061 protected transient short cachedKeyGenerator; 062 protected transient short lastWriteValueCacheEvictionPosition=500; 063 064 // 065 // Fields used during a read. 066 // 067 protected transient final Object readMutex = new Object(); 068 protected transient WireByteArrayInputStream internalBytesIn; 069 protected transient DataInputStream internalDataIn; 070 private Object[] writeValueCacheArray = new Object[MAX_CACHE_SIZE]; 071 private Object[] readValueCacheArray = new Object[MAX_CACHE_SIZE]; 072 073 074 /** 075 * Default Constructor 076 */ 077 public DefaultWireFormat() { 078 internalBytesOut = new WireByteArrayOutputStream(); 079 internalDataOut = new DataOutputStream(internalBytesOut); 080 internalBytesIn = new WireByteArrayInputStream(); 081 internalDataIn = new DataInputStream(internalBytesIn); 082 this.currentWireFormatVersion = WIRE_FORMAT_VERSION; 083 this.cachedBytesOut = new WireByteArrayOutputStream(); 084 this.cachedDataOut = new DataOutputStream(cachedBytesOut); 085 } 086 087 /** 088 * @return new WireFormat 089 */ 090 public WireFormat copy() { 091 DefaultWireFormat format = new DefaultWireFormat(); 092 format.setCachingEnabled(cachingEnabled); 093 format.setCurrentWireFormatVersion(getCurrentWireFormatVersion()); 094 return format; 095 } 096 097 098 private Object readResolve() throws ObjectStreamException { 099 return new DefaultWireFormat(); 100 } 101 102 public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException { 103 PacketWriter writer = getWriter(packet); 104 if (writer != null) { 105 synchronized (writeMutex) { 106 internalBytesOut.reset(); 107 writer.writePacket(packet, internalDataOut); 108 internalDataOut.flush(); 109 //reuse the byte buffer in the ByteArrayOutputStream 110 byte[] data = internalBytesOut.getData(); 111 int count = internalBytesOut.size(); 112 dataOut.writeByte(packet.getPacketType()); 113 dataOut.writeInt(count); 114 //byte[] data = internalBytesOut.toByteArray(); 115 //int count = data.length; 116 //dataOut.writeInt(count); 117 packet.setMemoryUsage(count); 118 dataOut.write(data, 0, count); 119 } 120 } 121 return null; 122 } 123 124 protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException { 125 synchronized (readMutex) { 126 Packet packet = reader.createPacket(); 127 int length = dataIn.readInt(); 128 packet.setMemoryUsage(length); 129 byte[] data = new byte[length]; 130 dataIn.readFully(data); 131 //then splat into the internal datainput 132 internalBytesIn.restart(data); 133 reader.buildPacket(packet, internalDataIn); 134 return packet; 135 } 136 } 137 138 /** 139 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal 140 * BytesOutputStream 141 * 142 * @param packet 143 * @return a byte array representing the packet using some wire protocol 144 * @throws IOException 145 */ 146 public byte[] toBytes(Packet packet) throws IOException { 147 byte[] data = null; 148 PacketWriter writer = getWriter(packet); 149 150 if (writer != null) { 151 152 synchronized (writeMutex) { 153 internalBytesOut.reset(); 154 internalDataOut.writeByte(packet.getPacketType()); 155 internalDataOut.writeInt(-1);//the length 156 writer.writePacket(packet, internalDataOut); 157 internalDataOut.flush(); 158 data = internalBytesOut.toByteArray(); 159 } 160 161 // lets subtract the header offset from the length 162 int length = data.length - 5; 163 packet.setMemoryUsage(length); 164 //write in the length to the data 165 data[1] = (byte) ((length >>> 24) & 0xFF); 166 data[2] = (byte) ((length >>> 16) & 0xFF); 167 data[3] = (byte) ((length >>> 8) & 0xFF); 168 data[4] = (byte) ((length >>> 0) & 0xFF); 169 } 170 171 return data; 172 } 173 174 /////////////////////////////////////////////////////////////// 175 // 176 // Methods to handle cached values 177 // 178 /////////////////////////////////////////////////////////////// 179 180 public Object getValueFromReadCache(short key) { 181 if( key < 0 || key > readValueCacheArray.length ) 182 return null; 183 return readValueCacheArray[key]; 184 } 185 186 protected short getWriteCachedKey(Object key) throws IOException{ 187 if (key != null){ 188 Short result = null; 189 result = (Short)writeValueCache.get(key); 190 if (result == null){ 191 result = getNextCacheId(); 192 writeValueCache.put(key,result); 193 writeValueCacheArray[result.shortValue()]=key; 194 updateCachedValue(result.shortValue(),key); 195 } 196 return result.shortValue(); 197 } 198 return DefaultWireFormat.NULL_VALUE; 199 } 200 201 /** 202 * @return 203 */ 204 private Short getNextCacheId() { 205 Short result; 206 result = new Short(cachedKeyGenerator++); 207 // once we fill the cache start reusing old cache locations to avoid memory leaks. 208 if (cachedKeyGenerator >= MAX_CACHE_SIZE) { 209 cachedKeyGenerator=0; 210 } 211 212 lastWriteValueCacheEvictionPosition++; 213 if (lastWriteValueCacheEvictionPosition >= MAX_CACHE_SIZE) { 214 lastWriteValueCacheEvictionPosition=0; 215 } 216 217 if( writeValueCacheArray[lastWriteValueCacheEvictionPosition] !=null ) { 218 Object o = writeValueCacheArray[lastWriteValueCacheEvictionPosition]; 219 writeValueCache.remove(o); 220 writeValueCacheArray[lastWriteValueCacheEvictionPosition]=null; 221 } 222 return result; 223 } 224 225 protected void validateWriteCache() throws IOException { 226 if (cachingEnabled) { 227 if (writeValueCache.size() >= MAX_CACHE_SIZE) { 228 writeValueCache.clear(); 229 Arrays.fill(writeValueCacheArray,null); 230 cachedKeyGenerator = 0; 231 updateCachedValue((short) -1, null);// send update to peer to 232 // clear the peer cache 233 } 234 } 235 } 236 237 protected void handleCachedValue(CachedValue cv) { 238 if (cv != null) { 239 if (cv.getId() == CLEAR_CACHE) { 240 Arrays.fill(readValueCacheArray, null); 241 } else if (cv.getId() != NULL_VALUE) { 242 readValueCacheArray[cv.getId()] = cv.getValue(); 243 } 244 } 245 } 246 247 private synchronized void updateCachedValue(short key, Object value) throws IOException { 248 if (cachedValueWriter == null) { 249 cachedValueWriter = new CachedValueWriter(); 250 } 251 CachedValue cv = new CachedValue(); 252 cv.setId(key); 253 cv.setValue(value); 254 cachedBytesOut.reset(); 255 cachedValueWriter.writePacket(cv, cachedDataOut); 256 cachedDataOut.flush(); 257 byte[] data = cachedBytesOut.getData(); 258 int count = cachedBytesOut.size(); 259 getTransportDataOut().writeByte(cv.getPacketType()); 260 getTransportDataOut().writeInt(count); 261 getTransportDataOut().write(data, 0, count); 262 } 263 }