Package rx.observables
Class AsyncOnSubscribe<S,T>
java.lang.Object
rx.observables.AsyncOnSubscribe<S,T>
- Type Parameters:
S - the type of the user-defined state used in generateState(), next(S, long, Observer), and onUnsubscribe(S)
T - the type of Subscribers that will be compatible with this
- All Implemented Interfaces:
Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>
- Direct Known Subclasses:
AsyncOnSubscribe.AsyncOnSubscribeImpl
@Experimental
public abstract class AsyncOnSubscribe<S,T>
extends Object
implements Observable.OnSubscribe<T>
A utility class to create OnSubscribe<T> functions that respond correctly to back pressure requests from subscribers. This is an improvement over Observable.create(OnSubscribe), which does not provide any means of managing back pressure requests out of the box. This variant of an OnSubscribe function allows for the asynchronous processing of requests.
Nested Class Summary
Nested Classes
- (package private) static final class AsyncOnSubscribe.AsyncOnSubscribeImpl: An implementation of AsyncOnSubscribe that delegates next(Object, long, Observer), generateState(), and onUnsubscribe(Object) to provided functions/closures.
- (package private) static final class
- (package private) static final class
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method, Description
- final void call(Subscriber<? super T> actualSubscriber)
- static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next, Action1<? super S> onUnsubscribe): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- static <T> AsyncOnSubscribe<Void,T> createStateless(Action2<Long, ? super Observer<Observable<? extends T>>> next): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- static <T> AsyncOnSubscribe<Void,T> createStateless(Action2<Long, ? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- protected abstract S generateState(): Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value.
- protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer): Called to produce data to the downstream subscribers.
- protected void onUnsubscribe(S state): Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
-
Constructor Details
-
AsyncOnSubscribe
public AsyncOnSubscribe()
-
-
Method Details
-
generateState
protected abstract S generateState()
Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value. This value is passed into next(S, long, Observer) on the first iteration. Subsequent iterations of next will receive the state returned by the previous invocation of next.
- Returns:
- the initial state value
-
next
protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer)
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber call observer.onNext(t). To signal an error condition call observer.onError(throwable) or throw an Exception. To signal the end of a data stream call observer.onCompleted(). Implementations of this method must follow these rules:
- Must not call observer.onNext(t) more than 1 time per invocation.
- Must not call observer.onNext(t) concurrently.
The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.
- Parameters:
state - the state value (from generateState() on the first invocation, or from the previous invocation of this method)
requested - the amount of data requested. An observable emitted to the observer should not exceed this amount.
observer - the observer of data emitted by
- Returns:
- the next iteration's state value
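The two rules and the requested bound can be illustrated with a small plain-Java model. This is only a sketch: RequestBoundSketch, nextBatch, and demo are hypothetical names, a List stands in for the Observable that would be passed to observer.onNext, and nothing here uses RxJava itself. It shows one emission per invocation, with each batch capped at the requested amount.

```java
import java.util.ArrayList;
import java.util.List;

public class RequestBoundSketch {
    /**
     * Hypothetical helper: builds the single batch for one invocation.
     * It holds consecutive integers from start (inclusive) up to limit
     * (exclusive), capped at the requested amount.
     */
    static List<Integer> nextBatch(int start, long requested, int limit) {
        List<Integer> batch = new ArrayList<>();
        for (int v = start; v < limit && batch.size() < requested; v++) {
            batch.add(v);
        }
        return batch;
    }

    /** Runs two invocations with requests of 3 and 5 against a source of six values. */
    static List<List<Integer>> demo() {
        List<List<Integer>> emitted = new ArrayList<>();
        int state = 0; // position of the next value to hand out
        for (long requested : new long[] {3, 5}) {
            List<Integer> batch = nextBatch(state, requested, 6);
            emitted.add(batch);    // exactly one emission per invocation
            state += batch.size(); // becomes the state of the next invocation
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[0, 1, 2], [3, 4, 5]]
    }
}
```

The second request asks for 5 items but only 3 remain, so the batch is shorter than the request. The bound is an upper limit, not a quota.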
-
onUnsubscribe
protected void onUnsubscribe(S state)
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. This method will be invoked exactly once.
- Parameters:
state - the last state value returned from next(S, long, Observer) or generateState() at the time when a terminal event is emitted from next(Object, long, Observer) or when the subscriber unsubscribes.
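The way state flows through generateState, next, and onUnsubscribe can be sketched with a plain-Java model. The Callbacks interface and the drive loop below are hypothetical stand-ins written for illustration, not RxJava types, and the loop ignores concurrency and terminal events. It only shows which state value each callback receives.

```java
import java.util.ArrayList;
import java.util.List;

public class StateThreadingSketch {
    /** Hypothetical stand-in for the three callbacks; not an RxJava type. */
    interface Callbacks<S> {
        S generateState();
        S next(S state, long requested, List<String> log);
        void onUnsubscribe(S state, List<String> log);
    }

    /** Calls generateState once, next once per request, then onUnsubscribe once. */
    static <S> List<String> drive(Callbacks<S> cb, long[] requests) {
        List<String> log = new ArrayList<>();
        S state = cb.generateState();
        for (long requested : requests) {
            // The value returned here is the state argument of the following call.
            state = cb.next(state, requested, log);
        }
        cb.onUnsubscribe(state, log); // receives the last state value
        return log;
    }

    /** Uses a running total of requested items as the state. */
    static List<String> demo() {
        return drive(new Callbacks<Long>() {
            @Override public Long generateState() {
                return 0L;
            }
            @Override public Long next(Long state, long requested, List<String> log) {
                log.add("next(state=" + state + ", requested=" + requested + ")");
                return state + requested;
            }
            @Override public void onUnsubscribe(Long state, List<String> log) {
                log.add("onUnsubscribe(state=" + state + ")");
            }
        }, new long[] {2, 3});
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```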
-
createSingleState
@Experimental
public static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
S - the type of the state associated with each Subscriber
T - the type of the generated values
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
- Returns:
- an AsyncOnSubscribe that emits data in a protocol compatible with back-pressure.
-
createSingleState
@Experimental
public static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload accepts an explicit clean up step through onUnsubscribe.
- Type Parameters:
S - the type of the state associated with each Subscriber
T - the type of the generated values
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
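The createSingleState signatures take an Action3, which returns nothing, where createStateful takes a Func3 that returns the next state. One plausible reading, which is an assumption here and not something this page states, is that the same state instance is handed to every invocation of next. The plain-Java sketch below models that adaptation with java.util.function types. SingleStateSketch and singleState are hypothetical names, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public class SingleStateSketch {
    /**
     * Adapts a callback that returns nothing into a state-returning one
     * that always hands back the instance it was given.
     */
    static <S> BiFunction<S, Long, S> singleState(BiConsumer<S, Long> next) {
        return (state, requested) -> {
            next.accept(state, requested); // may mutate the state in place
            return state;                  // the same instance is reused
        };
    }

    /** Records each requested amount in one mutable list used as the state. */
    static List<Long> demo() {
        BiFunction<List<Long>, Long, List<Long>> step =
                singleState((seen, requested) -> seen.add(requested));
        List<Long> state = new ArrayList<>();
        state = step.apply(state, 4L);
        state = step.apply(state, 6L);
        return state;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4, 6]
    }
}
```

Under this reading, a single-state variant suits mutable state such as a cursor or connection, while the stateful variant suits immutable state that is replaced on each step.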
-
createStateful
@Experimental
public static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next, Action1<? super S> onUnsubscribe)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
S - the type of the state associated with each Subscriber
T - the type of the generated values
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Experimental
public static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
- Type Parameters:
S - the type of the state associated with each Subscriber
T - the type of the generated values
- Parameters:
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Experimental
public static <T> AsyncOnSubscribe<Void,T> createStateless(Action2<Long, ? super Observer<Observable<? extends T>>> next)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
- Type Parameters:
T - the type of the generated values
- Parameters:
next - produces data to the downstream subscriber (see next(S, long, Observer))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Experimental
public static <T> AsyncOnSubscribe<Void,T> createStateless(Action2<Long, ? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
- Type Parameters:
T - the type of the generated values
- Parameters:
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
- Returns:
- an AsyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
call
-