001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.protocols.internal;
028    
029    
030    
031    import java.io.InputStream;
032    import java.io.IOException;
033    import java.nio.ByteBuffer;
034    import java.util.concurrent.ArrayBlockingQueue;
035    
036    import org.opends.server.protocols.ldap.LDAPMessage;
037    
038    
039    
040    /**
041     * This class provides an implementation of a
042     * {@code java.io.InputStream} that can be used to facilitate internal
043     * communication with the Directory Server.  On the backend, this
044     * input stream will be populated by ASN.1 elements encoded from LDAP
045     * messages created from internal operation responses.
046     */
047    @org.opends.server.types.PublicAPI(
048         stability=org.opends.server.types.StabilityLevel.UNCOMMITTED,
049         mayInstantiate=false,
050         mayExtend=false,
051         mayInvoke=true)
052    public final class InternalLDAPInputStream
053           extends InputStream
054    {
055      // The queue of LDAP messages providing the data to be made
056      // available to the client.
057      private ArrayBlockingQueue<LDAPMessage> messageQueue;
058    
059      // Indicates whether this stream has been closed.
060      private boolean closed;
061    
062      // The byte buffer with partial data to be written to the client.
063      private ByteBuffer partialMessageBuffer;
064    
065      // The internal LDAP socket serviced by this input stream.
066      private InternalLDAPSocket socket;
067    
068    
069    
070      /**
071       * Creates a new internal LDAP input stream that will service the
072       * provided internal LDAP socket.
073       *
074       * @param  socket  The internal LDAP socket serviced by this
075       *                 internal LDAP input stream.
076       */
077      public InternalLDAPInputStream(InternalLDAPSocket socket)
078      {
079        this.socket = socket;
080    
081        messageQueue = new ArrayBlockingQueue<LDAPMessage>(10);
082        partialMessageBuffer = null;
083        closed = false;
084      }
085    
086    
087    
088      /**
089       * Adds the provided LDAP message to the set of messages to be
090       * returned to the client.  Note that this may block if there is
091       * already a significant backlog of messages to be returned.
092       *
093       * @param  message  The message to add to the set of messages to be
094       *                  returned to the client.
095       */
096      @org.opends.server.types.PublicAPI(
097           stability=org.opends.server.types.StabilityLevel.PRIVATE,
098           mayInstantiate=false,
099           mayExtend=false,
100           mayInvoke=false)
101      void addLDAPMessage(LDAPMessage message)
102      {
103        // If the stream is closed, then simply drop the message.
104        if (closed)
105        {
106          return;
107        }
108    
109        try
110        {
111          messageQueue.put(message);
112          return;
113        }
114        catch (Exception e)
115        {
116          // This shouldn't happen, but if it does then try three more
117          // times before giving up and dropping the message.
118          for (int i=0; i < 3; i++)
119          {
120            try
121            {
122              messageQueue.put(message);
123              break;
124            } catch (Exception e2) {}
125          }
126    
127          return;
128        }
129      }
130    
131    
132    
133      /**
134       * Retrieves the number of bytes that can be read (or skipped over)
135       * from this input stream without blocking.
136       *
137       * @return  The number of bytes that can be read (or skipped over)
138       *          from this input stream wihtout blocking.
139       */
140      @Override()
141      public synchronized int available()
142      {
143        if (partialMessageBuffer == null)
144        {
145          LDAPMessage message = messageQueue.poll();
146          if ((message == null) || (message instanceof NullLDAPMessage))
147          {
148            if (message instanceof NullLDAPMessage)
149            {
150              closed = true;
151            }
152    
153            return 0;
154          }
155          else
156          {
157            partialMessageBuffer =
158                 ByteBuffer.wrap(message.encode().encode());
159            return partialMessageBuffer.remaining();
160          }
161        }
162        else
163        {
164          return partialMessageBuffer.remaining();
165        }
166      }
167    
168    
169    
170      /**
171       * Closes this input stream.  This will add a special marker
172       * element to the message queue indicating that the end of the
173       * stream has been reached.  If the queue is full, thenit will be
174       * cleared before adding the marker element.
175       */
176      @Override()
177      public void close()
178      {
179        socket.close();
180      }
181    
182    
183    
184      /**
185       * Closes this input stream through an internal mechanism that will
186       * not cause an infinite recursion loop by trying to also close the
187       * input stream.
188       */
189      @org.opends.server.types.PublicAPI(
190           stability=org.opends.server.types.StabilityLevel.PRIVATE,
191           mayInstantiate=false,
192           mayExtend=false,
193           mayInvoke=false)
194      void closeInternal()
195      {
196        if (closed)
197        {
198          return;
199        }
200    
201        closed = true;
202        NullLDAPMessage nullMessage = new NullLDAPMessage();
203    
204        while (! messageQueue.offer(nullMessage))
205        {
206          messageQueue.clear();
207        }
208      }
209    
210    
211    
212      /**
213       * Marks the current position in the input stream.  This will not
214       * have any effect, as this input stream inplementation does not
215       * support marking.
216       *
217       * @param  readLimit  The maximum limit of bytes that can be read
218       *                    before the mark position becomes invalid.
219       */
220      @Override()
221      public void mark(int readLimit)
222      {
223        // No implementation is required.
224      }
225    
226    
227    
228      /**
229       * Indicates whether this input stream inplementation supports the
230       * use of the {@code mark} and {@code reset} methods.  This
231       * implementation does not support that functionality.
232       *
233       * @return  {@code false} because this implementation does not
234       *          support the use of the {@code mark} and {@code reset}
235       *          methods.
236       */
237      @Override()
238      public boolean markSupported()
239      {
240        return false;
241      }
242    
243    
244    
245      /**
246       * Reads the next byte of data from the input stream, blocking if
247       * necessary until there is data available.
248       *
249       * @return  The next byte of data read from the input stream, or -1
250       *          if the end of the input stream has been reached.
251       *
252       * @throws  IOException  If a problem occurs while trying to read
253       *                       data from the stream.
254       */
255      @Override()
256      public synchronized int read()
257             throws IOException
258      {
259        if (partialMessageBuffer != null)
260        {
261          if (partialMessageBuffer.remaining() > 0)
262          {
263            int i = (0xFF & partialMessageBuffer.get());
264            if (partialMessageBuffer.remaining() == 0)
265            {
266              partialMessageBuffer = null;
267            }
268    
269            return i;
270          }
271          else
272          {
273            partialMessageBuffer = null;
274          }
275        }
276    
277        if (closed)
278        {
279          return -1;
280        }
281    
282        try
283        {
284          LDAPMessage message = messageQueue.take();
285          if (message instanceof NullLDAPMessage)
286          {
287            messageQueue.clear();
288            closed = true;
289            return -1;
290          }
291    
292          partialMessageBuffer =
293               ByteBuffer.wrap(message.encode().encode());
294          return (0xFF & partialMessageBuffer.get());
295        }
296        catch (Exception e)
297        {
298          throw new IOException(e.getMessage());
299        }
300      }
301    
302    
303    
304      /**
305       * Reads some number of bytes from the input stream, blocking if
306       * necessary until there is data available, and adds them to the
307       * provided array starting at position 0.
308       *
309       * @param  b  The array to which the data is to be written.
310       *
311       * @return  The number of bytes actually written into the
312       *          provided array, or -1 if the end of the stream has been
313       *          reached.
314       *
315       * @throws  IOException  If a problem occurs while trying to read
316       *                       data from the stream.
317       */
318      @Override()
319      public int read(byte[] b)
320             throws IOException
321      {
322        return read(b, 0, b.length);
323      }
324    
325    
326    
327      /**
328       * Reads some number of bytes from the input stream, blocking if
329       * necessary until there is data available, and adds them to the
330       * provided array starting at the specified position.
331       *
332       * @param  b    The array to which the data is to be written.
333       * @param  off  The offset in the array at which to start writing
334       *              data.
335       * @param  len  The maximum number of bytes that may be added to the
336       *              array.
337       *
338       * @return  The number of bytes actually written into the
339       *          provided array, or -1 if the end of the stream has been
340       *          reached.
341       *
342       * @throws  IOException  If a problem occurs while trying to read
343       *                       data from the stream.
344       */
345      @Override()
346      public synchronized int read(byte[] b, int off, int len)
347             throws IOException
348      {
349        if (partialMessageBuffer != null)
350        {
351          int remaining = partialMessageBuffer.remaining();
352          if (remaining > 0)
353          {
354            if (remaining <= len)
355            {
356              // We can fit all the remaining data in the provided array,
357              // so that's all we'll try to put in it.
358              partialMessageBuffer.get(b, off, remaining);
359              partialMessageBuffer = null;
360              return remaining;
361            }
362            else
363            {
364              // The array is too small to hold the rest of the data, so
365              // only take as much as we can.
366              partialMessageBuffer.get(b, off, len);
367              return len;
368            }
369          }
370          else
371          {
372            partialMessageBuffer = null;
373          }
374        }
375    
376        if (closed)
377        {
378          return -1;
379        }
380    
381        try
382        {
383          LDAPMessage message = messageQueue.take();
384          if (message instanceof NullLDAPMessage)
385          {
386            messageQueue.clear();
387            closed = true;
388            return -1;
389          }
390    
391          byte[] encodedMessage = message.encode().encode();
392          if (encodedMessage.length <= len)
393          {
394            // We can fit the entire message in the array.
395            System.arraycopy(encodedMessage, 0, b, off,
396                             encodedMessage.length);
397            return encodedMessage.length;
398          }
399          else
400          {
401            // We can only fit part of the message in the array,
402            // so we need to save the rest for later.
403            System.arraycopy(encodedMessage, 0, b, off, len);
404            partialMessageBuffer = ByteBuffer.wrap(encodedMessage);
405            partialMessageBuffer.position(len);
406            return len;
407          }
408        }
409        catch (Exception e)
410        {
411          throw new IOException(e.getMessage());
412        }
413      }
414    
415    
416    
417      /**
418       * Repositions this stream to the position at the time that the
419       * {@code mark} method was called on this stream.  This will not
420       * have any effect, as this input stream inplementation does not
421       * support marking.
422       */
423      @Override()
424      public void reset()
425      {
426        // No implementation is required.
427      }
428    
429    
430    
431      /**
432       * Skips over and discards up to the specified number of bytes of
433       * data from this input stream.  This implementation will always
434       * skip the requested number of bytes unless the end of the stream
435       * is reached.
436       *
437       * @param  n  The maximum number of bytes to skip.
438       *
439       * @return  The number of bytes actually skipped.
440       *
441       * @throws  IOException  If a problem occurs while trying to read
442       *                       data from the input stream.
443       */
444      @Override()
445      public synchronized long skip(long n)
446             throws IOException
447      {
448        byte[] b;
449        if (n > 8192)
450        {
451          b = new byte[8192];
452        }
453        else
454        {
455          b = new byte[(int) n];
456        }
457    
458        long totalBytesRead = 0L;
459        while (totalBytesRead < n)
460        {
461          int maxLen = (int) Math.min((n - totalBytesRead), b.length);
462    
463          int bytesRead = read(b, 0, maxLen);
464          if (bytesRead < 0)
465          {
466            if (totalBytesRead > 0)
467            {
468              return totalBytesRead;
469            }
470            else
471            {
472              return bytesRead;
473            }
474          }
475          else
476          {
477            totalBytesRead += bytesRead;
478          }
479        }
480    
481        return totalBytesRead;
482      }
483    
484    
485    
486      /**
487       * Retrieves a string representation of this internal LDAP socket.
488       *
489       * @return  A string representation of this internal LDAP socket.
490       */
491      @Override()
492      public String toString()
493      {
494        return "InternalLDAPInputStream";
495      }
496    }
497