001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.impl; 020 import java.io.DataInput; 021 import java.io.IOException; 022 import javax.jms.Destination; 023 024 import org.activemq.io.util.WireByteArrayInputStream; 025 import org.activemq.message.AbstractPacket; 026 import org.activemq.message.ActiveMQDestination; 027 import org.activemq.message.ActiveMQMessage; 028 import org.activemq.message.ActiveMQXid; 029 import org.activemq.message.Packet; 030 import org.activemq.util.BitArray; 031 032 /** 033 * Writes a ProducerInfo object to a Stream 034 */ 035 public class ActiveMQMessageReader extends AbstractPacketReader { 036 037 private AbstractDefaultWireFormat wireFormat; 038 039 040 ActiveMQMessageReader(AbstractDefaultWireFormat wf){ 041 this.wireFormat = wf; 042 } 043 /** 044 * Return the type of Packet 045 * 046 * @return integer representation of the type of Packet 047 */ 048 public int getPacketType() { 049 return Packet.ACTIVEMQ_MESSAGE; 050 } 051 052 /** 053 * @return a new Packet instance 054 */ 055 public Packet createPacket() { 056 return new ActiveMQMessage(); 057 } 058 059 /** 060 * build a Packet instance from the data input stream 061 * 062 * @param packet A Packet object 063 * @param dataIn the data input stream to build the packet from 064 * @throws IOException 065 */ 066 public void buildPacket(Packet packet, DataInput dataIn) throws IOException { 067 ActiveMQMessage msg = (ActiveMQMessage) packet; 068 BitArray ba = msg.getBitArray(); 069 ba.readFromStream(dataIn); 070 071 boolean receiptRequired = ba.get(AbstractPacket.RECEIPT_REQUIRED_INDEX); 072 if (receiptRequired){ 073 msg.setReceiptRequired(receiptRequired); 074 msg.setId(dataIn.readShort()); 075 } 076 boolean externalMessageId = ba.get(ActiveMQMessage.EXTERNAL_MESSAGE_ID_INDEX); 077 078 if (externalMessageId){ 079 msg.setExternalMessageId(externalMessageId); 080 msg.setJMSMessageID(readUTF(dataIn)); 081 } 082 083 boolean cachingEnabled = ba.get(ActiveMQMessage.CACHED_VALUES_INDEX); 084 boolean cachingDestination = ba.get(ActiveMQMessage.CACHED_DESTINATION_INDEX); 085 086 boolean messagePart = ba.get(ActiveMQMessage.MESSAGE_PART_INDEX); 087 msg.setMessagePart(messagePart); 088 if (messagePart){ 089 msg.setParentMessageID(dataIn.readUTF()); 090 msg.setNumberOfParts(dataIn.readShort()); 091 msg.setPartNumber(dataIn.readShort()); 092 } 093 094 if (ba.get(AbstractPacket.BROKERS_VISITED_INDEX)){ 095 int visitedLen = dataIn.readShort(); 096 for (int i =0; i < visitedLen; i++){ 097 msg.addBrokerVisited(dataIn.readUTF()); 098 } 099 } 100 if (cachingEnabled){ 101 short key = dataIn.readShort(); 102 msg.setJMSClientID((String)wireFormat.getValueFromReadCache(key)); 103 key = dataIn.readShort(); 104 msg.setProducerKey((String)wireFormat.getValueFromReadCache(key)); 105 if (cachingDestination){ 106 key = dataIn.readShort(); 107 msg.setJMSDestination((Destination)wireFormat.getValueFromReadCache(key)); 108 }else{ 109 msg.setJMSDestination(ActiveMQDestination.readFromStream(dataIn)); 110 } 111 if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) { 112 key = dataIn.readShort(); 113 msg.setJMSReplyTo((Destination)wireFormat.getValueFromReadCache(key)); 114 } 115 if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) { 116 key = dataIn.readShort(); 117 msg.setTransactionId(wireFormat.getValueFromReadCache(key)); 118 } 119 120 }else { 121 msg.setJMSClientID(super.readUTF(dataIn)); 122 msg.setProducerKey(dataIn.readUTF()); 123 msg.setJMSDestination(ActiveMQDestination.readFromStream(dataIn)); 124 if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) { 125 msg.setJMSReplyTo(ActiveMQDestination.readFromStream(dataIn)); 126 } 127 if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) { 128 if( ba.get(ActiveMQMessage.XA_TRANS_INDEX) ) { 129 msg.setTransactionId(ActiveMQXid.read(dataIn)); 130 } else { 131 msg.setTransactionId(super.readUTF(dataIn)); 132 } 133 } else { 134 msg.setTransactionId(null); 135 } 136 } 137 138 139 msg.setJMSDeliveryMode(dataIn.readByte()); 140 msg.setJMSPriority(dataIn.readByte()); 141 142 msg.setJMSRedelivered(ba.get(ActiveMQMessage.REDELIVERED_INDEX)); 143 144 if (ba.get(ActiveMQMessage.CORRELATION_INDEX)) { 145 msg.setJMSCorrelationID(super.readUTF(dataIn)); 146 } 147 if (ba.get(ActiveMQMessage.TYPE_INDEX)) { 148 msg.setJMSType(super.readUTF(dataIn)); 149 } 150 if (ba.get(ActiveMQMessage.BROKER_NAME_INDEX)) { 151 msg.setEntryBrokerName(super.readUTF(dataIn)); 152 } 153 if (ba.get(ActiveMQMessage.CLUSTER_NAME_INDEX)) { 154 msg.setEntryClusterName(super.readUTF(dataIn)); 155 } 156 157 if (ba.get(ActiveMQMessage.TIMESTAMP_INDEX)) { 158 msg.setJMSTimestamp(dataIn.readLong()); 159 } 160 if (ba.get(ActiveMQMessage.EXPIRATION_INDEX)) { 161 msg.setJMSExpiration(dataIn.readLong()); 162 } 163 if (ba.get(ActiveMQMessage.LONG_SEQUENCE_INDEX)){ 164 msg.setSequenceNumber(dataIn.readLong()); 165 }else { 166 msg.setSequenceNumber(dataIn.readInt()); 167 } 168 msg.setDeliveryCount(dataIn.readByte()); 169 if (ba.get(ActiveMQMessage.DISPATCHED_FROM_DLQ_INDEX)){ 170 msg.setDispatchedFromDLQ(true); 171 } 172 173 if (ba.get(ActiveMQMessage.CID_INDEX)) { 174 int cidlength = dataIn.readShort(); 175 if (cidlength > 0) { 176 int[] cids = new int[cidlength]; 177 for (int i = 0; i < cids.length; i++) { 178 cids[i] = dataIn.readShort(); 179 } 180 msg.setConsumerNos(cids); 181 } 182 } 183 if (ba.get(ActiveMQMessage.PROPERTIES_INDEX)) { 184 msg.setProperties(msg.readMapProperties(dataIn)); 185 } 186 187 if (ba.get(ActiveMQMessage.PAYLOAD_INDEX)) { 188 int payloadLength = dataIn.readInt(); 189 if (payloadLength >= 0) { 190 if (dataIn instanceof WireByteArrayInputStream){ 191 WireByteArrayInputStream wireIn = (WireByteArrayInputStream)dataIn; 192 msg.setBodyAsBytes(wireIn.getRawData(), wireIn.position(), payloadLength); 193 }else { 194 byte[] payload = new byte[payloadLength]; 195 dataIn.readFully(payload); 196 msg.setBodyAsBytes(payload,0,payload.length); 197 } 198 } 199 } 200 201 202 } 203 }