Package rx.observables
Class SyncOnSubscribe<S,T>
java.lang.Object
rx.observables.SyncOnSubscribe<S,T>
- Type Parameters:
S- the type of the user-define state used ingenerateState(S),next(S, Subscriber), andonUnsubscribe(S).T- the type ofSubscribersthat will be compatible withthis.
- All Implemented Interfaces:
Action,Action1<Subscriber<? super T>>,Function,Observable.OnSubscribe<T>
- Direct Known Subclasses:
SyncOnSubscribe.SyncOnSubscribeImpl
@Beta
public abstract class SyncOnSubscribe<S,T>
extends Object
implements Observable.OnSubscribe<T>
A utility class to create
OnSubscribe<T> functions that respond correctly to back
pressure requests from subscribers. This is an improvement over
Observable.create(OnSubscribe) which does not provide
any means of managing back pressure requests out-of-the-box.-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final classContains the producer loop that reacts to downstream requests of work.(package private) static final classAn implementation of SyncOnSubscribe that delegates,invalid reference
SyncOnSubscribe#next(Object, Subscriber)generateState(), andonUnsubscribe(Object)to provided functions/closures. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionfinal voidcall(Subscriber<? super T> subscriber) static <S,T> SyncOnSubscribe <S, T> createSingleState(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.static <S,T> SyncOnSubscribe <S, T> createSingleState(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.static <S,T> SyncOnSubscribe <S, T> createStateful(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.static <S,T> SyncOnSubscribe <S, T> createStateful(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.static <T> SyncOnSubscribe<Void, T> createStateless(Action1<? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.static <T> SyncOnSubscribe<Void, T> createStateless(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.protected abstract SExecuted once when subscribed to by a subscriber (viacall(Subscriber)) to produce a state value.protected abstract SCalled to produce data to the downstream subscribers.protected voidonUnsubscribe(S state) Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
-
Constructor Details
-
SyncOnSubscribe
public SyncOnSubscribe()
-
-
Method Details
-
call
-
generateState
Executed once when subscribed to by a subscriber (viacall(Subscriber)) to produce a state value. This value is passed intonext(S state, Observeron the first iteration. Subsequent iterations ofobserver) nextwill receive the state returned by the previous invocation ofnext.- Returns:
- the initial state value
-
next
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber callobserver.onNext(t). To signal an error condition callobserver.onError(throwable)or throw an Exception. To signal the end of a data stream callobserver.onCompleted(). Implementations of this method must follow the following rules.- Must not call
observer.onNext(t)more than 1 time per invocation. - Must not call
observer.onNext(t)concurrently.
stateargument of the next invocation of this method.- Parameters:
state- the state value (fromgenerateState()on the first invocation or the previous invocation of this method.observer- the observer of data emitted by- Returns:
- the next iteration's state value
- Must not call
-
onUnsubscribe
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. This method will be invoked exactly once.- Parameters:
state- the last state value prior fromgenerateState()ornext(S, Observer<T>)before unsubscribe.
-
createSingleState
@Beta public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.- Type Parameters:
S- the type of the associated state with each SubscriberT- the type of the generated values- Parameters:
generator- generates the initial state value (seegenerateState())next- produces data to the downstream subscriber (seenext(S, Subscriber))- Returns:
- a SyncOnSubscribe that emits data in a protocol compatible with back-pressure.
-
createSingleState
@Beta public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S, ? super Observer<? super T>> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers. This overload creates a SyncOnSubscribe without an explicit clean up step.- Type Parameters:
S- the type of the associated state with each SubscriberT- the type of the generated values- Parameters:
generator- generates the initial state value (seegenerateState())next- produces data to the downstream subscriber (seenext(S, Subscriber))onUnsubscribe- clean up behavior (seeonUnsubscribe(S))- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Beta public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next, Action1<? super S> onUnsubscribe) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.- Type Parameters:
S- the type of the associated state with each SubscriberT- the type of the generated values- Parameters:
generator- generates the initial state value (seegenerateState())next- produces data to the downstream subscriber (seenext(S, Subscriber))onUnsubscribe- clean up behavior (seeonUnsubscribe(S))- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateful
@Beta public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S, ? super Observer<? super T>, ? extends S> next) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers.- Type Parameters:
S- the type of the associated state with each SubscriberT- the type of the generated values- Parameters:
generator- generates the initial state value (seegenerateState())next- produces data to the downstream subscriber (seenext(S, Subscriber))- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Beta public static <T> SyncOnSubscribe<Void,T> createStateless(Action1<? super Observer<? super T>> next) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers. This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when thenextfunction closes over it's state.- Type Parameters:
T- the type of the generated values- Parameters:
next- produces data to the downstream subscriber (seenext(S, Subscriber))- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-
createStateless
@Beta public static <T> SyncOnSubscribe<Void,T> createStateless(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe) Generates a synchronousSyncOnSubscribethat calls the providednextfunction to generate data to downstream subscribers. This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when thenextfunction closes over it's state.- Type Parameters:
T- the type of the generated values- Parameters:
next- produces data to the downstream subscriber (seenext(S, Subscriber))onUnsubscribe- clean up behavior (seeonUnsubscribe(S))- Returns:
- a SyncOnSubscribe that emits data downstream in a protocol compatible with back-pressure.
-