Class BackpressureUtils


  • public final class BackpressureUtils
    extends java.lang.Object
    Utility functions for use with backpressure.
    • Field Summary

      Fields 
      Modifier and Type Field Description
      (package private) static long COMPLETED_MASK
      Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
      (package private) static long REQUESTED_MASK
      Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFFL.
    • Constructor Summary

      Constructors 
      Modifier Constructor Description
      private BackpressureUtils()
      Utility class, no instances.
    • Method Summary

      Modifier and Type Method Description
      static long addCap​(long a, long b)
      Adds two positive longs and caps the result at Long.MAX_VALUE.
      static <T> long getAndAddRequest​(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> requested, T object, long n)
      Deprecated.
      Android has issues with reflection-based atomics
      static long getAndAddRequest​(java.util.concurrent.atomic.AtomicLong requested, long n)
      Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics).
      static long multiplyCap​(long a, long b)
      Multiplies two positive longs and caps the result at Long.MAX_VALUE.
      static <T,R> void postCompleteDone(java.util.concurrent.atomic.AtomicLong requested, java.util.Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
      Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
      static <T> void postCompleteDone​(java.util.concurrent.atomic.AtomicLong requested, java.util.Queue<T> queue, Subscriber<? super T> actual)
      Signals the completion of the main sequence and switches to post-completion replay mode.
      (package private) static <T,R> void postCompleteDrain(java.util.concurrent.atomic.AtomicLong requested, java.util.Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T,? extends R> exitTransform)
      Drains the queue based on the outstanding requests in post-completed mode (only!) and allows exit transformation on the queued values.
      static <T,R> boolean postCompleteRequest(java.util.concurrent.atomic.AtomicLong requested, long n, java.util.Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
      Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
      static <T> boolean postCompleteRequest​(java.util.concurrent.atomic.AtomicLong requested, long n, java.util.Queue<T> queue, Subscriber<? super T> actual)
      Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
      static long produced​(java.util.concurrent.atomic.AtomicLong requested, long n)
      Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
      static boolean validate​(long n)
      Validates the requested amount and returns true if it is positive.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • COMPLETED_MASK

        static final long COMPLETED_MASK
        Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
        See Also:
        Constant Field Values
      • REQUESTED_MASK

        static final long REQUESTED_MASK
        Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFFL.
        See Also:
        Constant Field Values
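
        The two masks split a single long into a one-bit flag and a 63-bit amount. The following is a minimal sketch of that layout, assuming the constant values are exactly the hexadecimal literals quoted above; the class name MaskSketch and the helper methods isCompleted and requestedAmount are hypothetical and not part of BackpressureUtils.

```java
final class MaskSketch {
    // Assumed values, taken from the literals quoted in the field descriptions.
    static final long COMPLETED_MASK = 0x8000_0000_0000_0000L; // only bit 63 set
    static final long REQUESTED_MASK = 0x7FFF_FFFF_FFFF_FFFFL; // bits 0-62 set

    /** Hypothetical helper: true if the completed flag (bit 63) is set. */
    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    /** Hypothetical helper: extracts the request amount stored in bits 0-62. */
    static long requestedAmount(long state) {
        return state & REQUESTED_MASK;
    }
}
```

        With these values, COMPLETED_MASK equals Long.MIN_VALUE and REQUESTED_MASK equals Long.MAX_VALUE, so any non-negative request amount fits next to the flag without touching it.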
    • Constructor Detail

      • BackpressureUtils

        private BackpressureUtils()
        Utility class, no instances.
    • Method Detail

      • getAndAddRequest

        @Deprecated
        public static <T> long getAndAddRequest​(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> requested,
                                                T object,
                                                long n)
        Deprecated.
        Android has issues with reflection-based atomics
        Adds n to the requested field and returns the value prior to addition once the addition succeeds (uses CAS semantics). If the addition overflows, the requested field is set to Long.MAX_VALUE.
        Type Parameters:
        T - the type of the target object on which the field updater operates
        Parameters:
        requested - atomic field updater for a request count
        object - contains the field updated by the updater
        n - the number of requests to add to the requested count
        Returns:
        requested value just prior to successful addition
      • getAndAddRequest

        public static long getAndAddRequest​(java.util.concurrent.atomic.AtomicLong requested,
                                            long n)
        Adds n (not validated) to requested and returns the value prior to addition once the addition succeeds (uses CAS semantics). If the addition overflows, requested is set to Long.MAX_VALUE.
        Parameters:
        requested - atomic long that should be updated
        n - the number of requests to add to the requested count, positive (not validated)
        Returns:
        requested value just prior to successful addition
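
        The behaviour described above (CAS retry loop, return of the previous value, capping on overflow) can be sketched as follows. This is an illustrative re-implementation under those stated semantics, not the library's source; the class name RequestAddSketch and method name getAndAddCapped are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestAddSketch {
    /**
     * Adds n to requested with a compare-and-set loop and returns the value
     * seen just before the successful update. Assumes n is positive, as the
     * documented method does not validate it.
     */
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                // The sum of two non-negative longs wrapped around: cap it.
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
            // Another thread changed the value in the meantime: retry.
        }
    }
}
```

        Returning the prior value lets a caller detect the transition from zero outstanding requests, which is commonly the point where an emission loop has to be started.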
      • multiplyCap

        public static long multiplyCap​(long a,
                                       long b)
        Multiplies two positive longs and caps the result at Long.MAX_VALUE.
        Parameters:
        a - the first value
        b - the second value
        Returns:
        the capped product of a and b
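
        A capped multiplication can be sketched with a division-based overflow check. This is one possible approach for non-negative inputs and not necessarily how the library implements it; the class name MultiplyCapSketch is hypothetical.

```java
final class MultiplyCapSketch {
    /** Multiplies two non-negative longs, returning Long.MAX_VALUE on overflow. */
    static long multiplyCap(long a, long b) {
        if (a == 0L || b == 0L) {
            return 0L; // also avoids dividing by zero below
        }
        if (a > Long.MAX_VALUE / b) {
            return Long.MAX_VALUE; // a * b would exceed Long.MAX_VALUE
        }
        return a * b;
    }
}
```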
      • addCap

        public static long addCap​(long a,
                                  long b)
        Adds two positive longs and caps the result at Long.MAX_VALUE.
        Parameters:
        a - the first value
        b - the second value
        Returns:
        the capped sum of a and b
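
        A capped addition for non-negative inputs can rely on the fact that an overflowing sum wraps to a negative value. The following is a sketch under that assumption; the class name AddCapSketch is hypothetical.

```java
final class AddCapSketch {
    /** Adds two non-negative longs, returning Long.MAX_VALUE on overflow. */
    static long addCap(long a, long b) {
        long sum = a + b;
        // For non-negative a and b, a negative sum can only mean overflow.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }
}
```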
      • postCompleteDone

        public static <T> void postCompleteDone​(java.util.concurrent.atomic.AtomicLong requested,
                                                java.util.Queue<T> queue,
                                                Subscriber<? super T> actual)
        Signals the completion of the main sequence and switches to post-completion replay mode.

        Don't modify the queue after calling this method!

        Post-completion backpressure handles the case where a source produces values in response to requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through to the upstream and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead drive the draining of the queue.

        The algorithm uses the most significant bit (bit 63) of the long value held in the AtomicLong as the completed flag, since a request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

        Type Parameters:
        T - the value type to emit
        Parameters:
        requested - the holder of current requested amount
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
      • postCompleteRequest

        public static <T> boolean postCompleteRequest​(java.util.concurrent.atomic.AtomicLong requested,
                                                      long n,
                                                      java.util.Queue<T> queue,
                                                      Subscriber<? super T> actual)
        Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.

        Post-completion backpressure handles the case where a source produces values in response to requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through to the upstream and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead drive the draining of the queue.

        Type Parameters:
        T - the value type to emit
        Parameters:
        requested - the holder of current requested amount
        n - the value requested
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
        Returns:
        true if in the active mode and the request amount of n can be relayed to upstream, false if in the post-completed mode and the queue is draining.
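
        The interplay of the active and completed modes can be illustrated with a simplified, self-contained sketch. It is not the library's implementation: the Sink interface stands in for Subscriber, Recorder is a demonstration sink, the method names done, request and drain are hypothetical, and unsubscription checks and the exit transformation are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

final class PostCompleteSketch {
    static final long COMPLETED_MASK = Long.MIN_VALUE; // bit 63: completed flag
    static final long REQUESTED_MASK = Long.MAX_VALUE; // bits 0-62: request amount

    /** Hypothetical stand-in for the downstream Subscriber. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Hypothetical sink that records what it receives. */
    static final class Recorder<T> implements Sink<T> {
        final List<T> values = new ArrayList<>();
        boolean completed;
        @Override public void onNext(T value) { values.add(value); }
        @Override public void onCompleted() { completed = true; }
    }

    /** Sets the completed flag and replays if requests are already outstanding. */
    static <T> void done(AtomicLong requested, Queue<T> queue, Sink<? super T> sink) {
        for (;;) {
            long r = requested.get();
            if ((r & COMPLETED_MASK) != 0L) {
                return; // already in completed mode
            }
            if (requested.compareAndSet(r, r | COMPLETED_MASK)) {
                if (r != 0L) {
                    drain(requested, queue, sink);
                }
                return;
            }
        }
    }

    /** Returns true in active mode (relay n upstream), false in completed mode. */
    static <T> boolean request(AtomicLong requested, long n, Queue<T> queue, Sink<? super T> sink) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == 0L) {
            return (requested.get() & COMPLETED_MASK) == 0L;
        }
        for (;;) {
            long r = requested.get();
            long flag = r & COMPLETED_MASK;
            long sum = (r & REQUESTED_MASK) + n;
            if (sum < 0L) {
                sum = Long.MAX_VALUE; // cap on overflow
            }
            if (requested.compareAndSet(r, sum | flag)) {
                if (r == COMPLETED_MASK) {
                    // (completed, 0) -> (completed, n): this caller starts the replay
                    drain(requested, queue, sink);
                    return false;
                }
                return flag == 0L;
            }
        }
    }

    /** Emits queued values up to the outstanding request amount (completed mode only). */
    static <T> void drain(AtomicLong requested, Queue<T> queue, Sink<? super T> sink) {
        long r = requested.get();
        long e = COMPLETED_MASK; // emitted count with the flag pre-set, comparable to r
        for (;;) {
            while (e != r) {
                T v = queue.poll();
                if (v == null) {
                    sink.onCompleted();
                    return;
                }
                sink.onNext(v);
                e++;
            }
            if (queue.isEmpty()) {
                sink.onCompleted();
                return;
            }
            r = requested.get();
            if (r == e) {
                // No new requests arrived: subtract what was emitted and stop if none remain.
                r = requested.addAndGet(-(e & REQUESTED_MASK));
                if (r == COMPLETED_MASK) {
                    return;
                }
                e = COMPLETED_MASK;
            }
        }
    }
}
```

        In this sketch, with a queue holding 1, 2, 3 and no upstream emissions, request(2) returns true (active mode), done() then replays 1 and 2, and a later request(5) returns false, replays 3 and signals completion.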
      • postCompleteDone

        public static <T,​R> void postCompleteDone​(java.util.concurrent.atomic.AtomicLong requested,
                                                        java.util.Queue<T> queue,
                                                        Subscriber<? super R> actual,
                                                        Func1<? super T,​? extends R> exitTransform)
        Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.

        Don't modify the queue after calling this method!

        Post-completion backpressure handles the case where a source produces values in response to requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through to the upstream and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead drive the draining of the queue.

        The algorithm uses the most significant bit (bit 63) of the long value held in the AtomicLong as the completed flag, since a request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

        Type Parameters:
        T - the value type in the queue
        R - the value type to emit
        Parameters:
        requested - the holder of current requested amount
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
        exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
      • postCompleteRequest

        public static <T,​R> boolean postCompleteRequest​(java.util.concurrent.atomic.AtomicLong requested,
                                                              long n,
                                                              java.util.Queue<T> queue,
                                                              Subscriber<? super R> actual,
                                                              Func1<? super T,​? extends R> exitTransform)
        Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.

        Post-completion backpressure handles the case where a source produces values in response to requests while it is active, but more values remain available after it completes. In this case, onCompleted() can't simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through to the upstream and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead drive the draining of the queue.

        Type Parameters:
        T - the value type in the queue
        R - the value type to emit
        Parameters:
        requested - the holder of current requested amount
        n - the value requested
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
        exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
        Returns:
        true if in the active mode and the request amount of n can be relayed to upstream, false if in the post-completed mode and the queue is draining.
      • postCompleteDrain

        static <T,​R> void postCompleteDrain​(java.util.concurrent.atomic.AtomicLong requested,
                                                  java.util.Queue<T> queue,
                                                  Subscriber<? super R> subscriber,
                                                  Func1<? super T,​? extends R> exitTransform)
        Drains the queue based on the outstanding requests in post-completed mode (only!) and allows exit transformation on the queued values.
        Type Parameters:
        T - the value type in the queue
        R - the value type to emit
        Parameters:
        requested - the holder of current requested amount
        queue - the queue holding values to be emitted after completion
        subscriber - the subscriber to receive the values
        exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
      • produced

        public static long produced​(java.util.concurrent.atomic.AtomicLong requested,
                                    long n)
        Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
        Parameters:
        requested - the requested amount holder
        n - the value to subtract from the requested amount, has to be positive (not verified)
        Returns:
        the new requested amount
        Throws:
        java.lang.IllegalStateException - if n is greater than the current requested amount, which indicates a bug in the request accounting logic
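
        The documented contract (atomic subtraction, no change when the amount is Long.MAX_VALUE, an exception when more is produced than was requested) can be sketched as follows. This is an illustrative re-implementation of that contract, not the library's source; the class name ProducedSketch and the exception message are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ProducedSketch {
    /** Subtracts n from requested unless it holds Long.MAX_VALUE; returns the new amount. */
    static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // unbounded mode: nothing to account for
            }
            long next = current - n;
            if (next < 0L) {
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
            // Lost a race with a concurrent update: retry.
        }
    }
}
```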
      • validate

        public static boolean validate​(long n)
        Validates the requested amount and returns true if it is positive.
        Parameters:
        n - the requested amount
        Returns:
        true if n is positive, false if n is zero
        Throws:
        java.lang.IllegalArgumentException - if n is negative
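
        A validation with this contract can be sketched as below. The treatment of zero (returning false rather than throwing) is inferred from the description, which returns true only for positive values and throws only for negative ones; the class name ValidateSketch and the exception message are hypothetical.

```java
final class ValidateSketch {
    /** Returns true for positive n, false for zero, and throws for negative n. */
    static boolean validate(long n) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        return n != 0L;
    }
}
```

        A caller can use the boolean result to skip the request accounting entirely when n is zero.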