001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.impl; 020 import java.io.DataOutput; 021 import java.io.IOException; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 import org.activemq.io.util.ByteArray; 025 import org.activemq.message.AbstractPacket; 026 import org.activemq.message.ActiveMQDestination; 027 import org.activemq.message.ActiveMQMessage; 028 import org.activemq.message.ActiveMQXid; 029 import org.activemq.message.Packet; 030 import org.activemq.util.BitArray; 031 032 /** 033 * Writes a ActiveMQMessage packet to a Stream 034 */ 035 public class ActiveMQMessageWriter extends AbstractPacketWriter { 036 private static final Log log = LogFactory.getLog(ActiveMQMessageWriter.class); 037 private AbstractDefaultWireFormat wireFormat; 038 039 040 ActiveMQMessageWriter(AbstractDefaultWireFormat wf){ 041 this.wireFormat = wf; 042 } 043 044 /** 045 * Return the type of Packet 046 * 047 * @return integer representation of the type of Packet 048 */ 049 public int getPacketType() { 050 return Packet.ACTIVEMQ_MESSAGE; 051 } 052 053 /** 054 * Write a Packet instance to data output stream 055 * 056 * @param packet the instance to be seralized 057 * @param dataOut the output stream 058 * @throws IOException thrown if an error occurs 059 */ 060 public void writePacket(Packet packet, DataOutput dataOut) throws IOException { 061 ActiveMQMessage msg = (ActiveMQMessage) packet; 062 063 //check producerKey is set (will not be set by most Junit tests) 064 boolean externalMessageId = msg.isExternalMessageId() || msg.getProducerKey() == null || msg.getProducerKey().length() == 0; 065 msg.setExternalMessageId(externalMessageId); 066 067 ActiveMQDestination destination = msg.getJMSActiveMQDestination(); 068 ByteArray payload = msg.getBodyAsBytes(); 069 BitArray ba = msg.getBitArray(); 070 ba.reset(); 071 boolean cachingEnabled = wireFormat.isCachingEnabled(); 072 boolean cachingDestination = cachingEnabled && destination != null && !destination.isOrdered() && !destination.isExclusive(); 073 boolean longSequence = msg.getSequenceNumber() > Integer.MAX_VALUE; 074 075 ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, packet.isReceiptRequired()); 076 Object[] visited = msg.getBrokersVisited(); 077 boolean writeVisited = visited != null && visited.length > 0; 078 ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited); 079 ba.set(ActiveMQMessage.CORRELATION_INDEX, msg.getJMSCorrelationID() != null); 080 ba.set(ActiveMQMessage.TYPE_INDEX, msg.getJMSType() != null); 081 ba.set(ActiveMQMessage.BROKER_NAME_INDEX, msg.getEntryBrokerName() != null); 082 ba.set(ActiveMQMessage.CLUSTER_NAME_INDEX, msg.getEntryClusterName() != null); 083 ba.set(ActiveMQMessage.TRANSACTION_ID_INDEX, msg.getTransactionId() != null); 084 ba.set(ActiveMQMessage.REPLY_TO_INDEX, msg.getJMSReplyTo() != null); 085 ba.set(ActiveMQMessage.TIMESTAMP_INDEX, msg.getJMSTimestamp() > 0); 086 ba.set(ActiveMQMessage.EXPIRATION_INDEX, msg.getJMSExpiration() > 0); 087 ba.set(ActiveMQMessage.REDELIVERED_INDEX, msg.getJMSRedelivered()); 088 ba.set(ActiveMQMessage.XA_TRANS_INDEX, msg.isXaTransacted()); 089 ba.set(ActiveMQMessage.CID_INDEX, msg.getConsumerNos() != null); 090 ba.set(ActiveMQMessage.PROPERTIES_INDEX, msg.getProperties() != null && msg.getProperties().size() > 0); 091 ba.set(ActiveMQMessage.DISPATCHED_FROM_DLQ_INDEX,msg.isDispatchedFromDLQ()); 092 ba.set(ActiveMQMessage.PAYLOAD_INDEX, payload != null); 093 ba.set(ActiveMQMessage.EXTERNAL_MESSAGE_ID_INDEX,msg.isExternalMessageId()); 094 ba.set(ActiveMQMessage.MESSAGE_PART_INDEX,msg.isMessagePart()); 095 ba.set(ActiveMQMessage.CACHED_VALUES_INDEX, cachingEnabled); 096 ba.set(ActiveMQMessage.CACHED_DESTINATION_INDEX,cachingDestination); 097 ba.set(ActiveMQMessage.LONG_SEQUENCE_INDEX, longSequence); 098 099 ba.writeToStream(dataOut); 100 if (msg.isReceiptRequired()){ 101 dataOut.writeShort(msg.getId()); 102 } 103 if (msg.isExternalMessageId()){ 104 writeUTF(msg.getJMSMessageID(),dataOut); 105 } 106 if (msg.isMessagePart()){ 107 writeUTF(msg.getParentMessageID(), dataOut); 108 dataOut.writeShort(msg.getNumberOfParts()); 109 dataOut.writeShort(msg.getPartNumber()); 110 } 111 if (writeVisited){ 112 dataOut.writeShort(visited.length); 113 for(int i =0; i < visited.length; i++){ 114 final String brokerName = visited[i].toString(); 115 if (brokerName != null) { 116 dataOut.writeUTF(brokerName); 117 } else { 118 log.warn("The brokerVisited name is null"); 119 } 120 } 121 } 122 123 if (cachingEnabled){ 124 dataOut.writeShort(wireFormat.getWriteCachedKey(msg.getJMSClientID())); 125 dataOut.writeShort(wireFormat.getWriteCachedKey(msg.getProducerKey())); 126 if (cachingDestination){ 127 dataOut.writeShort(wireFormat.getWriteCachedKey(destination)); 128 }else { 129 ActiveMQDestination.writeToStream(destination, dataOut); 130 } 131 if (msg.getJMSReplyTo() != null){ 132 dataOut.writeShort(wireFormat.getWriteCachedKey(msg.getJMSReplyTo())); 133 } 134 if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) { 135 dataOut.writeShort(wireFormat.getWriteCachedKey(msg.getTransactionId())); 136 } 137 138 139 }else { 140 super.writeUTF(msg.getJMSClientID(), dataOut); 141 writeUTF(msg.getProducerKey(),dataOut); 142 ActiveMQDestination.writeToStream(destination, dataOut); 143 if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) { 144 ActiveMQDestination.writeToStream((ActiveMQDestination) msg.getJMSReplyTo(), dataOut); 145 } 146 if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) { 147 if( ba.get(ActiveMQMessage.XA_TRANS_INDEX) ) { 148 ActiveMQXid xid = (ActiveMQXid) msg.getTransactionId(); 149 xid.write(dataOut); 150 } else { 151 super.writeUTF((String) msg.getTransactionId(), dataOut); 152 } 153 } 154 } 155 156 dataOut.writeByte(msg.getJMSDeliveryMode()); 157 dataOut.writeByte(msg.getJMSPriority()); 158 159 160 if (ba.get(ActiveMQMessage.CORRELATION_INDEX)) { 161 super.writeUTF(msg.getJMSCorrelationID(), dataOut); 162 } 163 if (ba.get(ActiveMQMessage.TYPE_INDEX)) { 164 super.writeUTF(msg.getJMSType(), dataOut); 165 } 166 if (ba.get(ActiveMQMessage.BROKER_NAME_INDEX)) { 167 super.writeUTF(msg.getEntryBrokerName(), dataOut); 168 } 169 if (ba.get(ActiveMQMessage.CLUSTER_NAME_INDEX)) { 170 super.writeUTF(msg.getEntryClusterName(), dataOut); 171 } 172 173 174 if (ba.get(ActiveMQMessage.TIMESTAMP_INDEX)) { 175 dataOut.writeLong(msg.getJMSTimestamp()); 176 } 177 if (ba.get(ActiveMQMessage.EXPIRATION_INDEX)) { 178 dataOut.writeLong(msg.getJMSExpiration()); 179 } 180 if (longSequence){ 181 dataOut.writeLong(msg.getSequenceNumber()); 182 }else{ 183 dataOut.writeInt((int) msg.getSequenceNumber()); 184 } 185 186 dataOut.writeByte(msg.getDeliveryCount()); 187 188 if (ba.get(ActiveMQMessage.CID_INDEX)) { 189 //write out consumer numbers ... 190 int[] cids = msg.getConsumerNos(); 191 dataOut.writeShort(cids.length); 192 for (int i = 0; i < cids.length; i++) { 193 dataOut.writeShort(cids[i]); 194 } 195 } 196 if (ba.get(ActiveMQMessage.PROPERTIES_INDEX)) { 197 msg.writeMapProperties(msg.getProperties(), dataOut); 198 } 199 if (ba.get(ActiveMQMessage.PAYLOAD_INDEX)) { 200 dataOut.writeInt(payload.getLength()); 201 dataOut.write(payload.getBuf(),payload.getOffset(),payload.getLength()); 202 } 203 204 205 206 207 } 208 }