001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.io.impl;
020    import java.io.DataInput;
021    import java.io.IOException;
022    import javax.jms.Destination;
023    
024    import org.activemq.io.util.WireByteArrayInputStream;
025    import org.activemq.message.AbstractPacket;
026    import org.activemq.message.ActiveMQDestination;
027    import org.activemq.message.ActiveMQMessage;
028    import org.activemq.message.ActiveMQXid;
029    import org.activemq.message.Packet;
030    import org.activemq.util.BitArray;
031    
032    /**
033     * Writes a ProducerInfo object to a Stream
034     */
035    public class ActiveMQMessageReader extends AbstractPacketReader {
036        
037        private AbstractDefaultWireFormat wireFormat;
038       
039        
040        ActiveMQMessageReader(AbstractDefaultWireFormat wf){
041            this.wireFormat = wf;
042        }
043        /**
044         * Return the type of Packet
045         * 
046         * @return integer representation of the type of Packet
047         */
048        public int getPacketType() {
049            return Packet.ACTIVEMQ_MESSAGE;
050        }
051    
052        /**
053         * @return a new Packet instance
054         */
055        public Packet createPacket() {
056            return new ActiveMQMessage();
057        }
058    
059        /**
060         * build a Packet instance from the data input stream
061         *
062         * @param packet A Packet object
063         * @param dataIn the data input stream to build the packet from
064         * @throws IOException
065         */
066        public void buildPacket(Packet packet, DataInput dataIn) throws IOException {
067            ActiveMQMessage msg = (ActiveMQMessage) packet;
068            BitArray ba = msg.getBitArray();
069            ba.readFromStream(dataIn);
070            
071            boolean receiptRequired = ba.get(AbstractPacket.RECEIPT_REQUIRED_INDEX);
072            if (receiptRequired){
073                msg.setReceiptRequired(receiptRequired);
074                msg.setId(dataIn.readShort());
075            }
076            boolean externalMessageId = ba.get(ActiveMQMessage.EXTERNAL_MESSAGE_ID_INDEX);
077            
078            if (externalMessageId){
079                msg.setExternalMessageId(externalMessageId);
080                msg.setJMSMessageID(readUTF(dataIn));
081            }
082            
083            boolean cachingEnabled = ba.get(ActiveMQMessage.CACHED_VALUES_INDEX);
084            boolean cachingDestination = ba.get(ActiveMQMessage.CACHED_DESTINATION_INDEX);
085            
086            boolean messagePart = ba.get(ActiveMQMessage.MESSAGE_PART_INDEX);
087            msg.setMessagePart(messagePart);
088            if (messagePart){
089                msg.setParentMessageID(dataIn.readUTF());
090                msg.setNumberOfParts(dataIn.readShort());
091                msg.setPartNumber(dataIn.readShort());
092            }
093           
094            if (ba.get(AbstractPacket.BROKERS_VISITED_INDEX)){
095                int visitedLen = dataIn.readShort();
096                for (int i =0; i < visitedLen; i++){
097                    msg.addBrokerVisited(dataIn.readUTF());
098                }        
099            }
100            if (cachingEnabled){
101                short key = dataIn.readShort();
102                msg.setJMSClientID((String)wireFormat.getValueFromReadCache(key));
103                key = dataIn.readShort();
104                msg.setProducerKey((String)wireFormat.getValueFromReadCache(key));
105                if (cachingDestination){
106                    key = dataIn.readShort();
107                    msg.setJMSDestination((Destination)wireFormat.getValueFromReadCache(key));
108                }else{
109                    msg.setJMSDestination(ActiveMQDestination.readFromStream(dataIn));
110                }
111                if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) {
112                    key = dataIn.readShort();
113                    msg.setJMSReplyTo((Destination)wireFormat.getValueFromReadCache(key));
114                }
115                if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) {
116                    key = dataIn.readShort();
117                    msg.setTransactionId(wireFormat.getValueFromReadCache(key));
118                }
119                
120            }else {
121                msg.setJMSClientID(super.readUTF(dataIn));
122                msg.setProducerKey(dataIn.readUTF());
123                msg.setJMSDestination(ActiveMQDestination.readFromStream(dataIn));
124                if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) {
125                    msg.setJMSReplyTo(ActiveMQDestination.readFromStream(dataIn));
126                }
127                if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) {
128                    if( ba.get(ActiveMQMessage.XA_TRANS_INDEX) ) {
129                        msg.setTransactionId(ActiveMQXid.read(dataIn));                
130                    } else {
131                        msg.setTransactionId(super.readUTF(dataIn));
132                    }
133                } else {
134                    msg.setTransactionId(null);
135                }
136            }
137            
138            
139            msg.setJMSDeliveryMode(dataIn.readByte());
140            msg.setJMSPriority(dataIn.readByte());
141         
142            msg.setJMSRedelivered(ba.get(ActiveMQMessage.REDELIVERED_INDEX));
143    
144            if (ba.get(ActiveMQMessage.CORRELATION_INDEX)) {
145                msg.setJMSCorrelationID(super.readUTF(dataIn));
146            }
147            if (ba.get(ActiveMQMessage.TYPE_INDEX)) {
148                msg.setJMSType(super.readUTF(dataIn));
149            }
150            if (ba.get(ActiveMQMessage.BROKER_NAME_INDEX)) {
151                msg.setEntryBrokerName(super.readUTF(dataIn));
152            }
153            if (ba.get(ActiveMQMessage.CLUSTER_NAME_INDEX)) {
154                msg.setEntryClusterName(super.readUTF(dataIn));
155            }
156            
157            if (ba.get(ActiveMQMessage.TIMESTAMP_INDEX)) {
158                msg.setJMSTimestamp(dataIn.readLong());
159            }
160            if (ba.get(ActiveMQMessage.EXPIRATION_INDEX)) {
161                msg.setJMSExpiration(dataIn.readLong());
162            }
163            if (ba.get(ActiveMQMessage.LONG_SEQUENCE_INDEX)){
164                msg.setSequenceNumber(dataIn.readLong());
165            }else {
166                msg.setSequenceNumber(dataIn.readInt());
167            }
168            msg.setDeliveryCount(dataIn.readByte());
169            if (ba.get(ActiveMQMessage.DISPATCHED_FROM_DLQ_INDEX)){
170                msg.setDispatchedFromDLQ(true);
171            }
172            
173            if (ba.get(ActiveMQMessage.CID_INDEX)) {
174                int cidlength = dataIn.readShort();
175                if (cidlength > 0) {
176                    int[] cids = new int[cidlength];
177                    for (int i = 0; i < cids.length; i++) {
178                        cids[i] = dataIn.readShort();
179                    }
180                    msg.setConsumerNos(cids);
181                }
182            }
183            if (ba.get(ActiveMQMessage.PROPERTIES_INDEX)) {
184                msg.setProperties(msg.readMapProperties(dataIn));
185            }
186            
187            if (ba.get(ActiveMQMessage.PAYLOAD_INDEX)) {
188                int payloadLength = dataIn.readInt();
189                if (payloadLength >= 0) {
190                    if (dataIn instanceof WireByteArrayInputStream){
191                        WireByteArrayInputStream wireIn = (WireByteArrayInputStream)dataIn;
192                        msg.setBodyAsBytes(wireIn.getRawData(), wireIn.position(), payloadLength);
193                    }else {
194                    byte[] payload = new byte[payloadLength];
195                    dataIn.readFully(payload);
196                    msg.setBodyAsBytes(payload,0,payload.length);
197                    }
198                }
199            }
200            
201               
202        }
203    }