001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.streams;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import javax.jms.JMSException;
024    import javax.jms.MessageConsumer;
025    
026    import org.activemq.io.util.ByteArray;
027    import org.activemq.message.ActiveMQMessage;
028    
029    /**
030     * An inputStream that reads data from a MessageConsumer
031     * 
032     * @version $Revision: 1.1.1.1 $
033     */
034    public class JMSInputStream extends InputStream {
035        private static final int ARRAY_SIZE = 10;
036        private boolean closed;
037        protected ByteArray[] arrays = new ByteArray[ARRAY_SIZE];
038        private int offset;
039        private int current = 0;
040        protected int clen = 0;
041        private int markArray = -1;
042        private int markOffset = -1;
043        private MessageConsumer consumer;
044    
045        /**
046         * Construct an input stream to read from a JMS Consumer
047         * 
048         * @param consumer
049         */
050        public JMSInputStream(MessageConsumer consumer) {
051            this.consumer = consumer;
052        }
053    
054        /**
055         * Read the next byte from this stream.
056         * 
057         * @return the next byte
058         * @throws IOException
059         */
060        public int read() throws IOException {
061            if (closed)
062                throw new EOFException("JMSInputStream is closed");
063            if (current == clen) {
064                fillBuffer(1);
065            }
066            int c = (arrays[current].get(offset) & 0xff);
067            offset++;
068            if (offset == arrays[current].getLength()) {
069                offset = 0;
070                releaseBuffer(current);
071                current++;
072            }
073            return c;
074        }
075    
076        /**
077         * Read data from this input stream into the given byte array starting at offset 0 for b.length bytes. Returns the
078         * actual number of bytes read;
079         * 
080         * @param b
081         * @return the number of bytes read
082         * @throws IOException
083         */
084        public int read(byte b[]) throws IOException {
085            return read(b, 0, b.length);
086        }
087    
088        /**
089         * Read data from this input stream into the given byte array starting at offset 'off' for 'len' bytes. Returns the
090         * actual number of bytes read.
091         * 
092         * @param b buffer to read data into
093         * @param off offset into b
094         * @param len the maximum length
095         * @return the number of bytes actually read
096         * @throws IOException
097         */
098        public int read(byte b[], int off, int len) throws IOException {
099            if (closed)
100                throw new EOFException("JMSInputStream is closed");
101            int n = off;
102            int total = 0;
103            int last = Math.min(off + len, b.length);
104            if (current == clen) {
105                fillBuffer(len);
106            }
107            while ((current < clen) && (n < last)) {
108                int num_left = arrays[current].getLength() - offset;
109                int tocopy = Math.min(num_left, last - n);
110                System.arraycopy(arrays[current].getBuf(), offset, b, n, tocopy);
111                total += tocopy;
112                n += tocopy;
113                offset += tocopy;
114                if (offset == arrays[current].getLength()) {
115                    offset = 0;
116                    releaseBuffer(current);
117                    current++;
118                }
119            }
120            return total;
121        }
122    
123        /**
124         * Skip n bytes in this stream; returns the number of bytes actually skipped (which may be less than the number
125         * requested).
126         * 
127         * @param length the number of bytes to skip
128         * @return the number of bytes actually skipped
129         * @throws IOException
130         */
131        public long skip(long length) throws IOException {
132            if (closed)
133                throw new EOFException("JMSInputStream is closed!");
134            int requested = Math.min((int) length, Integer.MAX_VALUE);
135            int totalskipped = 0;
136            while ((current < clen) && (arrays[current] != null) && (requested > 0)) {
137                if (current == clen) {
138                    break;
139                }
140                int num_left = arrays[current].getLength() - offset;
141                if (num_left < requested) {
142                    requested -= num_left;
143                    totalskipped += num_left;
144                    releaseBuffer(current);
145                    current++;
146                    offset = 0;
147                }
148                else {
149                    totalskipped += requested;
150                    offset += requested;
151                    requested = 0;
152                }
153            }
154            return totalskipped;
155        }
156    
157        /**
158         * Return the number of bytes available for reading.
159         * 
160         * @return the number of bytes available
161         * @throws IOException
162         */
163        public int available() throws IOException {
164            if (closed)
165                throw new EOFException("JMSInputStream is closed!");
166            fillBuffer(0);
167            if (current == clen)
168                return 0;
169            int num_left = arrays[current].getLength() - offset;
170            for (int i = current + 1;i < clen;i++) {
171                if (arrays[i] == null)
172                    break;
173                num_left += arrays[i].getLength();
174            }
175            return num_left;
176        }
177    
178        /**
179         * close the stream and the MessageConsumer
180         */
181        public void close() {
182            try {
183                consumer.close();
184            }
185            catch (JMSException jmsEx) {
186            }
187        }
188    
189        /**
190         * @return true
191         */
192        public boolean markSupported() {
193            return true;
194        }
195    
196        /**
197         * Returns the stream to the position of the previous mark().
198         * 
199         * @throws IOException
200         */
201        public void reset() throws IOException {
202            if (markArray == -1)
203                throw new IOException("PooledArrayInputStream not marked!");
204            current = markArray;
205            offset = markOffset;
206            markArray = -1;
207        }
208    
209        /**
210         * Set the stream's mark to the current position.
211         * 
212         * @param readlimit
213         */
214        public void mark(int readlimit) {
215            markArray = current;
216            markOffset = offset;
217        }
218    
219        /**
220         * release up to the current buffer to GC
221         * 
222         * @param index
223         */
224        private void releaseBuffer(int index) {
225            if (markArray < 0 || index < markArray) {
226                for (int i = 0;i <= index;i++) {
227                    arrays[index] = null;
228                }
229            }
230        }
231    
232        /**
233         * fill the buffer
234         * 
235         * @param requiredLength
236         * @throws IOException
237         */
238        private void fillBuffer(int requiredLength) throws IOException {
239            int len = 0;
240            try {
241                do {
242                    if (!closed) {
243                        ActiveMQMessage msg = null;
244                        if (len == 0 && requiredLength > 0) {
245                            msg = (ActiveMQMessage) consumer.receive(2000);
246                        }
247                        else {
248                            msg = (ActiveMQMessage) consumer.receiveNoWait();
249                        }
250                        if (msg != null) {
251                            ByteArray ba = msg.getBodyAsBytes();
252                            if (ba != null) {
253                                len += ba.getLength();
254                                process(ba);
255                            }
256                        }
257                        else if (closed) {
258                            break;
259                        }
260                    }
261                }
262                while (len < requiredLength && !closed);
263            }
264            catch (JMSException jmsEx) {
265                throw new IOException(jmsEx.getMessage());
266            }
267        }
268    
269        /**
270         * Add an array to this PooledArrayInputStream.
271         * 
272         * @param ba
273         */
274        private void process(ByteArray ba) {
275            if (current == clen && (clen + 1) == arrays.length) {
276                offset = 0;
277                current = 0;
278                clen = 0;
279                if (arrays.length > ARRAY_SIZE && markArray == -1) {
280                    arrays = new ByteArray[ARRAY_SIZE];
281                }
282            }
283            arrays[clen] = ba;
284            clen++;
285            if (clen == arrays.length) {
286                ByteArray[] old = arrays;
287                arrays = new ByteArray[old.length + ARRAY_SIZE];
288                System.arraycopy(old, 0, arrays, 0, old.length);
289            }
290        }
291    }