001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.impl; 020 import java.io.DataInput; 021 import java.io.DataInputStream; 022 import java.io.DataOutput; 023 import java.io.DataOutputStream; 024 import java.io.IOException; 025 import java.io.Serializable; 026 import java.util.ArrayList; 027 import java.util.List; 028 029 import javax.jms.JMSException; 030 031 import org.activemq.io.AbstractWireFormat; 032 import org.activemq.io.WireFormat; 033 import org.activemq.message.AbstractPacket; 034 import org.activemq.message.CachedValue; 035 import org.activemq.message.Packet; 036 import org.activemq.message.WireFormatInfo; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * Default implementation used for Java-Java protocols. When talking to non-Java nodes we may use a different wire 042 * format. 043 * 044 * @version $Revision: 1.1.1.1 $ 045 */ 046 public abstract class AbstractDefaultWireFormat extends AbstractWireFormat implements Serializable { 047 048 /** 049 * Current wire format version for this implementation 050 */ 051 public static final int WIRE_FORMAT_VERSION = 3; 052 private static final Log log = LogFactory.getLog(AbstractDefaultWireFormat.class); 053 054 protected transient PacketReader messageReader; 055 protected transient PacketReader textMessageReader; 056 protected transient PacketReader objectMessageReader; 057 protected transient PacketReader bytesMessageReader; 058 protected transient PacketReader streamMessageReader; 059 protected transient PacketReader mapMessageReader; 060 protected transient PacketReader messageAckReader; 061 protected transient PacketReader receiptReader; 062 protected transient PacketReader consumerInfoReader; 063 protected transient PacketReader producerInfoReader; 064 protected transient PacketReader transactionInfoReader; 065 protected transient PacketReader xaTransactionInfoReader; 066 protected transient PacketReader brokerInfoReader; 067 protected transient PacketReader connectionInfoReader; 068 protected transient PacketReader sessionInfoReader; 069 protected transient PacketReader durableUnsubscribeReader; 070 protected transient PacketReader reponseReceiptReader; 071 protected transient PacketReader intReponseReceiptReader; 072 protected transient PacketReader capacityInfoReader; 073 protected transient PacketReader capacityInfoRequestReader; 074 protected transient PacketReader wireFormatInfoReader; 075 protected transient PacketReader keepAliveReader; 076 protected transient PacketReader brokerAdminCommandReader; 077 protected transient PacketReader cachedValueReader; 078 protected transient PacketReader cleanupConnectionAndSessionInfoReader; 079 protected transient PacketWriter messageWriter; 080 protected transient PacketWriter textMessageWriter; 081 protected transient PacketWriter objectMessageWriter; 082 protected transient PacketWriter bytesMessageWriter; 083 protected transient PacketWriter streamMessageWriter; 084 protected transient PacketWriter mapMessageWriter; 085 protected transient PacketWriter messageAckWriter; 086 protected transient PacketWriter receiptWriter; 087 protected transient PacketWriter consumerInfoWriter; 088 protected transient PacketWriter producerInfoWriter; 089 protected transient PacketWriter transactionInfoWriter; 090 protected transient PacketWriter xaTransactionInfoWriter; 091 protected transient PacketWriter brokerInfoWriter; 092 protected transient PacketWriter connectionInfoWriter; 093 protected transient PacketWriter sessionInfoWriter; 094 protected transient PacketWriter durableUnsubscribeWriter; 095 protected transient PacketWriter reponseReceiptWriter; 096 protected transient PacketWriter intReponseReceiptWriter; 097 protected transient PacketWriter capacityInfoWriter; 098 protected transient PacketWriter capacityInfoRequestWriter; 099 protected transient PacketWriter wireFormatInfoWriter; 100 protected transient PacketWriter keepAliveWriter; 101 protected transient PacketWriter brokerAdminCommandWriter; 102 protected transient PacketWriter cachedValueWriter; 103 protected transient PacketWriter cleanupConnectionAndSessionInfoWriter; 104 105 private List readers = new ArrayList(); 106 private List writers = new ArrayList(); 107 108 protected transient int currentWireFormatVersion; 109 110 /** 111 * Default Constructor 112 */ 113 public AbstractDefaultWireFormat() { 114 this.currentWireFormatVersion = WIRE_FORMAT_VERSION; 115 initializeReaders(); 116 initializeWriters(); 117 } 118 119 120 abstract public byte[] toBytes(Packet packet) throws IOException; 121 abstract public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException; 122 abstract protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException; 123 124 abstract protected void handleCachedValue(CachedValue cv); 125 abstract public Object getValueFromReadCache(short key); 126 abstract short getWriteCachedKey(Object value) throws IOException; 127 128 129 /** 130 * Some wire formats require a handshake at start-up 131 * @param dataOut 132 * @param dataIn 133 * @throws JMSException 134 */ 135 public void initiateClientSideProtocol(DataOutputStream dataOut,DataInputStream dataIn) throws JMSException{ 136 WireFormatInfo info = new WireFormatInfo(); 137 info.setVersion(getCurrentWireFormatVersion()); 138 try { 139 writePacket(info, dataOut); 140 dataOut.flush(); 141 } 142 catch (IOException e) { 143 throw new JMSException("Failed to intiate protocol"); 144 } 145 } 146 147 /** 148 * Some wire formats require a handshake at start-up 149 * @param dataOut 150 * @param dataIn 151 * @throws JMSException 152 */ 153 public void initiateServerSideProtocol(DataOutputStream dataOut,DataInputStream dataIn) throws JMSException{ 154 } 155 156 /** 157 * @return new WireFormat 158 */ 159 abstract public WireFormat copy(); 160 161 /** 162 * @param firstByte 163 * @param dataIn 164 * @return 165 * @throws IOException 166 * 167 */ 168 public Packet readPacket(int firstByte, DataInput dataIn) throws IOException { 169 switch (firstByte) { 170 case Packet.ACTIVEMQ_MESSAGE : 171 return readPacket(dataIn, messageReader); 172 case Packet.ACTIVEMQ_TEXT_MESSAGE : 173 return readPacket(dataIn, textMessageReader); 174 case Packet.ACTIVEMQ_OBJECT_MESSAGE : 175 return readPacket(dataIn, objectMessageReader); 176 case Packet.ACTIVEMQ_BYTES_MESSAGE : 177 return readPacket(dataIn, bytesMessageReader); 178 case Packet.ACTIVEMQ_STREAM_MESSAGE : 179 return readPacket(dataIn, streamMessageReader); 180 case Packet.ACTIVEMQ_MAP_MESSAGE : 181 return readPacket(dataIn, mapMessageReader); 182 case Packet.ACTIVEMQ_MSG_ACK : 183 return readPacket(dataIn, messageAckReader); 184 case Packet.RECEIPT_INFO : 185 return readPacket(dataIn, receiptReader); 186 case Packet.CONSUMER_INFO : 187 return readPacket(dataIn, consumerInfoReader); 188 case Packet.PRODUCER_INFO : 189 return readPacket(dataIn, producerInfoReader); 190 case Packet.TRANSACTION_INFO : 191 return readPacket(dataIn, transactionInfoReader); 192 case Packet.XA_TRANSACTION_INFO : 193 return readPacket(dataIn, xaTransactionInfoReader); 194 case Packet.ACTIVEMQ_BROKER_INFO : 195 return readPacket(dataIn, brokerInfoReader); 196 case Packet.ACTIVEMQ_CONNECTION_INFO : 197 return readPacket(dataIn, connectionInfoReader); 198 case Packet.SESSION_INFO : 199 return readPacket(dataIn, sessionInfoReader); 200 case Packet.DURABLE_UNSUBSCRIBE : 201 return readPacket(dataIn, durableUnsubscribeReader); 202 case Packet.RESPONSE_RECEIPT_INFO : 203 return readPacket(dataIn, reponseReceiptReader); 204 case Packet.INT_RESPONSE_RECEIPT_INFO : 205 return readPacket(dataIn, intReponseReceiptReader); 206 case Packet.CAPACITY_INFO : 207 return readPacket(dataIn, capacityInfoReader); 208 case Packet.CAPACITY_INFO_REQUEST : 209 return readPacket(dataIn, capacityInfoRequestReader); 210 case Packet.WIRE_FORMAT_INFO : 211 WireFormatInfo info = (WireFormatInfo)readPacket(dataIn, wireFormatInfoReader); 212 if (info != null){ 213 if (info.getVersion() < 3){ 214 throw new IOException("Cannot support wire format version: " + info.getVersion()); 215 } 216 } 217 return info; 218 219 case Packet.KEEP_ALIVE : 220 return readPacket(dataIn, keepAliveReader); 221 case Packet.BROKER_ADMIN_COMMAND : 222 return readPacket(dataIn, brokerAdminCommandReader); 223 case Packet.CACHED_VALUE_COMMAND : 224 CachedValue cv = (CachedValue)readPacket(dataIn,cachedValueReader); 225 handleCachedValue(cv); 226 return null; 227 case Packet.CLEANUP_CONNECTION_INFO : 228 return readPacket(dataIn, cleanupConnectionAndSessionInfoReader); 229 default : 230 log.error("Could not find PacketReader for packet type: " 231 + AbstractPacket.getPacketTypeAsString(firstByte)); 232 return null; 233 } 234 } 235 236 protected PacketWriter getWriter(Packet packet) throws IOException { 237 PacketWriter answer = null; 238 switch (packet.getPacketType()) { 239 case Packet.ACTIVEMQ_MESSAGE : 240 answer = messageWriter; 241 break; 242 case Packet.ACTIVEMQ_TEXT_MESSAGE : 243 answer = textMessageWriter; 244 break; 245 case Packet.ACTIVEMQ_OBJECT_MESSAGE : 246 answer = objectMessageWriter; 247 break; 248 case Packet.ACTIVEMQ_BYTES_MESSAGE : 249 answer = bytesMessageWriter; 250 break; 251 case Packet.ACTIVEMQ_STREAM_MESSAGE : 252 answer = streamMessageWriter; 253 break; 254 case Packet.ACTIVEMQ_MAP_MESSAGE : 255 answer = mapMessageWriter; 256 break; 257 case Packet.ACTIVEMQ_MSG_ACK : 258 answer = messageAckWriter; 259 break; 260 case Packet.RECEIPT_INFO : 261 answer = receiptWriter; 262 break; 263 case Packet.CONSUMER_INFO : 264 answer = consumerInfoWriter; 265 break; 266 case Packet.PRODUCER_INFO : 267 answer = producerInfoWriter; 268 break; 269 case Packet.TRANSACTION_INFO : 270 answer = transactionInfoWriter; 271 break; 272 case Packet.XA_TRANSACTION_INFO : 273 answer = xaTransactionInfoWriter; 274 break; 275 case Packet.ACTIVEMQ_BROKER_INFO : 276 answer = brokerInfoWriter; 277 break; 278 case Packet.ACTIVEMQ_CONNECTION_INFO : 279 answer = connectionInfoWriter; 280 break; 281 case Packet.SESSION_INFO : 282 answer = sessionInfoWriter; 283 break; 284 case Packet.DURABLE_UNSUBSCRIBE : 285 answer = durableUnsubscribeWriter; 286 break; 287 case Packet.RESPONSE_RECEIPT_INFO : 288 answer = reponseReceiptWriter; 289 break; 290 case Packet.INT_RESPONSE_RECEIPT_INFO : 291 answer = intReponseReceiptWriter; 292 break; 293 case Packet.CAPACITY_INFO : 294 answer = capacityInfoWriter; 295 break; 296 case Packet.CAPACITY_INFO_REQUEST : 297 answer = capacityInfoRequestWriter; 298 break; 299 case Packet.WIRE_FORMAT_INFO : 300 answer = wireFormatInfoWriter; 301 break; 302 case Packet.KEEP_ALIVE : 303 answer = keepAliveWriter; 304 break; 305 case Packet.BROKER_ADMIN_COMMAND : 306 answer = brokerAdminCommandWriter; 307 break; 308 case Packet.CACHED_VALUE_COMMAND: 309 answer = cachedValueWriter; 310 break; 311 case Packet.CLEANUP_CONNECTION_INFO: 312 answer = cleanupConnectionAndSessionInfoWriter; 313 break; 314 default : 315 log.error("no PacketWriter for packet: " + packet); 316 } 317 return answer; 318 } 319 320 /** 321 * Can this wireformat process packets of this version 322 * @param version the version number to test 323 * @return true if can accept the version 324 */ 325 public boolean canProcessWireFormatVersion(int version){ 326 return version <= WIRE_FORMAT_VERSION; 327 } 328 329 /** 330 * @return the current version of this wire format 331 */ 332 public int getCurrentWireFormatVersion(){ 333 return currentWireFormatVersion; 334 } 335 336 /** 337 * set the current version 338 * @param version 339 */ 340 public void setCurrentWireFormatVersion(int version){ 341 this.currentWireFormatVersion = version; 342 for (int i =0; i < readers.size(); i++){ 343 PacketReader reader = (PacketReader)readers.get(i); 344 reader.setWireFormatVersion(version); 345 } 346 for (int i =0; i < writers.size(); i++){ 347 PacketWriter writer = (PacketWriter)writers.get(i); 348 writer.setWireFormatVersion(version); 349 } 350 } 351 352 private void initializeReaders() { 353 messageReader = new ActiveMQMessageReader(this); 354 readers.add(messageReader); 355 textMessageReader = new ActiveMQTextMessageReader(this); 356 readers.add(textMessageReader); 357 objectMessageReader = new ActiveMQObjectMessageReader(this); 358 readers.add(objectMessageReader); 359 bytesMessageReader = new ActiveMQBytesMessageReader(this); 360 readers.add(bytesMessageReader); 361 streamMessageReader = new ActiveMQStreamMessageReader(this); 362 readers.add(streamMessageReader); 363 mapMessageReader = new ActiveMQMapMessageReader(this); 364 readers.add(mapMessageReader); 365 messageAckReader = new MessageAckReader(this); 366 readers.add(messageAckReader); 367 receiptReader = new ReceiptReader(); 368 readers.add(receiptReader); 369 consumerInfoReader = new ConsumerInfoReader(); 370 readers.add(consumerInfoReader); 371 producerInfoReader = new ProducerInfoReader(); 372 readers.add(producerInfoReader); 373 transactionInfoReader = new TransactionInfoReader(); 374 readers.add(transactionInfoReader); 375 xaTransactionInfoReader = new XATransactionInfoReader(); 376 readers.add(xaTransactionInfoReader); 377 brokerInfoReader = new BrokerInfoReader(); 378 readers.add(brokerInfoReader); 379 connectionInfoReader = new ConnectionInfoReader(); 380 readers.add(connectionInfoReader); 381 sessionInfoReader = new SessionInfoReader(); 382 readers.add(sessionInfoReader); 383 durableUnsubscribeReader = new DurableUnsubscribeReader(); 384 readers.add(durableUnsubscribeReader); 385 reponseReceiptReader = new ResponseReceiptReader(); 386 readers.add(reponseReceiptReader); 387 intReponseReceiptReader = new IntResponseReceiptReader(); 388 readers.add(intReponseReceiptReader); 389 capacityInfoReader = new CapacityInfoReader(); 390 readers.add(capacityInfoReader); 391 capacityInfoRequestReader = new CapacityInfoRequestReader(); 392 readers.add(capacityInfoReader); 393 wireFormatInfoReader = new WireFormatInfoReader(this); 394 readers.add(wireFormatInfoReader); 395 keepAliveReader = new KeepAliveReader(); 396 readers.add(keepAliveReader); 397 brokerAdminCommandReader = new BrokerAdminCommandReader(); 398 readers.add(brokerAdminCommandReader); 399 cachedValueReader = new CachedValueReader(); 400 readers.add(cachedValueReader); 401 cleanupConnectionAndSessionInfoReader = new CleanupConnectionInfoReader(); 402 readers.add(cleanupConnectionAndSessionInfoReader); 403 } 404 405 private void initializeWriters(){ 406 messageWriter = new ActiveMQMessageWriter(this); 407 writers.add(messageWriter); 408 textMessageWriter = new ActiveMQTextMessageWriter(this); 409 writers.add(textMessageWriter); 410 objectMessageWriter = new ActiveMQObjectMessageWriter(this); 411 writers.add(objectMessageWriter); 412 bytesMessageWriter = new ActiveMQBytesMessageWriter(this); 413 writers.add(bytesMessageWriter); 414 streamMessageWriter = new ActiveMQStreamMessageWriter(this); 415 writers.add(streamMessageWriter); 416 mapMessageWriter = new ActiveMQMapMessageWriter(this); 417 writers.add(mapMessageWriter); 418 messageAckWriter = new MessageAckWriter(this); 419 writers.add(messageAckWriter); 420 receiptWriter = new ReceiptWriter(); 421 writers.add(receiptWriter); 422 consumerInfoWriter = new ConsumerInfoWriter(); 423 writers.add(consumerInfoWriter); 424 producerInfoWriter = new ProducerInfoWriter(); 425 writers.add(producerInfoWriter); 426 transactionInfoWriter = new TransactionInfoWriter(); 427 writers.add(transactionInfoWriter); 428 xaTransactionInfoWriter = new XATransactionInfoWriter(); 429 writers.add(xaTransactionInfoWriter); 430 brokerInfoWriter = new BrokerInfoWriter(); 431 writers.add(brokerInfoWriter); 432 connectionInfoWriter = new ConnectionInfoWriter(); 433 writers.add(connectionInfoWriter); 434 sessionInfoWriter = new SessionInfoWriter(); 435 writers.add(sessionInfoWriter); 436 durableUnsubscribeWriter = new DurableUnsubscribeWriter(); 437 writers.add(durableUnsubscribeWriter); 438 reponseReceiptWriter = new ResponseReceiptWriter(); 439 writers.add(reponseReceiptWriter); 440 intReponseReceiptWriter = new IntResponseReceiptWriter(); 441 writers.add(intReponseReceiptWriter); 442 capacityInfoWriter = new CapacityInfoWriter(); 443 writers.add(capacityInfoWriter); 444 capacityInfoRequestWriter = new CapacityInfoRequestWriter(); 445 writers.add(capacityInfoWriter); 446 wireFormatInfoWriter = new WireFormatInfoWriter(); 447 writers.add(wireFormatInfoWriter); 448 keepAliveWriter = new KeepAliveWriter(); 449 writers.add(keepAliveWriter); 450 brokerAdminCommandWriter = new BrokerAdminCommandWriter(); 451 writers.add(brokerAdminCommandWriter); 452 cachedValueWriter = new CachedValueWriter(); 453 writers.add(cachedValueWriter); 454 cleanupConnectionAndSessionInfoWriter = new CleanupConnectionInfoWriter(); 455 writers.add(cleanupConnectionAndSessionInfoWriter); 456 } 457 458 }