001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.message;
020    
021    import javax.jms.JMSException;
022    import javax.jms.MessageEOFException;
023    import javax.jms.MessageFormatException;
024    import javax.jms.MessageNotReadableException;
025    import javax.jms.MessageNotWriteableException;
026    import javax.jms.StreamMessage;
027    
028    import org.activemq.io.util.ByteArray;
029    import org.activemq.io.util.ByteArrayCompression;
030    
031    import java.io.ByteArrayInputStream;
032    import java.io.ByteArrayOutputStream;
033    import java.io.DataInputStream;
034    import java.io.DataOutputStream;
035    import java.io.EOFException;
036    import java.io.IOException;
037    
038    /**
039     * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
040     * types in the Java programming language. It is filled and read sequentially.
041     * It inherits from the <CODE>Message</CODE> interface
042     * and adds a stream message body. Its methods are based largely on those
043     * found in <CODE>java.io.DataInputStream</CODE> and
044     * <CODE>java.io.DataOutputStream</CODE>.
045     * <p/>
046     * <P>The primitive types can be read or written explicitly using methods
047     * for each type. They may also be read or written generically as objects.
048     * For instance, a call to <CODE>StreamMessage.writeInt(6)</CODE> is
049     * equivalent to <CODE>StreamMessage.writeObject(new Integer(6))</CODE>.
050     * Both forms are provided, because the explicit form is convenient for
051     * static programming, and the object form is needed when types are not known
052     * at compile time.
053     * <p/>
054     * <P>When the message is first created, and when <CODE>clearBody</CODE>
055     * is called, the body of the message is in write-only mode. After the
056     * first call to <CODE>reset</CODE> has been made, the message body is in
057     * read-only mode.
058     * After a message has been sent, the client that sent it can retain and
059     * modify it without affecting the message that has been sent. The same message
060     * object can be sent multiple times.
061     * When a message has been received, the provider has called
062     * <CODE>reset</CODE> so that the message body is in read-only mode for the client.
063     * <p/>
064     * <P>If <CODE>clearBody</CODE> is called on a message in read-only mode,
065     * the message body is cleared and the message body is in write-only mode.
066     * <p/>
067     * <P>If a client attempts to read a message in write-only mode, a
068     * <CODE>MessageNotReadableException</CODE> is thrown.
069     * <p/>
070     * <P>If a client attempts to write a message in read-only mode, a
071     * <CODE>MessageNotWriteableException</CODE> is thrown.
072     * <p/>
073     * <P><CODE>StreamMessage</CODE> objects support the following conversion
074     * table. The marked cases must be supported. The unmarked cases must throw a
075     * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions
076     * may throw a runtime exception if the primitive's <CODE>valueOf()</CODE>
077     * method does not accept it as a valid <CODE>String</CODE> representation of
078     * the primitive.
079     * <p/>
080     * <P>A value written as the row type can be read as the column type.
081     * <p/>
082     * <PRE>
083     * |        | boolean byte short char int long float double String byte[]
084     * |----------------------------------------------------------------------
085     * |boolean |    X                                            X
086     * |byte    |          X     X         X   X                  X
087     * |short   |                X         X   X                  X
088     * |char    |                     X                           X
089     * |int     |                          X   X                  X
090     * |long    |                              X                  X
091     * |float   |                                    X     X      X
092     * |double  |                                          X      X
093     * |String  |    X     X     X         X   X     X     X      X
094     * |byte[]  |                                                        X
095     * |----------------------------------------------------------------------
096     * </PRE>
097     * <p/>
098     * <P>Attempting to read a null value as a primitive type must be treated
099     * as calling the primitive's corresponding <code>valueOf(String)</code>
100     * conversion method with a null value. Since <code>char</code> does not
101     * support a <code>String</code> conversion, attempting to read a null value
102     * as a <code>char</code> must throw a <code>NullPointerException</code>.
103     *
104     * @see javax.jms.Session#createStreamMessage()
105     * @see javax.jms.BytesMessage
106     * @see javax.jms.MapMessage
107     * @see javax.jms.Message
108     * @see javax.jms.ObjectMessage
109     * @see javax.jms.TextMessage
110     */
111    
112    public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113    
// Data-typed output stream for the body; set to null by clearBody().
// NOTE(review): where it is created and what it wraps is outside this view.
private DataOutputStream dataOut;
// In-memory output buffer; presumably the sink behind dataOut — TODO confirm.
// Set to null by clearBody().
private ByteArrayOutputStream bytesOut;
// Data-typed input stream that the read methods of this class consume from;
// set to null by clearBody().
private DataInputStream dataIn;
// Starts at -1. Its use is outside this view (presumably bookkeeping for
// readBytes() — TODO confirm).
private int bytesToRead = -1;
118    
119    
120        /**
121         * Return the type of Packet
122         *
123         * @return integer representation of the type of Packet
124         */
125    
126        public int getPacketType() {
127            return ACTIVEMQ_STREAM_MESSAGE;
128        }
129    
130        /**
131         * @return Returns a shallow copy of the message instance
132         * @throws JMSException
133         */
134    
135        public ActiveMQMessage shallowCopy() throws JMSException {
136            ActiveMQStreamMessage other = new ActiveMQStreamMessage();
137            this.initializeOther(other);
138            try {
139                other.setBodyAsBytes(this.getBodyAsBytes());
140            }
141            catch (IOException e) {
142                JMSException jmsEx = new JMSException("setBodyAsBytes() failed");
143                jmsEx.setLinkedException(e);
144                throw jmsEx;
145            }
146            return other;
147        }
148    
149        /**
150         * @return Returns a deep copy of the message - note the header fields are only shallow copied
151         * @throws JMSException
152         */
153    
154        public ActiveMQMessage deepCopy() throws JMSException {
155            ActiveMQStreamMessage other = new ActiveMQStreamMessage();
156            this.initializeOther(other);
157            try {
158                if (this.getBodyAsBytes() != null) {
159                    ByteArray data = this.getBodyAsBytes().copy();
160                    other.setBodyAsBytes(data);
161                }
162            }
163            catch (IOException e) {
164                JMSException jmsEx = new JMSException("setBodyAsBytes() failed");
165                jmsEx.setLinkedException(e);
166                throw jmsEx;
167            }
168            return other;
169        }
170    
171    
172        /**
173         * Clears out the message body. Clearing a message's body does not clear
174         * its header values or property entries.
175         * <p/>
176         * <P>If this message body was read-only, calling this method leaves
177         * the message body in the same state as an empty body in a newly
178         * created message.
179         *
180         * @throws JMSException if the JMS provider fails to clear the message
181         *                      body due to some internal error.
182         */
183    
184        public void clearBody() throws JMSException {
185            super.clearBody();
186            this.dataOut = null;
187            this.dataIn = null;
188            this.bytesOut = null;
189        }
190    
191        /**
192         * Reads a <code>boolean</code> from the stream message.
193         *
194         * @return the <code>boolean</code> value read
195         * @throws JMSException                if the JMS provider fails to read the message
196         *                                     due to some internal error.
197         * @throws MessageEOFException         if unexpected end of message stream has
198         *                                     been reached.
199         * @throws MessageFormatException      if this type conversion is invalid.
200         * @throws MessageNotReadableException if the message is in write-only
201         *                                     mode.
202         */
203    
204        public boolean readBoolean() throws JMSException {
205            initializeReading();
206            try {
207                if (this.dataIn.available() == 0) {
208                    throw new MessageEOFException("reached end of data");
209                }
210    
211                this.dataIn.mark(10);
212                int type = this.dataIn.read();
213                if (type == BOOLEAN) {
214                    return this.dataIn.readBoolean();
215                }
216                if (type == STRING) {
217                    return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
218                } 
219                if (type == NULL) {
220                    this.dataIn.reset();
221                    throw new NullPointerException("Cannot convert NULL value to boolean.");
222                }
223                else {
224                    this.dataIn.reset();
225                    throw new MessageFormatException(" not a boolean type");
226                }
227            }
228            catch (EOFException e) {
229                JMSException jmsEx = new MessageEOFException(e.getMessage());
230                jmsEx.setLinkedException(e);
231                throw jmsEx;
232            }
233            catch (IOException e) {
234                JMSException jmsEx = new MessageFormatException(e.getMessage());
235                jmsEx.setLinkedException(e);
236                throw jmsEx;
237            }
238        }
239    
240    
241        /**
242         * Reads a <code>byte</code> value from the stream message.
243         *
244         * @return the next byte from the stream message as a 8-bit
245         *         <code>byte</code>
246         * @throws JMSException                if the JMS provider fails to read the message
247         *                                     due to some internal error.
248         * @throws MessageEOFException         if unexpected end of message stream has
249         *                                     been reached.
250         * @throws MessageFormatException      if this type conversion is invalid.
251         * @throws MessageNotReadableException if the message is in write-only
252         *                                     mode.
253         */
254    
255        public byte readByte() throws JMSException {
256            initializeReading();
257            try {
258                if (this.dataIn.available() == 0) {
259                    throw new MessageEOFException("reached end of data");
260                }
261    
262                this.dataIn.mark(10);
263                int type = this.dataIn.read();
264                if (type == BYTE) {
265                    return this.dataIn.readByte();
266                }
267                if (type == STRING) {
268                    return Byte.valueOf(this.dataIn.readUTF()).byteValue();
269                } 
270                if (type == NULL) {
271                    this.dataIn.reset();
272                    throw new NullPointerException("Cannot convert NULL value to byte.");
273                }
274                else {
275                    this.dataIn.reset();
276                    throw new MessageFormatException(" not a byte type");
277                }
278            }
279            catch (NumberFormatException mfe) {
280                try {
281                    this.dataIn.reset();
282                }
283                catch (IOException ioe) {
284                    JMSException jmsEx = new JMSException("reset failed");
285                    jmsEx.setLinkedException(ioe);
286                }
287                throw mfe;
288    
289            }
290            catch (EOFException e) {
291                JMSException jmsEx = new MessageEOFException(e.getMessage());
292                jmsEx.setLinkedException(e);
293                throw jmsEx;
294            }
295            catch (IOException e) {
296                JMSException jmsEx = new MessageFormatException(e.getMessage());
297                jmsEx.setLinkedException(e);
298                throw jmsEx;
299            }
300        }
301    
302    
303        /**
304         * Reads a 16-bit integer from the stream message.
305         *
306         * @return a 16-bit integer from the stream message
307         * @throws JMSException                if the JMS provider fails to read the message
308         *                                     due to some internal error.
309         * @throws MessageEOFException         if unexpected end of message stream has
310         *                                     been reached.
311         * @throws MessageFormatException      if this type conversion is invalid.
312         * @throws MessageNotReadableException if the message is in write-only
313         *                                     mode.
314         */
315    
316        public short readShort() throws JMSException {
317            initializeReading();
318            try {
319                if (this.dataIn.available() == 0) {
320                    throw new MessageEOFException("reached end of data");
321                }
322    
323                this.dataIn.mark(17);
324                int type = this.dataIn.read();
325                if (type == SHORT) {
326                    return this.dataIn.readShort();
327                }
328                if (type == BYTE) {
329                    return this.dataIn.readByte();
330                }
331                if (type == STRING) {
332                    return Short.valueOf(this.dataIn.readUTF()).shortValue();
333                } 
334                if (type == NULL) {
335                    this.dataIn.reset();
336                    throw new NullPointerException("Cannot convert NULL value to short.");
337                }
338                else {
339                    this.dataIn.reset();
340                    throw new MessageFormatException(" not a short type");
341                }
342            }
343            catch (NumberFormatException mfe) {
344                try {
345                    this.dataIn.reset();
346                }
347                catch (IOException ioe) {
348                    JMSException jmsEx = new JMSException("reset failed");
349                    jmsEx.setLinkedException(ioe);
350                }
351                throw mfe;
352    
353            }
354            catch (EOFException e) {
355                JMSException jmsEx = new MessageEOFException(e.getMessage());
356                jmsEx.setLinkedException(e);
357                throw jmsEx;
358            }
359            catch (IOException e) {
360                JMSException jmsEx = new MessageFormatException(e.getMessage());
361                jmsEx.setLinkedException(e);
362                throw jmsEx;
363            }
364    
365        }
366    
367    
368        /**
369         * Reads a Unicode character value from the stream message.
370         *
371         * @return a Unicode character from the stream message
372         * @throws JMSException                if the JMS provider fails to read the message
373         *                                     due to some internal error.
374         * @throws MessageEOFException         if unexpected end of message stream has
375         *                                     been reached.
376         * @throws MessageFormatException      if this type conversion is invalid
377         * @throws MessageNotReadableException if the message is in write-only
378         *                                     mode.
379         */
380    
381        public char readChar() throws JMSException {
382            initializeReading();
383            try {
384                if (this.dataIn.available() == 0) {
385                    throw new MessageEOFException("reached end of data");
386                }
387    
388                this.dataIn.mark(17);
389                int type = this.dataIn.read();
390                if (type == CHAR) {
391                    return this.dataIn.readChar();
392                } 
393                if (type == NULL) {
394                    this.dataIn.reset();
395                    throw new NullPointerException("Cannot convert NULL value to char.");
396                } else {
397                    this.dataIn.reset();
398                    throw new MessageFormatException(" not a char type");
399                }
400            }
401            catch (NumberFormatException mfe) {
402                try {
403                    this.dataIn.reset();
404                }
405                catch (IOException ioe) {
406                    JMSException jmsEx = new JMSException("reset failed");
407                    jmsEx.setLinkedException(ioe);
408                }
409                throw mfe;
410    
411            }
412            catch (EOFException e) {
413                JMSException jmsEx = new MessageEOFException(e.getMessage());
414                jmsEx.setLinkedException(e);
415                throw jmsEx;
416            }
417            catch (IOException e) {
418                JMSException jmsEx = new MessageFormatException(e.getMessage());
419                jmsEx.setLinkedException(e);
420                throw jmsEx;
421            }
422        }
423    
424    
425        /**
426         * Reads a 32-bit integer from the stream message.
427         *
428         * @return a 32-bit integer value from the stream message, interpreted
429         *         as an <code>int</code>
430         * @throws JMSException                if the JMS provider fails to read the message
431         *                                     due to some internal error.
432         * @throws MessageEOFException         if unexpected end of message stream has
433         *                                     been reached.
434         * @throws MessageFormatException      if this type conversion is invalid.
435         * @throws MessageNotReadableException if the message is in write-only
436         *                                     mode.
437         */
438    
439        public int readInt() throws JMSException {
440            initializeReading();
441            try {
442                if (this.dataIn.available() == 0) {
443                    throw new MessageEOFException("reached end of data");
444                }
445    
446                this.dataIn.mark(33);
447                int type = this.dataIn.read();
448                if (type == INT) {
449                    return this.dataIn.readInt();
450                }
451                if (type == SHORT) {
452                    return this.dataIn.readShort();
453                }
454                if (type == BYTE) {
455                    return this.dataIn.readByte();
456                }
457                if (type == STRING) {
458                    return Integer.valueOf(this.dataIn.readUTF()).intValue();
459                } 
460                if (type == NULL) {
461                    this.dataIn.reset();
462                    throw new NullPointerException("Cannot convert NULL value to int.");
463                }
464                else {
465                    this.dataIn.reset();
466                    throw new MessageFormatException(" not an int type");
467                }
468            }
469            catch (NumberFormatException mfe) {
470                try {
471                    this.dataIn.reset();
472                }
473                catch (IOException ioe) {
474                    JMSException jmsEx = new JMSException("reset failed");
475                    jmsEx.setLinkedException(ioe);
476                }
477                throw mfe;
478    
479            }
480            catch (EOFException e) {
481                JMSException jmsEx = new MessageEOFException(e.getMessage());
482                jmsEx.setLinkedException(e);
483                throw jmsEx;
484            }
485            catch (IOException e) {
486                JMSException jmsEx = new MessageFormatException(e.getMessage());
487                jmsEx.setLinkedException(e);
488                throw jmsEx;
489            }
490        }
491    
492    
493        /**
494         * Reads a 64-bit integer from the stream message.
495         *
496         * @return a 64-bit integer value from the stream message, interpreted as
497         *         a <code>long</code>
498         * @throws JMSException                if the JMS provider fails to read the message
499         *                                     due to some internal error.
500         * @throws MessageEOFException         if unexpected end of message stream has
501         *                                     been reached.
502         * @throws MessageFormatException      if this type conversion is invalid.
503         * @throws MessageNotReadableException if the message is in write-only
504         *                                     mode.
505         */
506    
507        public long readLong() throws JMSException {
508            initializeReading();
509            try {
510                if (this.dataIn.available() == 0) {
511                    throw new MessageEOFException("reached end of data");
512                }
513    
514                this.dataIn.mark(65);
515                int type = this.dataIn.read();
516                if (type == LONG) {
517                    return this.dataIn.readLong();
518                }
519                if (type == INT) {
520                    return this.dataIn.readInt();
521                }
522                if (type == SHORT) {
523                    return this.dataIn.readShort();
524                }
525                if (type == BYTE) {
526                    return this.dataIn.readByte();
527                }
528                if (type == STRING) {
529                    return Long.valueOf(this.dataIn.readUTF()).longValue();
530                } 
531                if (type == NULL) {
532                    this.dataIn.reset();
533                    throw new NullPointerException("Cannot convert NULL value to long.");
534                }
535                else {
536                    this.dataIn.reset();
537                    throw new MessageFormatException(" not a long type");
538                }
539            }
540            catch (NumberFormatException mfe) {
541                try {
542                    this.dataIn.reset();
543                }
544                catch (IOException ioe) {
545                    JMSException jmsEx = new JMSException("reset failed");
546                    jmsEx.setLinkedException(ioe);
547                }
548                throw mfe;
549    
550            }
551            catch (EOFException e) {
552                JMSException jmsEx = new MessageEOFException(e.getMessage());
553                jmsEx.setLinkedException(e);
554                throw jmsEx;
555            }
556            catch (IOException e) {
557                JMSException jmsEx = new MessageFormatException(e.getMessage());
558                jmsEx.setLinkedException(e);
559                throw jmsEx;
560            }
561        }
562    
563    
564        /**
565         * Reads a <code>float</code> from the stream message.
566         *
567         * @return a <code>float</code> value from the stream message
568         * @throws JMSException                if the JMS provider fails to read the message
569         *                                     due to some internal error.
570         * @throws MessageEOFException         if unexpected end of message stream has
571         *                                     been reached.
572         * @throws MessageFormatException      if this type conversion is invalid.
573         * @throws MessageNotReadableException if the message is in write-only
574         *                                     mode.
575         */
576    
577        public float readFloat() throws JMSException {
578            initializeReading();
579            try {
580                if (this.dataIn.available() == 0) {
581                    throw new MessageEOFException("reached end of data");
582                }
583    
584                this.dataIn.mark(33);
585                int type = this.dataIn.read();
586                if (type == FLOAT) {
587                    return this.dataIn.readFloat();
588                }
589                if (type == STRING) {
590                    return Float.valueOf(this.dataIn.readUTF()).floatValue();
591                } 
592                if (type == NULL) {
593                    this.dataIn.reset();
594                    throw new NullPointerException("Cannot convert NULL value to float.");
595                }
596                else {
597                    this.dataIn.reset();
598                    throw new MessageFormatException(" not a float type");
599                }
600            }
601            catch (NumberFormatException mfe) {
602                try {
603                    this.dataIn.reset();
604                }
605                catch (IOException ioe) {
606                    JMSException jmsEx = new JMSException("reset failed");
607                    jmsEx.setLinkedException(ioe);
608                }
609                throw mfe;
610    
611            }
612            catch (EOFException e) {
613                JMSException jmsEx = new MessageEOFException(e.getMessage());
614                jmsEx.setLinkedException(e);
615                throw jmsEx;
616            }
617            catch (IOException e) {
618                JMSException jmsEx = new MessageFormatException(e.getMessage());
619                jmsEx.setLinkedException(e);
620                throw jmsEx;
621            }
622        }
623    
624    
625        /**
626         * Reads a <code>double</code> from the stream message.
627         *
628         * @return a <code>double</code> value from the stream message
629         * @throws JMSException                if the JMS provider fails to read the message
630         *                                     due to some internal error.
631         * @throws MessageEOFException         if unexpected end of message stream has
632         *                                     been reached.
633         * @throws MessageFormatException      if this type conversion is invalid.
634         * @throws MessageNotReadableException if the message is in write-only
635         *                                     mode.
636         */
637    
638        public double readDouble() throws JMSException {
639            initializeReading();
640            try {
641                if (this.dataIn.available() == 0) {
642                    throw new MessageEOFException("reached end of data");
643                }
644    
645                this.dataIn.mark(65);
646                int type = this.dataIn.read();
647                if (type == DOUBLE) {
648                    return this.dataIn.readDouble();
649                }
650                if (type == FLOAT) {
651                    return this.dataIn.readFloat();
652                }
653                if (type == STRING) {
654                    return Double.valueOf(this.dataIn.readUTF()).doubleValue();
655                } 
656                if (type == NULL) {
657                    this.dataIn.reset();
658                    throw new NullPointerException("Cannot convert NULL value to double.");
659                }
660                else {
661                    this.dataIn.reset();
662                    throw new MessageFormatException(" not a double type");
663                }
664            }
665            catch (NumberFormatException mfe) {
666                try {
667                    this.dataIn.reset();
668                }
669                catch (IOException ioe) {
670                    JMSException jmsEx = new JMSException("reset failed");
671                    jmsEx.setLinkedException(ioe);
672                }
673                throw mfe;
674    
675            }
676            catch (EOFException e) {
677                JMSException jmsEx = new MessageEOFException(e.getMessage());
678                jmsEx.setLinkedException(e);
679                throw jmsEx;
680            }
681            catch (IOException e) {
682                JMSException jmsEx = new MessageFormatException(e.getMessage());
683                jmsEx.setLinkedException(e);
684                throw jmsEx;
685            }
686        }
687    
688    
689        /**
690         * Reads a <CODE>String</CODE> from the stream message.
691         *
692         * @return a Unicode string from the stream message
693         * @throws JMSException                if the JMS provider fails to read the message
694         *                                     due to some internal error.
695         * @throws MessageEOFException         if unexpected end of message stream has
696         *                                     been reached.
697         * @throws MessageFormatException      if this type conversion is invalid.
698         * @throws MessageNotReadableException if the message is in write-only
699         *                                     mode.
700         */
701    
702        public String readString() throws JMSException {
703            initializeReading();
704            try {
705                if (this.dataIn.available() == 0) {
706                    throw new MessageEOFException("reached end of data");
707                }
708    
709                this.dataIn.mark(65);
710                int type = this.dataIn.read();
711                if (type == NULL) {
712                    return null;
713                }
714                if (type == STRING) {
715                    return this.dataIn.readUTF();
716                }
717                if (type == LONG) {
718                    return new Long(this.dataIn.readLong()).toString();
719                }
720                if (type == INT) {
721                    return new Integer(this.dataIn.readInt()).toString();
722                }
723                if (type == SHORT) {
724                    return new Short(this.dataIn.readShort()).toString();
725                }
726                if (type == BYTE) {
727                    return new Byte(this.dataIn.readByte()).toString();
728                }
729                if (type == FLOAT) {
730                    return new Float(this.dataIn.readFloat()).toString();
731                }
732                if (type == DOUBLE) {
733                    return new Double(this.dataIn.readDouble()).toString();
734                }
735                if (type == BOOLEAN) {
736                    return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
737                }
738                if (type == CHAR) {
739                    return new Character(this.dataIn.readChar()).toString();
740                }
741                else {
742                    this.dataIn.reset();
743                    throw new MessageFormatException(" not a String type");
744                }
745            }
746            catch (NumberFormatException mfe) {
747                try {
748                    this.dataIn.reset();
749                }
750                catch (IOException ioe) {
751                    JMSException jmsEx = new JMSException("reset failed");
752                    jmsEx.setLinkedException(ioe);
753                }
754                throw mfe;
755    
756            }
757            catch (EOFException e) {
758                JMSException jmsEx = new MessageEOFException(e.getMessage());
759                jmsEx.setLinkedException(e);
760                throw jmsEx;
761            }
762            catch (IOException e) {
763                JMSException jmsEx = new MessageFormatException(e.getMessage());
764                jmsEx.setLinkedException(e);
765                throw jmsEx;
766            }
767        }
768    
769    
770        /**
771         * Reads a byte array field from the stream message into the
772         * specified <CODE>byte[]</CODE> object (the read buffer).
773         * <p/>
774         * <P>To read the field value, <CODE>readBytes</CODE> should be
775         * successively called
776         * until it returns a value less than the length of the read buffer.
777         * The value of the bytes in the buffer following the last byte
778         * read is undefined.
779         * <p/>
780         * <P>If <CODE>readBytes</CODE> returns a value equal to the length of the
781         * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
782         * are no more bytes to be read, this call returns -1.
783         * <p/>
784         * <P>If the byte array field value is null, <CODE>readBytes</CODE>
785         * returns -1.
786         * <p/>
787         * <P>If the byte array field value is empty, <CODE>readBytes</CODE>
788         * returns 0.
789         * <p/>
790         * <P>Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
791         * field value has been made,
792         * the full value of the field must be read before it is valid to read
793         * the next field. An attempt to read the next field before that has
794         * been done will throw a <CODE>MessageFormatException</CODE>.
795         * <p/>
796         * <P>To read the byte field value into a new <CODE>byte[]</CODE> object,
797         * use the <CODE>readObject</CODE> method.
798         *
799         * @param value the buffer into which the data is read
800         * @return the total number of bytes read into the buffer, or -1 if
801         *         there is no more data because the end of the byte field has been
802         *         reached
803         * @throws JMSException                if the JMS provider fails to read the message
804         *                                     due to some internal error.
805         * @throws MessageEOFException         if unexpected end of message stream has
806         *                                     been reached.
807         * @throws MessageFormatException      if this type conversion is invalid.
808         * @throws MessageNotReadableException if the message is in write-only
809         *                                     mode.
810         * @see #readObject()
811         */
812    
813        public int readBytes(byte[] value) throws JMSException {
814            initializeReading();
815            try {
816                if (value == null) {
817                    throw new NullPointerException();
818                }
819                if (bytesToRead == 0) {
820                    bytesToRead = -1;
821                    return -1;
822                }
823                else if (bytesToRead > 0) {
824                    if (value.length >= bytesToRead) {
825                        bytesToRead = 0;
826                        return dataIn.read(value, 0, bytesToRead);
827                    }
828                    else {
829                        bytesToRead -= value.length;
830                        return dataIn.read(value);
831                    }
832                }
833                else {
834                    if (this.dataIn.available() == 0) {
835                        throw new MessageEOFException("reached end of data");
836                    }
837                    if (this.dataIn.available() < 1) {
838                        throw new MessageFormatException("Not enough data left to read value");
839                    }
840                    this.dataIn.mark(value.length + 1);
841                    int type = this.dataIn.read();
842                    if (this.dataIn.available() < 1) {
843                        return -1;
844                    }
845                    if (type != BYTES) {
846                        throw new MessageFormatException("Not a byte array");
847                    }
848                    int len = this.dataIn.readInt();
849    
850                    if (len >= value.length) {
851                        bytesToRead = len - value.length;
852                        return this.dataIn.read(value);
853                    }
854                    else {
855                        bytesToRead = 0;
856                        return this.dataIn.read(value, 0, len);
857                    }
858                }
859            }
860            catch (EOFException e) {
861                JMSException jmsEx = new MessageEOFException(e.getMessage());
862                jmsEx.setLinkedException(e);
863                throw jmsEx;
864            }
865            catch (IOException e) {
866                JMSException jmsEx = new MessageFormatException(e.getMessage());
867                jmsEx.setLinkedException(e);
868                throw jmsEx;
869            }
870        }
871    
872    
873        /**
874         * Reads an object from the stream message.
875         * <p/>
876         * <P>This method can be used to return, in objectified format,
877         * an object in the Java programming language ("Java object") that has
878         * been written to the stream with the equivalent
879         * <CODE>writeObject</CODE> method call, or its equivalent primitive
880         * <CODE>write<I>type</I></CODE> method.
881         * <p/>
882         * <P>Note that byte values are returned as <CODE>byte[]</CODE>, not
883         * <CODE>Byte[]</CODE>.
884         * <p/>
885         * <P>An attempt to call <CODE>readObject</CODE> to read a byte field
886         * value into a new <CODE>byte[]</CODE> object before the full value of the
887         * byte field has been read will throw a
888         * <CODE>MessageFormatException</CODE>.
889         *
890         * @return a Java object from the stream message, in objectified
891         *         format (for example, if the object was written as an <CODE>int</CODE>,
892         *         an <CODE>Integer</CODE> is returned)
893         * @throws JMSException                if the JMS provider fails to read the message
894         *                                     due to some internal error.
895         * @throws MessageEOFException         if unexpected end of message stream has
896         *                                     been reached.
897         * @throws MessageFormatException      if this type conversion is invalid.
898         * @throws MessageNotReadableException if the message is in write-only
899         *                                     mode.
900         * @see #readBytes(byte[] value)
901         */
902    
903        public Object readObject() throws JMSException {
904            initializeReading();
905            try {
906                if (this.dataIn.available() == 0) {
907                    throw new MessageEOFException("reached end of data");
908                }
909    
910                this.dataIn.mark(65);
911                int type = this.dataIn.read();
912                if (type == NULL) {
913                    return null;
914                }
915                if (type == STRING) {
916                    return this.dataIn.readUTF();
917                }
918                if (type == LONG) {
919                    return new Long(this.dataIn.readLong());
920                }
921                if (type == INT) {
922                    return new Integer(this.dataIn.readInt());
923                }
924                if (type == SHORT) {
925                    return new Short(this.dataIn.readShort());
926                }
927                if (type == BYTE) {
928                    return new Byte(this.dataIn.readByte());
929                }
930                if (type == FLOAT) {
931                    return new Float(this.dataIn.readFloat());
932                }
933                if (type == DOUBLE) {
934                    return new Double(this.dataIn.readDouble());
935                }
936                if (type == BOOLEAN) {
937                    return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
938                }
939                if (type == CHAR) {
940                    return new Character(this.dataIn.readChar());
941                }
942                if (type == BYTES) {
943                    int len = this.dataIn.readInt();
944                    byte[] value = new byte[len];
945                    this.dataIn.read(value);
946                    return value;
947                }
948                else {
949                    this.dataIn.reset();
950                    throw new MessageFormatException("unknown type");
951                }
952            }
953            catch (NumberFormatException mfe) {
954                try {
955                    this.dataIn.reset();
956                }
957                catch (IOException ioe) {
958                    JMSException jmsEx = new JMSException("reset failed");
959                    jmsEx.setLinkedException(ioe);
960                }
961                throw mfe;
962    
963            }
964            catch (EOFException e) {
965                JMSException jmsEx = new MessageEOFException(e.getMessage());
966                jmsEx.setLinkedException(e);
967                throw jmsEx;
968            }
969            catch (IOException e) {
970                JMSException jmsEx = new MessageFormatException(e.getMessage());
971                jmsEx.setLinkedException(e);
972                throw jmsEx;
973            }
974        }
975    
976    
977        /**
978         * Writes a <code>boolean</code> to the stream message.
979         * The value <code>true</code> is written as the value
980         * <code>(byte)1</code>; the value <code>false</code> is written as
981         * the value <code>(byte)0</code>.
982         *
983         * @param value the <code>boolean</code> value to be written
984         * @throws JMSException                 if the JMS provider fails to write the message
985         *                                      due to some internal error.
986         * @throws MessageNotWriteableException if the message is in read-only
987         *                                      mode.
988         */
989    
990        public void writeBoolean(boolean value) throws JMSException {
991            initializeWriting();
992            try {
993                this.dataOut.write(BOOLEAN);
994                this.dataOut.writeBoolean(value);
995            }
996            catch (IOException ioe) {
997                JMSException jmsEx = new JMSException(ioe.getMessage());
998                jmsEx.setLinkedException(ioe);
999                throw jmsEx;
1000            }
1001        }
1002    
1003    
1004        /**
1005         * Writes a <code>byte</code> to the stream message.
1006         *
1007         * @param value the <code>byte</code> value to be written
1008         * @throws JMSException                 if the JMS provider fails to write the message
1009         *                                      due to some internal error.
1010         * @throws MessageNotWriteableException if the message is in read-only
1011         *                                      mode.
1012         */
1013    
1014        public void writeByte(byte value) throws JMSException {
1015            initializeWriting();
1016            try {
1017                this.dataOut.write(BYTE);
1018                this.dataOut.writeByte(value);
1019            }
1020            catch (IOException ioe) {
1021                JMSException jmsEx = new JMSException(ioe.getMessage());
1022                jmsEx.setLinkedException(ioe);
1023                throw jmsEx;
1024            }
1025        }
1026    
1027    
1028        /**
1029         * Writes a <code>short</code> to the stream message.
1030         *
1031         * @param value the <code>short</code> value to be written
1032         * @throws JMSException                 if the JMS provider fails to write the message
1033         *                                      due to some internal error.
1034         * @throws MessageNotWriteableException if the message is in read-only
1035         *                                      mode.
1036         */
1037    
1038        public void writeShort(short value) throws JMSException {
1039            initializeWriting();
1040            try {
1041                this.dataOut.write(SHORT);
1042                this.dataOut.writeShort(value);
1043            }
1044            catch (IOException ioe) {
1045                JMSException jmsEx = new JMSException(ioe.getMessage());
1046                jmsEx.setLinkedException(ioe);
1047                throw jmsEx;
1048            }
1049        }
1050    
1051    
1052        /**
1053         * Writes a <code>char</code> to the stream message.
1054         *
1055         * @param value the <code>char</code> value to be written
1056         * @throws JMSException                 if the JMS provider fails to write the message
1057         *                                      due to some internal error.
1058         * @throws MessageNotWriteableException if the message is in read-only
1059         *                                      mode.
1060         */
1061    
1062        public void writeChar(char value) throws JMSException {
1063            initializeWriting();
1064            try {
1065                this.dataOut.write(CHAR);
1066                this.dataOut.writeChar(value);
1067            }
1068            catch (IOException ioe) {
1069                JMSException jmsEx = new JMSException(ioe.getMessage());
1070                jmsEx.setLinkedException(ioe);
1071                throw jmsEx;
1072            }
1073        }
1074    
1075    
1076        /**
1077         * Writes an <code>int</code> to the stream message.
1078         *
1079         * @param value the <code>int</code> value to be written
1080         * @throws JMSException                 if the JMS provider fails to write the message
1081         *                                      due to some internal error.
1082         * @throws MessageNotWriteableException if the message is in read-only
1083         *                                      mode.
1084         */
1085    
1086        public void writeInt(int value) throws JMSException {
1087            initializeWriting();
1088            try {
1089                this.dataOut.write(INT);
1090                this.dataOut.writeInt(value);
1091            }
1092            catch (IOException ioe) {
1093                JMSException jmsEx = new JMSException(ioe.getMessage());
1094                jmsEx.setLinkedException(ioe);
1095                throw jmsEx;
1096            }
1097        }
1098    
1099    
1100        /**
1101         * Writes a <code>long</code> to the stream message.
1102         *
1103         * @param value the <code>long</code> value to be written
1104         * @throws JMSException                 if the JMS provider fails to write the message
1105         *                                      due to some internal error.
1106         * @throws MessageNotWriteableException if the message is in read-only
1107         *                                      mode.
1108         */
1109    
1110        public void writeLong(long value) throws JMSException {
1111            initializeWriting();
1112            try {
1113                this.dataOut.write(LONG);
1114                this.dataOut.writeLong(value);
1115            }
1116            catch (IOException ioe) {
1117                JMSException jmsEx = new JMSException(ioe.getMessage());
1118                jmsEx.setLinkedException(ioe);
1119                throw jmsEx;
1120            }
1121        }
1122    
1123    
1124        /**
1125         * Writes a <code>float</code> to the stream message.
1126         *
1127         * @param value the <code>float</code> value to be written
1128         * @throws JMSException                 if the JMS provider fails to write the message
1129         *                                      due to some internal error.
1130         * @throws MessageNotWriteableException if the message is in read-only
1131         *                                      mode.
1132         */
1133    
1134        public void writeFloat(float value) throws JMSException {
1135            initializeWriting();
1136            try {
1137                this.dataOut.write(FLOAT);
1138                this.dataOut.writeFloat(value);
1139            }
1140            catch (IOException ioe) {
1141                JMSException jmsEx = new JMSException(ioe.getMessage());
1142                jmsEx.setLinkedException(ioe);
1143                throw jmsEx;
1144            }
1145        }
1146    
1147    
1148        /**
1149         * Writes a <code>double</code> to the stream message.
1150         *
1151         * @param value the <code>double</code> value to be written
1152         * @throws JMSException                 if the JMS provider fails to write the message
1153         *                                      due to some internal error.
1154         * @throws MessageNotWriteableException if the message is in read-only
1155         *                                      mode.
1156         */
1157    
1158        public void writeDouble(double value) throws JMSException {
1159            initializeWriting();
1160            try {
1161                this.dataOut.write(DOUBLE);
1162                this.dataOut.writeDouble(value);
1163            }
1164            catch (IOException ioe) {
1165                JMSException jmsEx = new JMSException(ioe.getMessage());
1166                jmsEx.setLinkedException(ioe);
1167                throw jmsEx;
1168            }
1169        }
1170    
1171    
1172        /**
1173         * Writes a <code>String</code> to the stream message.
1174         *
1175         * @param value the <code>String</code> value to be written
1176         * @throws JMSException                 if the JMS provider fails to write the message
1177         *                                      due to some internal error.
1178         * @throws MessageNotWriteableException if the message is in read-only
1179         *                                      mode.
1180         */
1181    
1182        public void writeString(String value) throws JMSException {
1183            initializeWriting();
1184            try {
1185                if (value == null) {
1186                    this.dataOut.write(NULL);
1187                }
1188                else {
1189                    this.dataOut.write(STRING);
1190                    this.dataOut.writeUTF(value);
1191                }
1192            }
1193            catch (IOException ioe) {
1194                JMSException jmsEx = new JMSException(ioe.getMessage());
1195                jmsEx.setLinkedException(ioe);
1196                throw jmsEx;
1197            }
1198        }
1199    
1200    
1201        /**
1202         * Writes a byte array field to the stream message.
1203         * <p/>
1204         * <P>The byte array <code>value</code> is written to the message
1205         * as a byte array field. Consecutively written byte array fields are
1206         * treated as two distinct fields when the fields are read.
1207         *
1208         * @param value the byte array value to be written
1209         * @throws JMSException                 if the JMS provider fails to write the message
1210         *                                      due to some internal error.
1211         * @throws MessageNotWriteableException if the message is in read-only
1212         *                                      mode.
1213         */
1214    
1215        public void writeBytes(byte[] value) throws JMSException {
1216            writeBytes(value, 0, value.length);
1217        }
1218    
1219    
1220        /**
1221         * Writes a portion of a byte array as a byte array field to the stream
1222         * message.
1223         * <p/>
     * <P>A portion of the byte array <code>value</code> is written to the
1225         * message as a byte array field. Consecutively written byte
1226         * array fields are treated as two distinct fields when the fields are
1227         * read.
1228         *
1229         * @param value  the byte array value to be written
1230         * @param offset the initial offset within the byte array
1231         * @param length the number of bytes to use
1232         * @throws JMSException                 if the JMS provider fails to write the message
1233         *                                      due to some internal error.
1234         * @throws MessageNotWriteableException if the message is in read-only
1235         *                                      mode.
1236         */
1237    
1238        public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1239            initializeWriting();
1240            try {
1241                this.dataOut.write(BYTES);
1242                this.dataOut.writeInt(length);
1243                this.dataOut.write(value, offset, length);
1244            }
1245            catch (IOException ioe) {
1246                JMSException jmsEx = new JMSException(ioe.getMessage());
1247                jmsEx.setLinkedException(ioe);
1248                throw jmsEx;
1249            }
1250        }
1251    
1252    
1253        /**
1254         * Writes an object to the stream message.
1255         * <p/>
1256         * <P>This method works only for the objectified primitive
1257         * object types (<code>Integer</code>, <code>Double</code>,
1258         * <code>Long</code>&nbsp;...), <code>String</code> objects, and byte
1259         * arrays.
1260         *
1261         * @param value the Java object to be written
1262         * @throws JMSException                 if the JMS provider fails to write the message
1263         *                                      due to some internal error.
1264         * @throws MessageFormatException       if the object is invalid.
1265         * @throws MessageNotWriteableException if the message is in read-only
1266         *                                      mode.
1267         */
1268    
1269        public void writeObject(Object value) throws JMSException {
1270            initializeWriting();
1271            if (value == null) {
1272                try {
1273                    this.dataOut.write(NULL);
1274                }
1275                catch (IOException ioe) {
1276                    JMSException jmsEx = new JMSException(ioe.getMessage());
1277                    jmsEx.setLinkedException(ioe);
1278                    throw jmsEx;
1279                }
1280            }
1281            else if (value instanceof String) {
1282                writeString(value.toString());
1283            }
1284            else if (value instanceof Character) {
1285                writeChar(((Character) value).charValue());
1286            }
1287            else if (value instanceof Boolean) {
1288                writeBoolean(((Boolean) value).booleanValue());
1289            }
1290            else if (value instanceof Byte) {
1291                writeByte(((Byte) value).byteValue());
1292            }
1293            else if (value instanceof Short) {
1294                writeShort(((Short) value).shortValue());
1295            }
1296            else if (value instanceof Integer) {
1297                writeInt(((Integer) value).intValue());
1298            }
1299            else if (value instanceof Float) {
1300                writeFloat(((Float) value).floatValue());
1301            }
1302            else if (value instanceof Double) {
1303                writeDouble(((Double) value).doubleValue());
1304            }
1305            else if (value instanceof byte[]) {
1306                writeBytes((byte[]) value);
1307            }
1308        }
1309    
1310    
1311        /**
1312         * Puts the message body in read-only mode and repositions the stream of
1313         * bytes to the beginning.
1314         *
1315         * @throws JMSException if an internal error occurs
1316         */
1317    
1318        public void reset() throws JMSException {
1319            super.readOnlyMessage = true;
1320            if (this.dataOut != null) {
1321                try {
1322                    this.dataOut.flush();
1323                    byte[] data = this.bytesOut.toByteArray();
1324                    super.setBodyAsBytes(data,0,data.length);
1325                    dataOut.close();
1326                }
1327                catch (IOException ioe) {
1328                    JMSException jmsEx = new JMSException("reset failed: " + ioe.getMessage());
1329                    jmsEx.setLinkedException(ioe);
1330                    throw jmsEx;
1331                }
1332            }
1333            this.bytesOut = null;
1334            this.dataIn = null;
1335            this.dataOut = null;
1336        }
1337    
1338        /**
1339         * @param bodyAsBytes The bodyAsBytes to set.
     * @param offset the position in <code>bodyAsBytes</code> at which the body starts
     * @param length the number of bytes that make up the body
1342         */
1343        public void setBodyAsBytes(byte[] bodyAsBytes,int offset, int length) {
1344            super.setBodyAsBytes(bodyAsBytes,offset,length);
1345            dataOut = null;
1346            dataIn = null;
1347        }
1348    
1349        /**
1350         * @return Returns the data body
     * @throws IOException if an exception occurs retrieving the data
1352         */
1353        public ByteArray getBodyAsBytes() throws IOException {
1354            if (this.dataOut != null) {
1355                this.dataOut.flush();
1356                byte[] data = this.bytesOut.toByteArray();
1357                super.setBodyAsBytes(data,0,data.length);
1358                dataOut.close();
1359                dataOut = null;
1360            }
1361            return super.getBodyAsBytes();
1362        }
1363        
1364        private void initializeWriting() throws MessageNotWriteableException {
1365            if (super.readOnlyMessage) {
1366                throw new MessageNotWriteableException("This message is in read-only mode");
1367            }
1368            if (this.dataOut == null) {
1369                this.bytesOut = new ByteArrayOutputStream();
1370                this.dataOut = new DataOutputStream(this.bytesOut);
1371            }
1372        }
1373    
1374    
1375        private void initializeReading() throws MessageNotReadableException {
1376            if (!super.readOnlyMessage) {
1377                throw new MessageNotReadableException("This message is in write-only mode");
1378            }
1379            try {
1380                ByteArray data = super.getBodyAsBytes();
1381                if (this.dataIn == null && data != null) {
1382                    if (ByteArrayCompression.isCompressed(data)){
1383                        ByteArrayCompression compression = new ByteArrayCompression();
1384                        data = compression.inflate(data);
1385                    }
1386                    ByteArrayInputStream bytesIn = new ByteArrayInputStream(data.getBuf(),data.getOffset(),data.getLength());
1387                    this.dataIn = new DataInputStream(bytesIn);
1388                }
1389            }
1390            catch (IOException e) {
1391                MessageNotReadableException mnr = new MessageNotReadableException("getBodyAsBytes failed");
1392                mnr.setLinkedException(e);
1393                throw mnr;
1394            }
1395        }
1396    
1397        public String toString() {
1398            return super.toString() + " ActiveMQStreamMessage{ " +
1399                    "bytesOut = " + bytesOut +
1400                    ", dataOut = " + dataOut +
1401                    ", dataIn = " + dataIn +
1402                    ", bytesToRead = " + bytesToRead +
1403                    " }";
1404        }
1405    }