Class OnSubscribePublishMulticast<T>

  • Type Parameters:
    T - the input and output type
    All Implemented Interfaces:
    java.io.Serializable, Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>, Observer<T>, Subscription

    public final class OnSubscribePublishMulticast<T>
    extends java.util.concurrent.atomic.AtomicInteger
    implements Observable.OnSubscribe<T>, Observer<T>, Subscription
    Multicasts notifications coming through its input Subscriber view to its client Subscribers via lockstep backpressure mode.

    The difference between this class and OperatorPublish is that this class doesn't consume the upstream if there are no child subscribers but waits for them to show up. Plus if the upstream source terminates, late subscribers will be immediately terminated with the same terminal event unlike OperatorPublish which just waits for the next connection.

    The class extends AtomicInteger which is the work-in-progress gate for the drain-loop serializing subscriptions and child request changes.

    See Also:
    Serialized Form
    • Nested Class Summary

      Nested Classes 
      Modifier and Type Class Description
      (package private) static class  OnSubscribePublishMulticast.ParentSubscriber<T>
      The subscriber that must be used for subscribing to the upstream source.
      (package private) static class  OnSubscribePublishMulticast.PublishProducer<T>
      A Producer and Subscription that wraps a child Subscriber and manages its backpressure requests along with its unsubscription from the parent class.
    • Field Summary

      Fields 
      Modifier and Type Field Description
      (package private) boolean delayError
      Delays the error delivery to happen only after all values have been consumed.
      (package private) boolean done
      Indicates the upstream has completed.
      (package private) static OnSubscribePublishMulticast.PublishProducer<?>[] EMPTY
      Represents an empty array of subscriber wrapper, helps avoid allocating an empty array all the time.
      (package private) java.lang.Throwable error
      Holds onto the upstream's exception if done is true and this field is non-null.
      (package private) OnSubscribePublishMulticast.ParentSubscriber<T> parent
      The subscriber that can be 'connected' to the upstream source.
      (package private) int prefetch
      The number of items to prefetch from the upstreams source.
      (package private) Producer producer
      Holds the upstream producer if any, set through the parent subscriber.
      (package private) java.util.Queue<T> queue
      The prefetch queue holding onto a fixed amount of items until all all child subscribers have requested something.
      private static long serialVersionUID  
      (package private) OnSubscribePublishMulticast.PublishProducer<T>[] subscribers
      A copy-on-write array of currently subscribed child subscribers' wrapper structure.
      (package private) static OnSubscribePublishMulticast.PublishProducer<?>[] TERMINATED
      Represents a final state for this class that prevents new subscribers from subscribing to it.
    • Constructor Summary

      Constructors 
      Constructor Description
      OnSubscribePublishMulticast​(int prefetch, boolean delayError)
      Constructor, initializes the fields
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      (package private) boolean add​(OnSubscribePublishMulticast.PublishProducer<T> inner)
      Atomically adds the given wrapper of a child Subscriber to the subscribers array.
      void call​(Subscriber<? super T> t)  
      (package private) boolean checkTerminated​(boolean d, boolean empty)
      Given the current source state, terminates all child subscribers.
      (package private) void drain()
      The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue if they are available.
      boolean isUnsubscribed()
      Indicates whether this Subscription is currently unsubscribed.
      void onCompleted()
      Notifies the Observer that the Observable has finished sending push-based notifications.
      void onError​(java.lang.Throwable e)
      Notifies the Observer that the Observable has experienced an error condition.
      void onNext​(T t)
      Provides the Observer with a new item to observe.
      (package private) void remove​(OnSubscribePublishMulticast.PublishProducer<T> inner)
      Atomically removes the given wrapper, if present, from the subscribers array.
      (package private) void setProducer​(Producer p)
      Sets the main producer and issues the prefetch amount.
      Subscriber<T> subscriber()
      Returns the input subscriber of this class that must be subscribed to the upstream source.
      (package private) OnSubscribePublishMulticast.PublishProducer<T>[] terminate()
      Atomically swaps in the terminated state.
      void unsubscribe()
      Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
      • Methods inherited from class java.util.concurrent.atomic.AtomicInteger

        accumulateAndGet, addAndGet, compareAndExchange, compareAndExchangeAcquire, compareAndExchangeRelease, compareAndSet, decrementAndGet, doubleValue, floatValue, get, getAcquire, getAndAccumulate, getAndAdd, getAndDecrement, getAndIncrement, getAndSet, getAndUpdate, getOpaque, getPlain, incrementAndGet, intValue, lazySet, longValue, set, setOpaque, setPlain, setRelease, toString, updateAndGet, weakCompareAndSet, weakCompareAndSetAcquire, weakCompareAndSetPlain, weakCompareAndSetRelease, weakCompareAndSetVolatile
      • Methods inherited from class java.lang.Number

        byteValue, shortValue
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
    • Field Detail

      • queue

        final java.util.Queue<T> queue
        The prefetch queue holding onto a fixed amount of items until all all child subscribers have requested something.
      • prefetch

        final int prefetch
        The number of items to prefetch from the upstreams source.
      • delayError

        final boolean delayError
        Delays the error delivery to happen only after all values have been consumed.
      • done

        volatile boolean done
        Indicates the upstream has completed.
      • error

        java.lang.Throwable error
        Holds onto the upstream's exception if done is true and this field is non-null.

        This field must be read after done or if subscribers == TERMINATED to establish a proper happens-before.

      • producer

        volatile Producer producer
        Holds the upstream producer if any, set through the parent subscriber.
    • Constructor Detail

      • OnSubscribePublishMulticast

        public OnSubscribePublishMulticast​(int prefetch,
                                           boolean delayError)
        Constructor, initializes the fields
        Parameters:
        prefetch - the prefetch amount, > 0 required
        delayError - delay the error delivery after the normal items?
        Throws:
        java.lang.IllegalArgumentException - if prefetch <= 0
    • Method Detail

      • setProducer

        void setProducer​(Producer p)
        Sets the main producer and issues the prefetch amount.
        Parameters:
        p - the producer to set
      • drain

        void drain()
        The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue if they are available.

        The execution of the drain-loop is guaranteed to be thread-safe.

      • checkTerminated

        boolean checkTerminated​(boolean d,
                                boolean empty)
        Given the current source state, terminates all child subscribers.
        Parameters:
        d - the source-done indicator
        empty - the queue-emptiness indicator
        Returns:
        true if the class reached its terminal state
      • add

        boolean add​(OnSubscribePublishMulticast.PublishProducer<T> inner)
        Atomically adds the given wrapper of a child Subscriber to the subscribers array.
        Parameters:
        inner - the wrapper
        Returns:
        true if successful, false if the terminal state has been reached in the meantime
      • subscriber

        public Subscriber<T> subscriber()
        Returns the input subscriber of this class that must be subscribed to the upstream source.
        Returns:
        the subscriber instance
      • unsubscribe

        public void unsubscribe()
        Description copied from interface: Subscription
        Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.

        This allows unregistering an Subscriber before it has finished receiving all events (i.e. before onCompleted is called).

        Specified by:
        unsubscribe in interface Subscription
      • isUnsubscribed

        public boolean isUnsubscribed()
        Description copied from interface: Subscription
        Indicates whether this Subscription is currently unsubscribed.
        Specified by:
        isUnsubscribed in interface Subscription
        Returns:
        true if this Subscription is currently unsubscribed, false otherwise