Class OnSubscribePublishMulticast<T>
- java.lang.Object
-
- java.lang.Number
-
- java.util.concurrent.atomic.AtomicInteger
-
- rx.internal.operators.OnSubscribePublishMulticast<T>
-
- Type Parameters:
T
- the input and output type
- All Implemented Interfaces:
java.io.Serializable
,Action
,Action1<Subscriber<? super T>>
,Function
,Observable.OnSubscribe<T>
,Observer<T>
,Subscription
public final class OnSubscribePublishMulticast<T> extends java.util.concurrent.atomic.AtomicInteger implements Observable.OnSubscribe<T>, Observer<T>, Subscription
Multicasts notifications coming through its input Subscriber view to its client Subscribers via lockstep backpressure mode.The difference between this class and OperatorPublish is that this class doesn't consume the upstream if there are no child subscribers but waits for them to show up. Plus if the upstream source terminates, late subscribers will be immediately terminated with the same terminal event unlike OperatorPublish which just waits for the next connection.
The class extends AtomicInteger which is the work-in-progress gate for the drain-loop serializing subscriptions and child request changes.
- See Also:
- Serialized Form
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static class
OnSubscribePublishMulticast.ParentSubscriber<T>
The subscriber that must be used for subscribing to the upstream source.(package private) static class
OnSubscribePublishMulticast.PublishProducer<T>
A Producer and Subscription that wraps a child Subscriber and manages its backpressure requests along with its unsubscription from the parent class.
-
Field Summary
Fields Modifier and Type Field Description (package private) boolean
delayError
Delays the error delivery to happen only after all values have been consumed.(package private) boolean
done
Indicates the upstream has completed.(package private) static OnSubscribePublishMulticast.PublishProducer<?>[]
EMPTY
Represents an empty array of subscriber wrapper, helps avoid allocating an empty array all the time.(package private) java.lang.Throwable
error
Holds onto the upstream's exception if done is true and this field is non-null.(package private) OnSubscribePublishMulticast.ParentSubscriber<T>
parent
The subscriber that can be 'connected' to the upstream source.(package private) int
prefetch
The number of items to prefetch from the upstreams source.(package private) Producer
producer
Holds the upstream producer if any, set through the parent subscriber.(package private) java.util.Queue<T>
queue
The prefetch queue holding onto a fixed amount of items until all all child subscribers have requested something.private static long
serialVersionUID
(package private) OnSubscribePublishMulticast.PublishProducer<T>[]
subscribers
A copy-on-write array of currently subscribed child subscribers' wrapper structure.(package private) static OnSubscribePublishMulticast.PublishProducer<?>[]
TERMINATED
Represents a final state for this class that prevents new subscribers from subscribing to it.
-
Constructor Summary
Constructors Constructor Description OnSubscribePublishMulticast(int prefetch, boolean delayError)
Constructor, initializes the fields
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description (package private) boolean
add(OnSubscribePublishMulticast.PublishProducer<T> inner)
Atomically adds the given wrapper of a child Subscriber to the subscribers array.void
call(Subscriber<? super T> t)
(package private) boolean
checkTerminated(boolean d, boolean empty)
Given the current source state, terminates all child subscribers.(package private) void
drain()
The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue if they are available.boolean
isUnsubscribed()
Indicates whether thisSubscription
is currently unsubscribed.void
onCompleted()
Notifies the Observer that theObservable
has finished sending push-based notifications.void
onError(java.lang.Throwable e)
Notifies the Observer that theObservable
has experienced an error condition.void
onNext(T t)
Provides the Observer with a new item to observe.(package private) void
remove(OnSubscribePublishMulticast.PublishProducer<T> inner)
Atomically removes the given wrapper, if present, from the subscribers array.(package private) void
setProducer(Producer p)
Sets the main producer and issues the prefetch amount.Subscriber<T>
subscriber()
Returns the input subscriber of this class that must be subscribed to the upstream source.(package private) OnSubscribePublishMulticast.PublishProducer<T>[]
terminate()
Atomically swaps in the terminated state.void
unsubscribe()
Stops the receipt of notifications on theSubscriber
that was registered when this Subscription was received.-
Methods inherited from class java.util.concurrent.atomic.AtomicInteger
accumulateAndGet, addAndGet, compareAndExchange, compareAndExchangeAcquire, compareAndExchangeRelease, compareAndSet, decrementAndGet, doubleValue, floatValue, get, getAcquire, getAndAccumulate, getAndAdd, getAndDecrement, getAndIncrement, getAndSet, getAndUpdate, getOpaque, getPlain, incrementAndGet, intValue, lazySet, longValue, set, setOpaque, setPlain, setRelease, toString, updateAndGet, weakCompareAndSet, weakCompareAndSetAcquire, weakCompareAndSetPlain, weakCompareAndSetRelease, weakCompareAndSetVolatile
-
-
-
-
Field Detail
-
serialVersionUID
private static final long serialVersionUID
- See Also:
- Constant Field Values
-
queue
final java.util.Queue<T> queue
The prefetch queue holding onto a fixed amount of items until all all child subscribers have requested something.
-
prefetch
final int prefetch
The number of items to prefetch from the upstreams source.
-
delayError
final boolean delayError
Delays the error delivery to happen only after all values have been consumed.
-
parent
final OnSubscribePublishMulticast.ParentSubscriber<T> parent
The subscriber that can be 'connected' to the upstream source.
-
done
volatile boolean done
Indicates the upstream has completed.
-
error
java.lang.Throwable error
Holds onto the upstream's exception if done is true and this field is non-null.This field must be read after done or if subscribers == TERMINATED to establish a proper happens-before.
-
producer
volatile Producer producer
Holds the upstream producer if any, set through the parent subscriber.
-
subscribers
volatile OnSubscribePublishMulticast.PublishProducer<T>[] subscribers
A copy-on-write array of currently subscribed child subscribers' wrapper structure.
-
EMPTY
static final OnSubscribePublishMulticast.PublishProducer<?>[] EMPTY
Represents an empty array of subscriber wrapper, helps avoid allocating an empty array all the time.
-
TERMINATED
static final OnSubscribePublishMulticast.PublishProducer<?>[] TERMINATED
Represents a final state for this class that prevents new subscribers from subscribing to it.
-
-
Constructor Detail
-
OnSubscribePublishMulticast
public OnSubscribePublishMulticast(int prefetch, boolean delayError)
Constructor, initializes the fields- Parameters:
prefetch
- the prefetch amount, > 0 requireddelayError
- delay the error delivery after the normal items?- Throws:
java.lang.IllegalArgumentException
- if prefetch <= 0
-
-
Method Detail
-
call
public void call(Subscriber<? super T> t)
-
onNext
public void onNext(T t)
Description copied from interface:Observer
Provides the Observer with a new item to observe.The
Observable
may call this method 0 or more times.The
Observable
will not call this method again after it calls eitherObserver.onCompleted()
orObserver.onError(java.lang.Throwable)
.
-
onError
public void onError(java.lang.Throwable e)
Description copied from interface:Observer
Notifies the Observer that theObservable
has experienced an error condition.If the
Observable
calls this method, it will not thereafter callObserver.onNext(T)
orObserver.onCompleted()
.
-
onCompleted
public void onCompleted()
Description copied from interface:Observer
Notifies the Observer that theObservable
has finished sending push-based notifications.The
Observable
will not call this method if it callsObserver.onError(java.lang.Throwable)
.- Specified by:
onCompleted
in interfaceObserver<T>
-
setProducer
void setProducer(Producer p)
Sets the main producer and issues the prefetch amount.- Parameters:
p
- the producer to set
-
drain
void drain()
The serialization loop that determines the minimum request of all subscribers and tries to emit as many items from the queue if they are available.The execution of the drain-loop is guaranteed to be thread-safe.
-
checkTerminated
boolean checkTerminated(boolean d, boolean empty)
Given the current source state, terminates all child subscribers.- Parameters:
d
- the source-done indicatorempty
- the queue-emptiness indicator- Returns:
- true if the class reached its terminal state
-
terminate
OnSubscribePublishMulticast.PublishProducer<T>[] terminate()
Atomically swaps in the terminated state.- Returns:
- the last set of subscribers before the state change or an empty array
-
add
boolean add(OnSubscribePublishMulticast.PublishProducer<T> inner)
Atomically adds the given wrapper of a child Subscriber to the subscribers array.- Parameters:
inner
- the wrapper- Returns:
- true if successful, false if the terminal state has been reached in the meantime
-
remove
void remove(OnSubscribePublishMulticast.PublishProducer<T> inner)
Atomically removes the given wrapper, if present, from the subscribers array.- Parameters:
inner
- the wrapper to remove
-
subscriber
public Subscriber<T> subscriber()
Returns the input subscriber of this class that must be subscribed to the upstream source.- Returns:
- the subscriber instance
-
unsubscribe
public void unsubscribe()
Description copied from interface:Subscription
Stops the receipt of notifications on theSubscriber
that was registered when this Subscription was received.This allows unregistering an
Subscriber
before it has finished receiving all events (i.e. before onCompleted is called).- Specified by:
unsubscribe
in interfaceSubscription
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface:Subscription
Indicates whether thisSubscription
is currently unsubscribed.- Specified by:
isUnsubscribed
in interfaceSubscription
- Returns:
true
if thisSubscription
is currently unsubscribed,false
otherwise
-
-