001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.io;
020    import java.io.ByteArrayInputStream;
021    import java.io.ByteArrayOutputStream;
022    import java.io.DataInput;
023    import java.io.DataInputStream;
024    import java.io.DataOutputStream;
025    import java.io.IOException;
026    import java.net.DatagramPacket;
027    
028    import javax.jms.JMSException;
029    
030    import org.activemq.message.Packet;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * Represents a strategy of encoding packets on the wire or on disk using some kind of serialization or wire format.
036     * <p/>We use a default efficient format for Java to Java communication but other formats to other systems can be used,
037     * such as using simple text strings when talking to JavaScript or coming up with other formats for talking to C / C#
038     * languages or proprietary messaging systems we wish to interface with at the wire level etc.
039     * 
040     * @version $Revision: 1.1.1.1 $
041     */
042    public abstract class AbstractWireFormat implements WireFormat {
043        private static final Log log = LogFactory.getLog(AbstractWireFormat.class);
044        protected DataOutputStream transportDataOut;
045        protected DataInputStream transportDataIn;
046        protected boolean cachingEnabled;
047    
048        /**
049         * Read a packet from a Datagram packet from the given channelID. If the packet is from the same channel ID as it
050         * was sent then we have a loop-back so discard the packet
051         * 
052         * @param channelID is the unique channel ID
053         * @param dpacket
054         * @return the packet read from the datagram or null if it should be discarded
055         * @throws IOException
056         */
057        public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException {
058            DataInput in = new DataInputStream(new ByteArrayInputStream(dpacket.getData(), dpacket.getOffset(), dpacket
059                    .getLength()));
060            String id = in.readUTF();
061            if (channelID == null) {
062                log
063                        .trace("We do not have a channelID which is probably caused by a synchronization issue, we're receiving messages before we're fully initialised");
064            }
065            else if (channelID.equals(id)) {
066                if (log.isTraceEnabled()) {
067                    log.trace("Discarding packet from id: " + id);
068                }
069                return null;
070            }
071            int type = in.readByte();
072            Packet packet = readPacket(type, in);
073            //        if (packet instanceof ActiveMQMessage) {
074            //            System.out.println("##### read packet from channel: " + id + " in channel: " + channelID + " message: " +
075            // packet);
076            //        }
077            //
078            return packet;
079        }
080    
081        /**
082         * Writes the given package to a new datagram
083         * 
084         * @param channelID is the unique channel ID
085         * @param packet is the packet to write
086         * @return @throws IOException
087         * @throws JMSException
088         */
089        public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException {
090            ByteArrayOutputStream out = new ByteArrayOutputStream();
091            DataOutputStream dataOut = new DataOutputStream(out);
092            channelID = channelID != null ? channelID : "";
093            dataOut.writeUTF(channelID);
094            writePacket(packet, dataOut);
095            dataOut.close();
096            byte[] data = out.toByteArray();
097            return new DatagramPacket(data, data.length);
098        }
099    
100        /**
101         * Reads the packet from the given byte[]
102         * 
103         * @param bytes
104         * @param offset
105         * @param length
106         * @return @throws IOException
107         */
108        public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException {
109            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, offset, length));
110            return readPacket(in);
111        }
112    
113        /**
114         * Reads the packet from the given byte[]
115         * 
116         * @param bytes
117         * @return @throws IOException
118         */
119        public Packet fromBytes(byte[] bytes) throws IOException {
120            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
121            return readPacket(in);
122        }
123    
124        /**
125         * A helper method which converts a packet into a byte array
126         * 
127         * @param packet
128         * @return a byte array representing the packet using some wire protocol
129         * @throws IOException
130         * @throws JMSException
131         */
132        public byte[] toBytes(Packet packet) throws IOException, JMSException {
133            ByteArrayOutputStream out = new ByteArrayOutputStream();
134            DataOutputStream dataOut = new DataOutputStream(out);
135            writePacket(packet, dataOut);
136            dataOut.close();
137            return out.toByteArray();
138        }
139    
140        /**
141         * some transports may register their streams (e.g. Tcp)
142         * 
143         * @param dataOut
144         * @param dataIn
145         */
146        public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) {
147            transportDataOut = dataOut;
148            transportDataIn = dataIn;
149        }
150    
151        /**
152         * Some wire formats require a handshake at start-up
153         * 
154         * @throws IOException
155         */
156        public void initiateClientSideProtocol() throws IOException {
157        }
158    
159        /**
160         * Some wire formats require a handshake at start-up
161         * 
162         * @throws IOException
163         */
164        public void initiateServerSideProtocol() throws IOException {
165        }
166    
167        
168        /**
169         * @return Returns the enableCaching.
170         */
171        public boolean isCachingEnabled() {
172            return cachingEnabled;
173        }
174    
175        /**
176         * @param enableCaching The enableCaching to set.
177         */
178        public void setCachingEnabled(boolean enableCaching) {
179            this.cachingEnabled = enableCaching;
180        }
181        
182        /**
183         * some wire formats will implement their own fragementation
184         * @return true unless a wire format supports it's own fragmentation
185         */
186        public boolean doesSupportMessageFragmentation(){
187            return true;
188        }
189        
190        
191        /**
192         * Some wire formats will not be able to understand compressed messages
193         * @return true unless a wire format cannot understand compression
194         */
195        public boolean doesSupportMessageCompression(){
196            return true;
197        }
198        /**
199         * @return Returns the transportDataOut.
200         */
201        public DataOutputStream getTransportDataOut() {
202            return transportDataOut;
203        }
204        /**
205         * @param transportDataOut The transportDataOut to set.
206         */
207        public void setTransportDataOut(DataOutputStream transportDataOut) {
208            this.transportDataOut = transportDataOut;
209        }
210        /**
211         * @return Returns the transportDataIn.
212         */
213        public DataInputStream getTransportDataIn() {
214            return transportDataIn;
215        }
216        /**
217         * @param transportDataIn The transportDataIn to set.
218         */
219        public void setTransportDataIn(DataInputStream transportDataIn) {
220            this.transportDataIn = transportDataIn;
221        }
222    
223        /**
224         * @param dataIn
225         * @return
226         * @throws java.io.IOException
227         */
228        public Packet readPacket(DataInput dataIn) throws IOException {
229            int type = -1;
230            while ((type = dataIn.readByte()) == 0);
231    
232            if (type == -1){
233                throw new IOException("InputStream now closed");
234            }
235            return readPacket(type, dataIn);
236        }
237    }