001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq.io.impl;
020    import java.io.DataInput;
021    import java.io.IOException;
022    
023    import org.activemq.message.AbstractPacket;
024    import org.activemq.message.ActiveMQDestination;
025    import org.activemq.message.ActiveMQXid;
026    import org.activemq.message.MessageAck;
027    import org.activemq.message.Packet;
028    import org.activemq.util.BitArray;
029    
030    /**
031     * Reads a ConsumerInfo object from a Stream
032     */
033    public class MessageAckReader extends AbstractPacketReader {
034        private AbstractDefaultWireFormat wireFormat;
035    
036        MessageAckReader(AbstractDefaultWireFormat wf) {
037            this.wireFormat = wf;
038        }
039    
040        MessageAckReader() {
041        }
042    
043        /**
044         * Return the type of Packet
045         * 
046         * @return integer representation of the type of Packet
047         */
048        public int getPacketType() {
049            return Packet.ACTIVEMQ_MSG_ACK;
050        }
051    
052        /**
053         * @return a new Packet instance
054         */
055        public Packet createPacket() {
056            return new MessageAck();
057        }
058    
059        /**
060         * build a Packet instance from the data input stream
061         * 
062         * @param packet A Packet object
063         * @param dataIn the data input stream to build the packet from
064         * @throws IOException
065         */
066        public void buildPacket(Packet packet, DataInput dataIn) throws IOException {
067            MessageAck ack = (MessageAck) packet;
068            BitArray ba = ack.getBitArray();
069            ba.readFromStream(dataIn);
070            boolean cachingEnabled = ba.get(MessageAck.CACHED_VALUES_INDEX);
071            ack.setMessageRead(ba.get(MessageAck.MESSAGE_READ_INDEX));
072            ack.setPersistent(ba.get(MessageAck.PERSISTENT_INDEX));
073            ack.setExpired(ba.get(MessageAck.EXPIRED_INDEX));
074            if (ba.get(AbstractPacket.RECEIPT_REQUIRED_INDEX)) {
075                ack.setReceiptRequired(true);
076                ack.setId(dataIn.readShort());
077            }
078            if (ba.get(MessageAck.EXTERNAL_MESSAGE_ID_INDEX)) {
079                ack.setExternalMessageId(true);
080                ack.setMessageID(dataIn.readUTF());
081            }
082            else {
083                if (cachingEnabled) {
084                    short key = dataIn.readShort();
085                    ack.setProducerKey((String) wireFormat.getValueFromReadCache(key));
086                }
087                else {
088                    ack.setProducerKey(dataIn.readUTF());
089                }
090                if (ba.get(MessageAck.LONG_SEQUENCE_INDEX)) {
091                    ack.setSequenceNumber(dataIn.readLong());
092                }
093                else {
094                    ack.setSequenceNumber(dataIn.readInt());
095                }
096            }
097            if (ba.get(AbstractPacket.BROKERS_VISITED_INDEX)) {
098                int visitedLen = dataIn.readShort();
099                for (int i = 0;i < visitedLen;i++) {
100                    ack.addBrokerVisited(dataIn.readUTF());
101                }
102            }
103            if (ba.get(MessageAck.TRANSACTION_ID_INDEX)) {
104                if (cachingEnabled) {
105                    short key = dataIn.readShort();
106                    ack.setTransactionId(wireFormat.getValueFromReadCache(key));
107                } else {
108                    if (ba.get(MessageAck.XA_TRANS_INDEX)) {
109                        ack.setTransactionId(ActiveMQXid.read(dataIn));
110                    }
111                    else {
112                        ack.setTransactionId(super.readUTF(dataIn));
113                    }
114                }
115            }
116            else {
117                ack.setTransactionId(null);
118            }
119            if (cachingEnabled) {
120                short key = dataIn.readShort();
121                ack.setConsumerId((String) wireFormat.getValueFromReadCache(key));
122                key = dataIn.readShort();
123                ack.setDestination((ActiveMQDestination) wireFormat.getValueFromReadCache(key));
124            }
125            else {
126                ack.setConsumerId(dataIn.readUTF());
127                ack.setDestination(ActiveMQDestination.readFromStream(dataIn));
128            }
129        }
130    }