001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.streams; 020 import java.io.EOFException; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import javax.jms.JMSException; 024 import javax.jms.MessageConsumer; 025 026 import org.activemq.io.util.ByteArray; 027 import org.activemq.message.ActiveMQMessage; 028 029 /** 030 * An inputStream that reads data from a MessageConsumer 031 * 032 * @version $Revision: 1.1.1.1 $ 033 */ 034 public class JMSInputStream extends InputStream { 035 private static final int ARRAY_SIZE = 10; 036 private boolean closed; 037 protected ByteArray[] arrays = new ByteArray[ARRAY_SIZE]; 038 private int offset; 039 private int current = 0; 040 protected int clen = 0; 041 private int markArray = -1; 042 private int markOffset = -1; 043 private MessageConsumer consumer; 044 045 /** 046 * Construct an input stream to read from a JMS Consumer 047 * 048 * @param consumer 049 */ 050 public JMSInputStream(MessageConsumer consumer) { 051 this.consumer = consumer; 052 } 053 054 /** 055 * Read the next byte from this stream. 056 * 057 * @return the next byte 058 * @throws IOException 059 */ 060 public int read() throws IOException { 061 if (closed) 062 throw new EOFException("JMSInputStream is closed"); 063 if (current == clen) { 064 fillBuffer(1); 065 } 066 int c = (arrays[current].get(offset) & 0xff); 067 offset++; 068 if (offset == arrays[current].getLength()) { 069 offset = 0; 070 releaseBuffer(current); 071 current++; 072 } 073 return c; 074 } 075 076 /** 077 * Read data from this input stream into the given byte array starting at offset 0 for b.length bytes. Returns the 078 * actual number of bytes read; 079 * 080 * @param b 081 * @return the number of bytes read 082 * @throws IOException 083 */ 084 public int read(byte b[]) throws IOException { 085 return read(b, 0, b.length); 086 } 087 088 /** 089 * Read data from this input stream into the given byte array starting at offset 'off' for 'len' bytes. Returns the 090 * actual number of bytes read. 091 * 092 * @param b buffer to read data into 093 * @param off offset into b 094 * @param len the maximum length 095 * @return the number of bytes actually read 096 * @throws IOException 097 */ 098 public int read(byte b[], int off, int len) throws IOException { 099 if (closed) 100 throw new EOFException("JMSInputStream is closed"); 101 int n = off; 102 int total = 0; 103 int last = Math.min(off + len, b.length); 104 if (current == clen) { 105 fillBuffer(len); 106 } 107 while ((current < clen) && (n < last)) { 108 int num_left = arrays[current].getLength() - offset; 109 int tocopy = Math.min(num_left, last - n); 110 System.arraycopy(arrays[current].getBuf(), offset, b, n, tocopy); 111 total += tocopy; 112 n += tocopy; 113 offset += tocopy; 114 if (offset == arrays[current].getLength()) { 115 offset = 0; 116 releaseBuffer(current); 117 current++; 118 } 119 } 120 return total; 121 } 122 123 /** 124 * Skip n bytes in this stream; returns the number of bytes actually skipped (which may be less than the number 125 * requested). 126 * 127 * @param length the number of bytes to skip 128 * @return the number of bytes actually skipped 129 * @throws IOException 130 */ 131 public long skip(long length) throws IOException { 132 if (closed) 133 throw new EOFException("JMSInputStream is closed!"); 134 int requested = Math.min((int) length, Integer.MAX_VALUE); 135 int totalskipped = 0; 136 while ((current < clen) && (arrays[current] != null) && (requested > 0)) { 137 if (current == clen) { 138 break; 139 } 140 int num_left = arrays[current].getLength() - offset; 141 if (num_left < requested) { 142 requested -= num_left; 143 totalskipped += num_left; 144 releaseBuffer(current); 145 current++; 146 offset = 0; 147 } 148 else { 149 totalskipped += requested; 150 offset += requested; 151 requested = 0; 152 } 153 } 154 return totalskipped; 155 } 156 157 /** 158 * Return the number of bytes available for reading. 159 * 160 * @return the number of bytes available 161 * @throws IOException 162 */ 163 public int available() throws IOException { 164 if (closed) 165 throw new EOFException("JMSInputStream is closed!"); 166 fillBuffer(0); 167 if (current == clen) 168 return 0; 169 int num_left = arrays[current].getLength() - offset; 170 for (int i = current + 1;i < clen;i++) { 171 if (arrays[i] == null) 172 break; 173 num_left += arrays[i].getLength(); 174 } 175 return num_left; 176 } 177 178 /** 179 * close the stream and the MessageConsumer 180 */ 181 public void close() { 182 try { 183 consumer.close(); 184 } 185 catch (JMSException jmsEx) { 186 } 187 } 188 189 /** 190 * @return true 191 */ 192 public boolean markSupported() { 193 return true; 194 } 195 196 /** 197 * Returns the stream to the position of the previous mark(). 198 * 199 * @throws IOException 200 */ 201 public void reset() throws IOException { 202 if (markArray == -1) 203 throw new IOException("PooledArrayInputStream not marked!"); 204 current = markArray; 205 offset = markOffset; 206 markArray = -1; 207 } 208 209 /** 210 * Set the stream's mark to the current position. 211 * 212 * @param readlimit 213 */ 214 public void mark(int readlimit) { 215 markArray = current; 216 markOffset = offset; 217 } 218 219 /** 220 * release up to the current buffer to GC 221 * 222 * @param index 223 */ 224 private void releaseBuffer(int index) { 225 if (markArray < 0 || index < markArray) { 226 for (int i = 0;i <= index;i++) { 227 arrays[index] = null; 228 } 229 } 230 } 231 232 /** 233 * fill the buffer 234 * 235 * @param requiredLength 236 * @throws IOException 237 */ 238 private void fillBuffer(int requiredLength) throws IOException { 239 int len = 0; 240 try { 241 do { 242 if (!closed) { 243 ActiveMQMessage msg = null; 244 if (len == 0 && requiredLength > 0) { 245 msg = (ActiveMQMessage) consumer.receive(2000); 246 } 247 else { 248 msg = (ActiveMQMessage) consumer.receiveNoWait(); 249 } 250 if (msg != null) { 251 ByteArray ba = msg.getBodyAsBytes(); 252 if (ba != null) { 253 len += ba.getLength(); 254 process(ba); 255 } 256 } 257 else if (closed) { 258 break; 259 } 260 } 261 } 262 while (len < requiredLength && !closed); 263 } 264 catch (JMSException jmsEx) { 265 throw new IOException(jmsEx.getMessage()); 266 } 267 } 268 269 /** 270 * Add an array to this PooledArrayInputStream. 271 * 272 * @param ba 273 */ 274 private void process(ByteArray ba) { 275 if (current == clen && (clen + 1) == arrays.length) { 276 offset = 0; 277 current = 0; 278 clen = 0; 279 if (arrays.length > ARRAY_SIZE && markArray == -1) { 280 arrays = new ByteArray[ARRAY_SIZE]; 281 } 282 } 283 arrays[clen] = ba; 284 clen++; 285 if (clen == arrays.length) { 286 ByteArray[] old = arrays; 287 arrays = new ByteArray[old.length + ARRAY_SIZE]; 288 System.arraycopy(old, 0, arrays, 0, old.length); 289 } 290 } 291 }