Class ReplaySubject<T>
- Type Parameters:
T- the type of items observed and emitted by the Subject
- All Implemented Interfaces:
Observer<T>
Observer that subscribes.
Example usage:
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();
// both of the following will get the onNext/onCompleted calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static interfaceThe base interface for buffering signals to be replayed to individual Subscribers.(package private) static final classA producer and subscription implementation that tracks the current replay position of a particular subscriber.(package private) static final class(package private) static final class(package private) static final classHolds onto the array of Subscriber-wrapping ReplayProducers and the buffer that holds values to be replayed; it manages subscription and signal dispatching.(package private) static final classAn unbounded ReplayBuffer implementation that uses linked-arrays to avoid copy-on-grow situation with ArrayList.Nested classes/interfaces inherited from class rx.Observable
Observable.OnSubscribe<T>, Observable.Operator<R,T>, Observable.Transformer<T, R> -
Field Summary
FieldsModifier and TypeFieldDescriptionprivate static final Object[]An empty array to trigger getValues() to return a new array.(package private) final ReplaySubject.ReplayState<T> The state storing the history and the references. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionstatic <T> ReplaySubject<T> create()Creates an unbounded replay subject.static <T> ReplaySubject<T> create(int capacity) Creates an unbounded replay subject with the specified initial buffer capacity.(package private) static <T> ReplaySubject<T> Creates an unbounded replay subject with the bounded-implementation for testing purposes.(package private) static <T> ReplaySubject<T> Creates an unbounded replay subject with the time-bounded-implementation for testing purposes.static <T> ReplaySubject<T> createWithSize(int size) Creates a size-bounded replay subject.static <T> ReplaySubject<T> createWithTime(long time, TimeUnit unit, Scheduler scheduler) Creates a time-bounded replay subject.static <T> ReplaySubject<T> createWithTimeAndSize(long time, TimeUnit unit, int size, Scheduler scheduler) Creates a time- and size-bounded replay subject.Returns the Throwable that terminated the Subject.getValue()Object[]Returns a snapshot of the currently buffered non-terminal events.T[]Returns a snapshot of the currently buffered non-terminal events into the providedaarray or creates a new array if it has not enough capacity.booleanbooleanCheck if the Subject has terminated normally.booleanbooleanCheck if the Subject has terminated with an exception.booleanhasValue()voidNotifies the Observer that theObservablehas finished sending push-based notifications.voidNotifies the Observer that theObservablehas experienced an error condition.voidProvides the Observer with a new item to observe.intsize()Returns the current number of items (non-terminal events) available for replay.(package private) intMethods inherited from class rx.subjects.Subject
toSerializedMethods inherited from class rx.Observable
all, amb, amb, amb, amb, amb, amb, amb, amb, amb, ambWith, asObservable, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, cache, cache, cacheWithInitialCapacity, cast, collect, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concat, concat, concat, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatEager, concatMap, concatMapDelayError, concatMapEager, concatMapEager, concatMapEager, concatMapIterable, concatWith, contains, count, countLong, create, create, create, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doOnCompleted, doOnEach, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, doOnUnsubscribe, elementAt, elementAtOrDefault, empty, error, exists, extend, filter, finallyDo, first, first, firstOrDefault, firstOrDefault, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, forEach, forEach, forEach, from, from, from, from, from, fromAsync, fromCallable, groupBy, groupBy, groupBy, groupJoin, ignoreElements, interval, interval, interval, interval, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, last, lastOrDefault, lastOrDefault, lift, limit, map, materialize, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, nest, never, observeOn, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onErrorResumeNext, onErrorResumeNext, onErrorReturn, onExceptionResumeNext, onTerminateDetach, publish, publish, range, range, rebatchRequests, reduce, reduce, repeat, repeat, repeat, repeat, repeatWhen, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retryWhen, retryWhen, sample, sample, sample, scan, scan, sequenceEqual, sequenceEqual, serialize, share, single, single, singleOrDefault, singleOrDefault, skip, skip, skip, skipLast, skipLast, skipLast, skipUntil, skipWhile, startWith, startWith, startWith, startWith, startWith, startWith, startWith, startWith, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, switchIfEmpty, switchMap, switchMapDelayError, switchOnNext, switchOnNextDelayError, take, take, take, takeFirst, takeLast, takeLast, takeLast, takeLast, takeLast, takeLastBuffer, takeLastBuffer, takeLastBuffer, takeLastBuffer, takeLastBuffer, takeUntil, takeUntil, takeWhile, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timer, timer, timestamp, timestamp, toBlocking, toCompletable, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSingle, toSortedList, toSortedList, toSortedList, toSortedList, unsafeSubscribe, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith
-
Field Details
-
state
The state storing the history and the references. -
EMPTY_ARRAY
An empty array to trigger getValues() to return a new array.
-
-
Constructor Details
-
ReplaySubject
ReplaySubject(ReplaySubject.ReplayState<T> state)
-
-
Method Details
-
create
Creates an unbounded replay subject.The internal buffer is backed by an
ArrayListand starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with thecreate(int)overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.- Type Parameters:
T- the type of items observed and emitted by the Subject- Returns:
- the created subject
-
create
Creates an unbounded replay subject with the specified initial buffer capacity.Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the
ReplaySubjectto preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.- Type Parameters:
T- the type of items observed and emitted by the Subject- Parameters:
capacity- the initial buffer capacity- Returns:
- the created subject
-
createUnbounded
Creates an unbounded replay subject with the bounded-implementation for testing purposes.This variant behaves like the regular unbounded
ReplaySubjectcreated viacreate()but uses the structures of the bounded-implementation. This is by no means intended for the replacement of the original, array-backed and unboundedReplaySubjectdue to the additional overhead of the linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without the interference of the eviction policies.- Type Parameters:
T- the type of items observed and emitted by the Subject- Returns:
- the created subject
-
createUnboundedTime
Creates an unbounded replay subject with the time-bounded-implementation for testing purposes.This variant behaves like the regular unbounded
ReplaySubjectcreated viacreate()but uses the structures of the bounded-implementation. This is by no means intended for the replacement of the original, array-backed and unboundedReplaySubjectdue to the additional overhead of the linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without the interference of the eviction policies.- Type Parameters:
T- the type of items observed and emitted by the Subject- Returns:
- the created subject
-
createWithSize
Creates a size-bounded replay subject.In this setting, the
ReplaySubjectholds at mostsizeitems in its internal buffer and discards the oldest item.When observers subscribe to a terminated
ReplaySubject, they are guaranteed to see at mostsizeonNextevents followed by a termination event.If an observer subscribes while the
ReplaySubjectis active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the mean time. In other words, once an Observer subscribes, it will receive items without gaps in the sequence.- Type Parameters:
T- the type of items observed and emitted by the Subject- Parameters:
size- the maximum number of buffered items- Returns:
- the created subject
-
createWithTime
Creates a time-bounded replay subject.In this setting, the
ReplaySubjectinternally tags each observed item with a timestamp value supplied by theSchedulerand keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.Once the subject is terminated, observers subscribing to it will receive items that remained in the buffer after the terminal event, regardless of their age.
If an observer subscribes while the
ReplaySubjectis active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an observer subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.Note that terminal notifications (
onErrorandonCompleted) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then anonCompletednotification arrives at T=10. If an observer subscribes at T=11, it will find an emptyReplaySubjectwith just anonCompletednotification.- Type Parameters:
T- the type of items observed and emitted by the Subject- Parameters:
time- the maximum age of the contained itemsunit- the time unit oftimescheduler- theSchedulerthat provides the current time- Returns:
- the created subject
-
createWithTimeAndSize
public static <T> ReplaySubject<T> createWithTimeAndSize(long time, TimeUnit unit, int size, Scheduler scheduler) Creates a time- and size-bounded replay subject.In this setting, the
ReplaySubjectinternally tags each received item with a timestamp value supplied by theSchedulerand holds at mostsizeitems in its internal buffer. It evicts items from the start of the buffer if their age becomes less-than or equal to the supplied age in milliseconds or the buffer reaches itssizelimit.When observers subscribe to a terminated
ReplaySubject, they observe the items that remained in the buffer after the terminal notification, regardless of their age, but at mostsizeitems.If an observer subscribes while the
ReplaySubjectis active, it will observe only those items from within the buffer that have age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an observer subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.Note that terminal notifications (
onErrorandonCompleted) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then anonCompletednotification arrives at T=10. If an observer subscribes at T=11, it will find an emptyReplaySubjectwith just anonCompletednotification.- Type Parameters:
T- the type of items observed and emitted by the Subject- Parameters:
time- the maximum age of the contained itemsunit- the time unit oftimesize- the maximum number of buffered itemsscheduler- theSchedulerthat provides the current time- Returns:
- the created subject
-
onNext
Description copied from interface:ObserverProvides the Observer with a new item to observe.The
Observablemay call this method 0 or more times.The
Observablewill not call this method again after it calls eitherObserver.onCompleted()orObserver.onError(java.lang.Throwable).- Parameters:
t- the item emitted by the Observable
-
onError
Description copied from interface:ObserverNotifies the Observer that theObservablehas experienced an error condition.If the
Observablecalls this method, it will not thereafter callObserver.onNext(T)orObserver.onCompleted().- Parameters:
e- the exception encountered by the Observable
-
onCompleted
public void onCompleted()Description copied from interface:ObserverNotifies the Observer that theObservablehas finished sending push-based notifications.The
Observablewill not call this method if it callsObserver.onError(java.lang.Throwable). -
subscriberCount
int subscriberCount()- Returns:
- Returns the number of subscribers.
-
hasObservers
public boolean hasObservers()Description copied from class:Subject- Specified by:
hasObserversin classSubject<T,T> - Returns:
- true if there is at least one Observer subscribed to this Subject, false otherwise
-
hasThrowable
Check if the Subject has terminated with an exception.- Returns:
- true if the subject has received a throwable through
onError.
-
hasCompleted
Check if the Subject has terminated normally.- Returns:
- true if the subject completed normally via
onCompleted
-
getThrowable
Returns the Throwable that terminated the Subject.- Returns:
- the Throwable that terminated the Subject or
nullif the subject hasn't terminated yet or it terminated normally.
-
size
Returns the current number of items (non-terminal events) available for replay.- Returns:
- the number of items available
-
hasAnyValue
- Returns:
- true if the Subject holds at least one non-terminal event available for replay
-
hasValue
-
getValues
Returns a snapshot of the currently buffered non-terminal events into the providedaarray or creates a new array if it has not enough capacity.- Parameters:
a- the array to fill in- Returns:
- the array
aif it had enough capacity or a new array containing the available values
-
getValues
Returns a snapshot of the currently buffered non-terminal events.The operation is threadsafe.
- Returns:
- a snapshot of the currently buffered non-terminal events.
- Since:
- (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
-
getValue
-