Package rx.observables
Class AsyncOnSubscribe<S,T>
- java.lang.Object
-
- rx.observables.AsyncOnSubscribe<S,T>
-
- Type Parameters:
S
- the type of the user-defined state used in generateState(), next(S, long, Observer), and onUnsubscribe(S).
T
- the type of Subscribers that will be compatible with this.
- All Implemented Interfaces:
Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>
- Direct Known Subclasses:
AsyncOnSubscribe.AsyncOnSubscribeImpl
@Experimental public abstract class AsyncOnSubscribe<S,T> extends java.lang.Object implements Observable.OnSubscribe<T>
A utility class to create OnSubscribe<T> functions that respond correctly to back pressure requests from subscribers. This is an improvement over Observable.create(OnSubscribe), which does not provide any means of managing back pressure requests out of the box. This variant of an OnSubscribe function allows for the asynchronous processing of requests.
-
-
Nested Class Summary
(package private) static class AsyncOnSubscribe.AsyncOnSubscribeImpl<S,T>
- An implementation of AsyncOnSubscribe that delegates next(Object, long, Observer), generateState(), and onUnsubscribe(Object) to provided functions/closures.
(package private) static class AsyncOnSubscribe.AsyncOuterManager<S,T>
(package private) static class AsyncOnSubscribe.UnicastSubject<T>
-
Constructor Summary
AsyncOnSubscribe()
-
Method Summary
void call(Subscriber<? super T> actualSubscriber)
static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next)
- Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe)
- Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next)
- Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe)
- Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <T> AsyncOnSubscribe<java.lang.Void,T> createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next)
- Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
static <T> AsyncOnSubscribe<java.lang.Void,T> createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe)
- Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
protected abstract S generateState()
- Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value.
protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer)
- Called to produce data to the downstream subscribers.
protected void onUnsubscribe(S state)
- Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
-
-
-
Method Detail
-
generateState
protected abstract S generateState()
Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value. This value is passed into next(S state, long requested, Observer observer) on the first iteration. Subsequent iterations of next will receive the state returned by the previous invocation of next.
- Returns:
- the initial state value
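The hand-off described above can be sketched in plain Java. This is only a model of the contract, not RxJava code: the class StateThreadingSketch, the method statesSeenByNext, and the use of java.util.function types in place of Func0 and Func3 are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java model of the state hand-off; this is not RxJava code.
final class StateThreadingSketch {

    // Calls the generator once, then feeds each returned state into the following call of next.
    static <S> List<S> statesSeenByNext(Supplier<S> generator,
                                        BiFunction<S, Long, S> next,
                                        int iterations,
                                        long requested) {
        List<S> seen = new ArrayList<>();
        S state = generator.get();                // models generateState(): runs once
        for (int i = 0; i < iterations; i++) {
            seen.add(state);                      // the state this iteration receives
            state = next.apply(state, requested); // models next(...): returns the following state
        }
        return seen;
    }

    public static void main(String[] args) {
        // Hypothetical state: a running offset advanced by the requested amount.
        List<Long> seen = statesSeenByNext(() -> 0L, (offset, n) -> offset + n, 3, 10L);
        System.out.println(seen); // prints [0, 10, 20]
    }
}
```

The first iteration sees the generator's value; every later iteration sees whatever the previous call returned.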
-
next
protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer)
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber, call observer.onNext(t). To signal an error condition, call observer.onError(throwable) or throw an Exception. To signal the end of a data stream, call observer.onCompleted(). Implementations of this method must follow these rules:
- Must not call observer.onNext(t) more than 1 time per invocation.
- Must not call observer.onNext(t) concurrently.
The value returned from an invocation of this method is passed as the state argument of the next invocation of this method.
- Parameters:
state
- the state value (from generateState() on the first invocation, or from the previous invocation of this method)
requested
- the amount of data requested. An observable emitted to the observer should not exceed this amount.
observer
- the observer of data emitted by
- Returns:
- the next iteration's state value
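As a rough illustration of these rules, the plain-Java model below treats each call as producing exactly one batch that never exceeds the requested amount. It is not RxJava code: a List stands in for the inner Observable, an int cursor plays the role of the state, and the names NextContractSketch, batchFor, and nextCursor are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of one call to next; a List stands in for the inner Observable.
final class NextContractSketch {

    // Builds the single batch for this invocation: at most `requested` items starting at `cursor`.
    static List<Integer> batchFor(int cursor, long requested, int total) {
        List<Integer> batch = new ArrayList<>();
        for (int i = cursor; i < total && batch.size() < requested; i++) {
            batch.add(i);
        }
        return batch;
    }

    // The following state: the cursor advanced past what was just emitted.
    static int nextCursor(int cursor, List<Integer> batch) {
        return cursor + batch.size();
    }

    public static void main(String[] args) {
        int total = 5;               // hypothetical source of five values: 0..4
        int cursor = 0;              // plays the role of the state argument
        long[] requests = {2, 2, 2};
        for (long requested : requests) {
            List<Integer> batch = batchFor(cursor, requested, total); // one emission per call
            cursor = nextCursor(cursor, batch);
            System.out.println(batch + " -> next state " + cursor);
        }
    }
}
```

Each loop iteration corresponds to one invocation: one batch is handed over, and the returned cursor becomes the state for the following invocation.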
-
onUnsubscribe
protected void onUnsubscribe(S state)
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. This method will be invoked exactly once.
- Parameters:
state
- the last state value returned from next(S, long, Observer) or generateState() at the time when a terminal event is emitted from next(Object, long, Observer) or when unsubscribing.
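One way to picture the exactly-once guarantee is a compare-and-set guard, sketched below in plain Java. This is an assumption about how such a guarantee could be modeled, not a description of RxJava's implementation; CleanupOnceSketch, release, and cleanupRunsAfterTwoSignals are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-Java model of an exactly-once clean-up guard; not RxJava's implementation.
final class CleanupOnceSketch<S> {
    private final AtomicBoolean cleaned = new AtomicBoolean(false);
    private final Consumer<S> onUnsubscribe;

    CleanupOnceSketch(Consumer<S> onUnsubscribe) {
        this.onUnsubscribe = onUnsubscribe;
    }

    // Invoked on a terminal event or on unsubscription; only the first call runs the clean-up.
    void release(S lastState) {
        if (cleaned.compareAndSet(false, true)) {
            onUnsubscribe.accept(lastState);
        }
    }

    // Simulates a terminal event followed by an unsubscribe and counts the clean-up runs.
    static int cleanupRunsAfterTwoSignals() {
        AtomicInteger runs = new AtomicInteger();
        CleanupOnceSketch<String> guard = new CleanupOnceSketch<>(state -> runs.incrementAndGet());
        guard.release("last-state"); // terminal event
        guard.release("last-state"); // later unsubscribe: ignored by the guard
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(cleanupRunsAfterTwoSignals()); // prints 1
    }
}
```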
-
createSingleState
@Experimental public static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
T
- the type of the generated values
S
- the type of the associated state with each Subscriber
- Parameters:
generator
- generates the initial state value (see generateState())
next
- produces data to the downstream subscriber (see next(S, long, Observer))
- Returns:
- an AsyncOnSubscribe that emits data in a protocol compatible with back-pressure.
-
createSingleState
@Experimental public static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates an AsyncOnSubscribe with an explicit clean up step, supplied as onUnsubscribe.
- Type Parameters:
T
- the type of the generated values
S
- the type of the associated state with each Subscriber
- Parameters:
generator
- generates the initial state value (see generateState())
next
- produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe
- clean up behavior (see onUnsubscribe(S))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Experimental public static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
T
- the type of the generated values
S
- the type of the associated state with each Subscriber
- Parameters:
generator
- generates the initial state value (see generateState())
next
- produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe
- clean up behavior (see onUnsubscribe(S))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Experimental public static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
T
- the type of the generated values
S
- the type of the associated state with each Subscriber
- Parameters:
generator
- generates the initial state value (see generateState())
next
- produces data to the downstream subscriber (see next(S, long, Observer))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Experimental public static <T> AsyncOnSubscribe<java.lang.Void,T> createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
- Type Parameters:
T
- the type of the generated values
- Parameters:
next
- produces data to the downstream subscriber (see next(S, long, Observer))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
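What "closes over its state" means can be shown with a small plain-Java model. It is not RxJava code: a BiConsumer stands in for Action2, a Consumer of List stands in for the Observer of inner Observables, and StatelessClosureSketch and countingNext are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java model of a "state-less" next function; not RxJava code.
final class StatelessClosureSketch {

    // Returns a next-like callback whose only state is the captured counter.
    static BiConsumer<Long, Consumer<List<Long>>> countingNext() {
        AtomicLong counter = new AtomicLong();   // state captured by the closure
        return (requested, sink) -> {
            List<Long> batch = new ArrayList<>();
            for (long i = 0; i < requested; i++) {
                batch.add(counter.getAndIncrement());
            }
            sink.accept(batch);                  // one emission per invocation
        };
    }

    public static void main(String[] args) {
        BiConsumer<Long, Consumer<List<Long>>> next = countingNext();
        next.accept(3L, batch -> System.out.println(batch)); // prints [0, 1, 2]
        next.accept(2L, batch -> System.out.println(batch)); // prints [3, 4]
    }
}
```

No state argument is passed between calls; the counter survives only because the lambda captured it.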
-
createStateless
@Experimental public static <T> AsyncOnSubscribe<java.lang.Void,T> createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
- Type Parameters:
T
- the type of the generated values
- Parameters:
next
- produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe
- clean up behavior (see onUnsubscribe(S))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
call
public final void call(Subscriber<? super T> actualSubscriber)
-
-