Package rx.internal.operators
Class OperatorMerge.MergeSubscriber<T>
- Type Parameters:
T - the value type
- All Implemented Interfaces:
Observer<Observable<? extends T>>, Subscription
- Enclosing class:
OperatorMerge<T>
The subscriber that observes Observables.
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) final Subscriber<? super T> | child |
(package private) final boolean | delayErrors |
(package private) boolean | done |
(package private) boolean | emitting | Guarded by this.
(package private) static final OperatorMerge.InnerSubscriber<?>[] | EMPTY | An empty array to avoid creating new empty arrays in removeInner.
(package private) ConcurrentLinkedQueue<Throwable> | errors | Due to the emission loop, we need to store errors somewhere if !delayErrors.
(package private) final Object | innerGuard |
(package private) OperatorMerge.InnerSubscriber<?>[] | innerSubscribers | Copy-on-write array, guarded by innerGuard.
(package private) long | lastId | Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
(package private) int | lastIndex | What was its index in the innerSubscribers array? Accessed if emitting == true.
(package private) final int | maxConcurrent |
(package private) boolean | missed | Guarded by this.
(package private) final NotificationLite<T> | nl |
(package private) OperatorMerge.MergeProducer<T> | producer |
(package private) int | scalarEmissionCount |
(package private) final int | scalarEmissionLimit |
(package private) CompositeSubscription | subscriptions | Tracks the active subscriptions to sources.
(package private) long | uniqueId | Used to generate unique InnerSubscriber IDs.
-
Constructor Summary
Constructors
Constructor | Description
MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) |
-
Method Summary
Modifier and Type | Method | Description
(package private) void | addInner(OperatorMerge.InnerSubscriber<T> inner) |
(package private) boolean | checkTerminate() | Check if the operator reached some terminal state: child unsubscribed, or an error was reported and we don't delay errors.
(package private) void | emit() |
(package private) void | emitEmpty() |
(package private) void | emitLoop() | The standard emission loop serializing events and requests.
protected void | emitScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value, long r) |
protected void | emitScalar(T value, long r) |
(package private) CompositeSubscription | getOrCreateComposite() |
void | onCompleted() | Notifies the Observer that the Observable has finished sending push-based notifications.
void | onError(Throwable e) | Notifies the Observer that the Observable has experienced an error condition.
void | onNext(Observable<? extends T> t) | Provides the Observer with a new item to observe.
protected void | queueScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value) |
protected void | queueScalar(T value) |
(package private) void | removeInner |
private void | reportError() |
void | requestMore(long n) |
(package private) void | tryEmit(OperatorMerge.InnerSubscriber<T> subscriber, T value) | Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
(package private) void | tryEmit(T value) | Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Methods inherited from class rx.Subscriber
add, isUnsubscribed, onStart, request, setProducer, unsubscribe
-
Field Details
-
child
-
delayErrors
final boolean delayErrors -
maxConcurrent
final int maxConcurrent -
producer
OperatorMerge.MergeProducer<T> producer -
queue
-
subscriptions
Tracks the active subscriptions to sources. -
errors
Due to the emission loop, we need to store errors somewhere if !delayErrors. -
nl
-
done
volatile boolean done -
emitting
boolean emitting
Guarded by this.
-
missed
boolean missed
Guarded by this.
-
innerGuard
-
innerSubscribers
Copy-on-write array, guarded by innerGuard. -
uniqueId
long uniqueId
Used to generate unique InnerSubscriber IDs. Modified from onNext only.
-
lastId
long lastId
Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
-
lastIndex
int lastIndex
What was its index in the innerSubscribers array? Accessed if emitting == true.
-
EMPTY
An empty array to avoid creating new empty arrays in removeInner. -
scalarEmissionLimit
final int scalarEmissionLimit -
scalarEmissionCount
int scalarEmissionCount
-
-
Constructor Details
-
MergeSubscriber
-
-
Method Details
-
getOrCreateErrorQueue
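The method name, together with the non-final errors field of type ConcurrentLinkedQueue<Throwable>, suggests that the queue is created only when the first error arrives. A minimal sketch of such lazy creation follows. The class name LazyErrorQueueSketch, the volatile modifier, the double-checked locking, and the hasQueue helper are assumptions made for illustration, not the RxJava source.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of a lazily created error queue; not the RxJava source.
final class LazyErrorQueueSketch {
    // Stays null until the first error is recorded (assumed volatile for safe publication).
    private volatile ConcurrentLinkedQueue<Throwable> errors;

    Queue<Throwable> getOrCreateErrorQueue() {
        ConcurrentLinkedQueue<Throwable> q = errors;
        if (q == null) {
            synchronized (this) {
                q = errors;              // re-check under the lock
                if (q == null) {
                    q = new ConcurrentLinkedQueue<>();
                    errors = q;          // publish exactly one instance
                }
            }
        }
        return q;
    }

    // Helper for the illustration only: reports whether the queue exists yet.
    boolean hasQueue() {
        return errors != null;
    }
}
```

In the common case where no source fails, this approach never allocates the queue.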
-
getOrCreateComposite
CompositeSubscription getOrCreateComposite() -
onNext
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
- Parameters:
t - the item emitted by the Observable
-
emitEmpty
void emitEmpty() -
reportError
private void reportError() -
onError
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().
- Parameters:
e - the exception encountered by the Observable
-
onCompleted
public void onCompleted()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
-
addInner
-
removeInner
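The innerSubscribers field is documented as a copy-on-write array guarded by innerGuard, and EMPTY exists so that removeInner does not allocate a new empty array. A minimal sketch of that bookkeeping follows. The class name CopyOnWriteInners, the use of Object in place of InnerSubscriber, the volatile modifier, and the snapshot helper are assumptions made for illustration, not the RxJava source.

```java
import java.util.Arrays;

// Hypothetical stand-in for the copy-on-write bookkeeping; not the RxJava source.
final class CopyOnWriteInners {
    // Shared empty array, so removing the last element allocates nothing.
    static final Object[] EMPTY = new Object[0];

    private final Object innerGuard = new Object();
    // Writers replace the whole array under innerGuard; readers only take the reference.
    private volatile Object[] innerSubscribers = EMPTY;

    void addInner(Object inner) {
        synchronized (innerGuard) {
            Object[] a = innerSubscribers;
            Object[] b = Arrays.copyOf(a, a.length + 1); // copy, never mutate in place
            b[a.length] = inner;
            innerSubscribers = b;
        }
    }

    void removeInner(Object inner) {
        synchronized (innerGuard) {
            Object[] a = innerSubscribers;
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i] == inner) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return; // not present, nothing to do
            }
            if (a.length == 1) {
                innerSubscribers = EMPTY; // reuse the shared empty array
                return;
            }
            Object[] b = new Object[a.length - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            innerSubscribers = b;
        }
    }

    // A reader's view: the returned array is never modified afterwards.
    Object[] snapshot() {
        return innerSubscribers;
    }
}
```

Because every change installs a fresh array, a loop iterating over a snapshot is unaffected by concurrent additions or removals.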
-
tryEmit
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
subscriber -
value -
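The interplay between tryEmit, emitLoop, and the emitting and missed flags (both guarded by this) can be sketched roughly as follows. This is a simplified illustration under stated assumptions: the class EmitterLoopSketch, the Consumer standing in for the child Subscriber, and the single shared queue are hypothetical, and backpressure (requested amounts) is left out entirely.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical illustration of the emitting/missed pattern; not the RxJava source.
final class EmitterLoopSketch<T> {
    private final Consumer<T> child;                  // stands in for the child Subscriber
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private boolean emitting;                         // guarded by this
    private boolean missed;                           // guarded by this

    EmitterLoopSketch(Consumer<T> child) {
        this.child = child;
    }

    void tryEmit(T value) {
        synchronized (this) {
            if (emitting) {
                // Another caller is emitting: hand the value over and leave.
                queue.offer(value);
                missed = true;
                return;
            }
            emitting = true;                          // this caller becomes the emitter
        }
        child.accept(value);                          // fast path: no queueing needed
        emitLoop();
    }

    private void emitLoop() {
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                child.accept(v);                      // drain values queued meanwhile
            }
            synchronized (this) {
                if (!missed) {
                    emitting = false;                 // nothing arrived: give up the role
                    return;
                }
                missed = false;                       // something arrived: go around again
            }
        }
    }
}
```

Only one thread at a time calls the child, and a value offered while an emission is in progress is delivered by the current emitter rather than by the thread that produced it.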
-
queueScalar
-
emitScalar
-
requestMore
public void requestMore(long n) -
tryEmit
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
value -
subscriber -
-
queueScalar
-
emitScalar
-
emit
void emit() -
emitLoop
void emitLoop()
The standard emission loop serializing events and requests.
-
checkTerminate
boolean checkTerminate()
Check if the operator reached some terminal state: child unsubscribed, or an error was reported and we don't delay errors.
- Returns:
true if the child unsubscribed or there are errors available and merge doesn't delay errors.
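The condition described above can be modeled in a few lines. This is a sketch under stated assumptions: the class TerminateCheckSketch and the childUnsubscribed flag are hypothetical stand-ins, and the actual method may also perform cleanup or signal the error to the child, which is not shown here.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the terminal-state check; not the RxJava source.
final class TerminateCheckSketch {
    final boolean delayErrors;
    final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
    volatile boolean childUnsubscribed;   // stands in for the child's unsubscribed state

    TerminateCheckSketch(boolean delayErrors) {
        this.delayErrors = delayErrors;
    }

    boolean checkTerminate() {
        if (childUnsubscribed) {
            return true;                  // nobody is listening any more
        }
        // Errors end the sequence immediately only when they are not delayed.
        return !delayErrors && !errors.isEmpty();
    }
}
```

With delayErrors set to true, a stored error alone does not count as terminal, so the emission loop keeps draining the remaining sources.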
-