/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.protocols.internal;



import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

import org.opends.server.protocols.ldap.LDAPMessage;



/**
 * This class provides an implementation of a
 * {@code java.io.InputStream} that can be used to facilitate internal
 * communication with the Directory Server.  On the backend, this
 * input stream will be populated by ASN.1 elements encoded from LDAP
 * messages created from internal operation responses.
 * <BR><BR>
 * Messages are handed to the stream through a bounded queue.  The
 * read, skip, and available methods are synchronized on this stream;
 * the producer-side methods communicate with them only through the
 * queue and the {@code closed} flag.
 */
@org.opends.server.types.PublicAPI(
     stability=org.opends.server.types.StabilityLevel.UNCOMMITTED,
     mayInstantiate=false,
     mayExtend=false,
     mayInvoke=true)
public final class InternalLDAPInputStream
       extends InputStream
{
  // The queue of LDAP messages providing the data to be made
  // available to the client.
  private final ArrayBlockingQueue<LDAPMessage> messageQueue;

  // Indicates whether this stream has been closed.  It is volatile
  // because it is written by closeInternal() and read by
  // addLDAPMessage() without holding this stream's monitor.
  private volatile boolean closed;

  // The byte buffer with partial data to be written to the client.
  // It is only accessed while holding this stream's monitor.
  private ByteBuffer partialMessageBuffer;

  // The internal LDAP socket serviced by this input stream.
  private final InternalLDAPSocket socket;



  /**
   * Creates a new internal LDAP input stream that will service the
   * provided internal LDAP socket.
   *
   * @param  socket  The internal LDAP socket serviced by this
   *                 internal LDAP input stream.
   */
  public InternalLDAPInputStream(InternalLDAPSocket socket)
  {
    this.socket = socket;

    messageQueue         = new ArrayBlockingQueue<LDAPMessage>(10);
    partialMessageBuffer = null;
    closed               = false;
  }



  /**
   * Adds the provided LDAP message to the set of messages to be
   * returned to the client.  Note that this may block if there is
   * already a significant backlog of messages to be returned.  If
   * the stream has already been closed, or if the message cannot be
   * enqueued after one attempt and three retries, then the message
   * is silently dropped.  If the calling thread is interrupted while
   * waiting, its interrupt status is restored before returning.
   *
   * @param  message  The message to add to the set of messages to be
   *                  returned to the client.
   */
  @org.opends.server.types.PublicAPI(
       stability=org.opends.server.types.StabilityLevel.PRIVATE,
       mayInstantiate=false,
       mayExtend=false,
       mayInvoke=false)
  void addLDAPMessage(LDAPMessage message)
  {
    // If the stream is closed, then simply drop the message.
    if (closed)
    {
      return;
    }

    boolean interrupted = false;
    try
    {
      // One initial attempt plus three retries before giving up and
      // dropping the message.
      for (int attempt=0; attempt < 4; attempt++)
      {
        try
        {
          messageQueue.put(message);
          return;
        }
        catch (InterruptedException ie)
        {
          // Remember the interrupt so that it can be restored once
          // we are done; restoring it now would make every retry
          // fail immediately.
          interrupted = true;
        }
        catch (Exception e)
        {
          // This shouldn't happen, but delivery is best-effort so
          // simply try again.
        }
      }
    }
    finally
    {
      if (interrupted)
      {
        Thread.currentThread().interrupt();
      }
    }
  }



  /**
   * Retrieves the number of bytes that can be read (or skipped over)
   * from this input stream without blocking.  If there is no
   * partially-consumed message, then this will attempt to retrieve
   * the next message from the queue (without waiting) and report its
   * encoded size.
   *
   * @return  The number of bytes that can be read (or skipped over)
   *          from this input stream without blocking.
   */
  @Override()
  public synchronized int available()
  {
    if (partialMessageBuffer != null)
    {
      return partialMessageBuffer.remaining();
    }

    LDAPMessage message = messageQueue.poll();
    if (message == null)
    {
      return 0;
    }

    if (message instanceof NullLDAPMessage)
    {
      // The end-of-stream marker has been consumed, so remember that
      // the stream is closed for subsequent reads.
      closed = true;
      return 0;
    }

    partialMessageBuffer = ByteBuffer.wrap(message.encode().encode());
    return partialMessageBuffer.remaining();
  }



  /**
   * Closes this input stream by closing the internal LDAP socket
   * with which it is associated.
   */
  @Override()
  public void close()
  {
    socket.close();
  }



  /**
   * Closes this input stream through an internal mechanism that does
   * not call back into the associated socket, and therefore will not
   * cause an infinite recursion loop by trying to also close the
   * socket.  This will add a special marker element to the message
   * queue indicating that the end of the stream has been reached.
   * If the queue is full, then it will be cleared before adding the
   * marker element.
   */
  @org.opends.server.types.PublicAPI(
       stability=org.opends.server.types.StabilityLevel.PRIVATE,
       mayInstantiate=false,
       mayExtend=false,
       mayInvoke=false)
  void closeInternal()
  {
    if (closed)
    {
      return;
    }

    closed = true;
    NullLDAPMessage nullMessage = new NullLDAPMessage();

    // The marker must be delivered so that a blocked reader wakes
    // up, so discard any backlog if there is no room for it.
    while (! messageQueue.offer(nullMessage))
    {
      messageQueue.clear();
    }
  }



  /**
   * Marks the current position in the input stream.  This will not
   * have any effect, as this input stream implementation does not
   * support marking.
   *
   * @param  readLimit  The maximum limit of bytes that can be read
   *                    before the mark position becomes invalid.
   */
  @Override()
  public void mark(int readLimit)
  {
    // No implementation is required.
  }



  /**
   * Indicates whether this input stream implementation supports the
   * use of the {@code mark} and {@code reset} methods.  This
   * implementation does not support that functionality.
   *
   * @return  {@code false} because this implementation does not
   *          support the use of the {@code mark} and {@code reset}
   *          methods.
   */
  @Override()
  public boolean markSupported()
  {
    return false;
  }



  /**
   * Reads the next byte of data from the input stream, blocking if
   * necessary until there is data available.
   *
   * @return  The next byte of data read from the input stream, or -1
   *          if the end of the input stream has been reached.
   *
   * @throws  IOException  If a problem occurs while trying to read
   *                       data from the stream, including the
   *                       calling thread being interrupted while
   *                       waiting for data.
   */
  @Override()
  public synchronized int read()
         throws IOException
  {
    if (partialMessageBuffer != null)
    {
      if (partialMessageBuffer.hasRemaining())
      {
        int i = (0xFF & partialMessageBuffer.get());
        if (! partialMessageBuffer.hasRemaining())
        {
          partialMessageBuffer = null;
        }

        return i;
      }

      partialMessageBuffer = null;
    }

    if (closed)
    {
      return -1;
    }

    byte[] encodedMessage = nextEncodedMessage();
    if (encodedMessage == null)
    {
      return -1;
    }

    // The helper never returns an empty array, so there is always at
    // least one byte to hand back.
    partialMessageBuffer = ByteBuffer.wrap(encodedMessage);
    int i = (0xFF & partialMessageBuffer.get());
    if (! partialMessageBuffer.hasRemaining())
    {
      partialMessageBuffer = null;
    }

    return i;
  }



  /**
   * Reads some number of bytes from the input stream, blocking if
   * necessary until there is data available, and adds them to the
   * provided array starting at position 0.
   *
   * @param  b  The array to which the data is to be written.
   *
   * @return  The number of bytes actually written into the
   *          provided array, or -1 if the end of the stream has been
   *          reached.
   *
   * @throws  IOException  If a problem occurs while trying to read
   *                       data from the stream.
   */
  @Override()
  public int read(byte[] b)
         throws IOException
  {
    return read(b, 0, b.length);
  }



  /**
   * Reads some number of bytes from the input stream, blocking if
   * necessary until there is data available, and adds them to the
   * provided array starting at the specified position.  At most one
   * LDAP message (or the unread remainder of one) will be returned
   * by a single call.
   *
   * @param  b    The array to which the data is to be written.
   * @param  off  The offset in the array at which to start writing
   *              data.
   * @param  len  The maximum number of bytes that may be added to the
   *              array.
   *
   * @return  The number of bytes actually written into the
   *          provided array (which will be zero only if {@code len}
   *          is zero), or -1 if the end of the stream has been
   *          reached.
   *
   * @throws  IOException  If a problem occurs while trying to read
   *                       data from the stream, including the
   *                       calling thread being interrupted while
   *                       waiting for data.
   *
   * @throws  NullPointerException  If {@code b} is {@code null}.
   *
   * @throws  IndexOutOfBoundsException  If {@code off} or {@code len}
   *                                     is negative, or {@code len}
   *                                     is greater than
   *                                     {@code b.length - off}.
   */
  @Override()
  public synchronized int read(byte[] b, int off, int len)
         throws IOException
  {
    // Validate the arguments before touching the queue so that a bad
    // call cannot consume (and lose) a message.  Dereferencing
    // b.length raises the NullPointerException for a null array.
    if ((off < 0) || (len < 0) || (len > (b.length - off)))
    {
      throw new IndexOutOfBoundsException();
    }

    if (len == 0)
    {
      // Nothing was requested, so don't block waiting for data.
      return 0;
    }

    if (partialMessageBuffer != null)
    {
      int remaining = partialMessageBuffer.remaining();
      if (remaining > 0)
      {
        // Hand back as much of the pending data as will fit, and
        // discard the buffer once it has been fully consumed.
        int count = Math.min(remaining, len);
        partialMessageBuffer.get(b, off, count);
        if (! partialMessageBuffer.hasRemaining())
        {
          partialMessageBuffer = null;
        }

        return count;
      }

      partialMessageBuffer = null;
    }

    if (closed)
    {
      return -1;
    }

    byte[] encodedMessage = nextEncodedMessage();
    if (encodedMessage == null)
    {
      return -1;
    }

    int count = Math.min(encodedMessage.length, len);
    System.arraycopy(encodedMessage, 0, b, off, count);
    if (count < encodedMessage.length)
    {
      // We can only fit part of the message in the array, so we need
      // to save the rest for later.
      partialMessageBuffer = ByteBuffer.wrap(encodedMessage);
      partialMessageBuffer.position(count);
    }

    return count;
  }



  /**
   * Repositions this stream to the position at the time that the
   * {@code mark} method was called on this stream.  This will not
   * have any effect, as this input stream implementation does not
   * support marking.
   */
  @Override()
  public void reset()
  {
    // No implementation is required.
  }



  /**
   * Skips over and discards up to the specified number of bytes of
   * data from this input stream.  This implementation will always
   * skip the requested number of bytes unless the end of the stream
   * is reached, blocking if necessary until enough data is
   * available.
   *
   * @param  n  The maximum number of bytes to skip.  If this is less
   *            than or equal to zero, then no bytes are skipped.
   *
   * @return  The number of bytes actually skipped, which will be
   *          zero if {@code n} is not positive or if the end of the
   *          stream had already been reached.
   *
   * @throws  IOException  If a problem occurs while trying to read
   *                       data from the input stream.
   */
  @Override()
  public synchronized long skip(long n)
         throws IOException
  {
    if (n <= 0L)
    {
      return 0L;
    }

    // Use a scratch buffer that is no larger than necessary, capped
    // at 8192 bytes.
    byte[] b = new byte[(int) Math.min(n, 8192L)];

    long totalBytesRead = 0L;
    while (totalBytesRead < n)
    {
      int maxLen = (int) Math.min((n - totalBytesRead), b.length);

      int bytesRead = read(b, 0, maxLen);
      if (bytesRead < 0)
      {
        // End of stream; report however much was skipped so far.
        break;
      }

      totalBytesRead += bytesRead;
    }

    return totalBytesRead;
  }



  /**
   * Retrieves a string representation of this internal LDAP input
   * stream.
   *
   * @return  A string representation of this internal LDAP input
   *          stream.
   */
  @Override()
  public String toString()
  {
    return "InternalLDAPInputStream";
  }



  /**
   * Waits for the next message in the queue and returns its encoded
   * form.  If the end-of-stream marker is encountered, then the
   * queue is cleared and the stream is flagged as closed.  The
   * caller must hold this stream's monitor.
   *
   * @return  The non-empty encoded form of the next message, or
   *          {@code null} if the end of the stream has been reached.
   *
   * @throws  IOException  If the calling thread is interrupted while
   *                       waiting (in which case its interrupt
   *                       status is restored), or if the message
   *                       cannot be encoded.
   */
  private byte[] nextEncodedMessage()
          throws IOException
  {
    try
    {
      while (true)
      {
        LDAPMessage message = messageQueue.take();
        if (message instanceof NullLDAPMessage)
        {
          messageQueue.clear();
          closed = true;
          return null;
        }

        byte[] encodedMessage = message.encode().encode();
        if (encodedMessage.length > 0)
        {
          return encodedMessage;
        }

        // An empty encoding carries no data for the client, so keep
        // waiting rather than reporting a zero-length read.
      }
    }
    catch (InterruptedException ie)
    {
      Thread.currentThread().interrupt();
      throw toIOException(ie);
    }
    catch (Exception e)
    {
      throw toIOException(e);
    }
  }



  /**
   * Creates an I/O exception with the same message as the provided
   * exception and with that exception recorded as its cause.
   *
   * @param  cause  The exception to be wrapped.
   *
   * @return  The I/O exception that wraps the provided exception.
   */
  private static IOException toIOException(Exception cause)
  {
    IOException ioe = new IOException(cause.getMessage());
    ioe.initCause(cause);
    return ioe;
  }
}