Package rx.internal.util.unsafe
Class MpmcArrayQueue<E>
java.lang.Object
java.util.AbstractCollection<E>
java.util.AbstractQueue<E>
rx.internal.util.unsafe.ConcurrentCircularArrayQueueL0Pad<E>
rx.internal.util.unsafe.ConcurrentCircularArrayQueue<E>
rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue<E>
rx.internal.util.unsafe.MpmcArrayQueueL1Pad<E>
rx.internal.util.unsafe.MpmcArrayQueueProducerField<E>
rx.internal.util.unsafe.MpmcArrayQueueL2Pad<E>
rx.internal.util.unsafe.MpmcArrayQueueConsumerField<E>
rx.internal.util.unsafe.MpmcArrayQueue<E>
- Type Parameters:
E - type of the element stored in the Queue
- All Implemented Interfaces:
Iterable<E>, Collection<E>, Queue<E>, MessagePassingQueue<E>
A Multi-Producer-Multi-Consumer queue based on a ConcurrentCircularArrayQueue. This implies that any and all threads may call the offer/poll/peek methods and correctness is maintained. This implementation follows patterns documented on the package level for False Sharing protection.
The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov. The original algorithm uses an array of structs, which should offer nice locality properties but is sadly not possible in Java (waiting on Value Types or similar). The alternative explored here uses two arrays, one for each field of the struct. There is a further alternative in the experimental project which uses iteration phase markers to achieve the same algorithm and is structurally closer to the original, but sadly does not perform as well as this implementation.
Tradeoffs to keep in mind:
- Padding for false sharing: the counter fields and queue fields are all padded, as is either side of both arrays. We are trading memory to avoid false sharing (active and passive).
- 2 arrays instead of one: the algorithm requires an extra array of longs matching the size of the elements array. This doubles or triples the memory allocated for the buffer.
- Power of 2 capacity: the actual elements buffer (and sequence buffer) is sized to the closest power of 2 greater than or equal to the requested capacity.
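The power-of-2 tradeoff above can be illustrated with a small sketch. It is not taken from this class's source: the class name CapacitySketch and both method names are hypothetical, and the only assumption carried over from the documentation is that the buffer length is rounded up to a power of 2 so that an index can be mapped to a slot with a bit mask (compare the inherited mask field).

```java
// Hypothetical helper, for illustration only; not part of rx.internal.util.unsafe.
final class CapacitySketch {

    /** Returns the smallest power of two >= value, for 1 <= value <= 2^30. */
    static int roundToPowerOfTwo(int value) {
        if (value < 1 || value > (1 << 30)) {
            throw new IllegalArgumentException("value out of range: " + value);
        }
        // value - 1 has (32 - numberOfLeadingZeros) significant bits;
        // shifting 1 by that count yields the next power of two.
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    /** Maps an ever-growing index to a slot; valid only for power-of-two capacities. */
    static int slot(long index, int capacity) {
        return (int) (index & (capacity - 1));
    }

    public static void main(String[] args) {
        int capacity = roundToPowerOfTwo(100);
        System.out.println(capacity);            // 128
        System.out.println(slot(130, capacity)); // 2
    }
}
```

A requested capacity of 100 therefore allocates 128 element slots, plus 128 sequence slots under the two-array tradeoff, so the memory cost is paid on the rounded-up size.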
Field Summary
Fields
Modifier and Type, Field
(package private) long p30
(package private) long p31
(package private) long p32
(package private) long p33
(package private) long p34
(package private) long p35
(package private) long p36
(package private) long p37
(package private) long p40
(package private) long p41
(package private) long p42
(package private) long p43
(package private) long p44
(package private) long p45
(package private) long p46
Fields inherited from class rx.internal.util.unsafe.MpmcArrayQueueL2Pad
p20, p21, p22, p23, p24, p25, p26
Fields inherited from class rx.internal.util.unsafe.MpmcArrayQueueL1Pad
p10, p11, p12, p13, p14, p15, p16
Fields inherited from class rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue
sequenceBuffer
Fields inherited from class rx.internal.util.unsafe.ConcurrentCircularArrayQueue
buffer, BUFFER_PAD, mask, SPARSE_SHIFT
Constructor Summary
Constructors
MpmcArrayQueue(int capacity)
Method Summary
Modifier and Type, Method, Description
boolean isEmpty() - This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
boolean offer(E e) - Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
E peek() - Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
E poll() - Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
int size() - This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value.
Methods inherited from class rx.internal.util.unsafe.MpmcArrayQueueConsumerField
casConsumerIndex, lvConsumerIndex
Methods inherited from class rx.internal.util.unsafe.MpmcArrayQueueProducerField
casProducerIndex, lvProducerIndex
Methods inherited from class rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue
calcSequenceOffset, lvSequence, soSequence
Methods inherited from class rx.internal.util.unsafe.ConcurrentCircularArrayQueue
calcElementOffset, calcElementOffset, clear, iterator, lpElement, lpElement, lvElement, lvElement, soElement, soElement, spElement, spElement
Methods inherited from class java.util.AbstractQueue
add, addAll, element, remove
Methods inherited from class java.util.AbstractCollection
contains, containsAll, remove, removeAll, retainAll, toArray, toArray, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface java.util.Collection
contains, containsAll, equals, hashCode, parallelStream, remove, removeAll, removeIf, retainAll, spliterator, stream, toArray, toArray, toArray
Field Details
p40
long p40
p41
long p41
p42
long p42
p43
long p43
p44
long p44
p45
long p45
p46
long p46
p30
long p30
p31
long p31
p32
long p32
p33
long p33
p34
long p34
p35
long p35
p36
long p36
p37
long p37
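The long fields listed above carry no descriptions, but together with the class hierarchy (L1Pad, ProducerField, L2Pad, ConsumerField) they appear to be the padding that the class description refers to. The following is a minimal sketch of padding by inheritance, not the code of this class: the names PadBefore, HotCounter, and PaddedCounter are hypothetical, and AtomicLongFieldUpdater stands in for whatever CAS mechanism the real class uses. It also assumes the JVM lays out superclass fields before subclass fields, which is typical of HotSpot but not guaranteed by the Java language.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical classes, for illustration only.
abstract class PadBefore {
    // Seven unused longs (56 bytes) intended to sit before the hot field.
    long p00, p01, p02, p03, p04, p05, p06;
}

abstract class HotCounter extends PadBefore {
    private static final AtomicLongFieldUpdater<HotCounter> INDEX =
            AtomicLongFieldUpdater.newUpdater(HotCounter.class, "index");

    // The contended counter, isolated in its own class in the hierarchy.
    private volatile long index;

    /** Volatile load of the counter. */
    final long lvIndex() {
        return index;
    }

    /** Compare-and-set of the counter. */
    final boolean casIndex(long expect, long update) {
        return INDEX.compareAndSet(this, expect, update);
    }
}

final class PaddedCounter extends HotCounter {
    // Eight unused longs intended to sit after the hot field, so that the
    // next object's fields are unlikely to share its cache line.
    long p10, p11, p12, p13, p14, p15, p16, p17;
}
```

Placing each hot counter between two padding classes is what keeps a producer updating one counter from invalidating the cache line that holds the other.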
Constructor Details
MpmcArrayQueue
public MpmcArrayQueue(int capacity)
Method Details
offer
Description copied from interface: MessagePassingQueue
Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
- Parameters:
e -
- Returns:
true if element was inserted into the queue, false iff full
poll
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
Because returning null indicates that the queue is empty, we cannot simply rely on next-element visibility for poll and must test the producer index when the next element is not visible.
- Returns:
a message from the queue if one is available, null iff empty
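The note on poll is easier to follow next to a sketch of the two-array scheme from the class description. The class below, TwoArrayMpmcSketch, is hypothetical and deliberately simplified: it uses AtomicReferenceArray, AtomicLongArray, and AtomicLong in place of the padded fields and offset-based accessors of the real class, and it assumes the capacity passed in is already a power of 2. It follows the shape of D. Vyukov's bounded MPMC queue and should be read as an illustration, not as this class's source.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, simplified sketch; no false-sharing padding is applied.
final class TwoArrayMpmcSketch<E> {
    private final int mask;
    private final AtomicReferenceArray<E> elements; // "element" field of the struct
    private final AtomicLongArray sequences;        // "sequence" field of the struct
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();

    TwoArrayMpmcSketch(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        mask = capacityPowerOfTwo - 1;
        elements = new AtomicReferenceArray<>(capacityPowerOfTwo);
        sequences = new AtomicLongArray(capacityPowerOfTwo);
        // Slot i is initially free for the producer that claims index i.
        for (int i = 0; i < capacityPowerOfTwo; i++) {
            sequences.set(i, i);
        }
    }

    boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        while (true) {
            long pIndex = producerIndex.get();
            int slot = (int) (pIndex & mask);
            long delta = sequences.get(slot) - pIndex;
            if (delta == 0) {
                // Slot is free for this index; try to claim it.
                if (producerIndex.compareAndSet(pIndex, pIndex + 1)) {
                    elements.set(slot, e);
                    sequences.set(slot, pIndex + 1); // publish to consumers
                    return true;
                }
            } else if (delta < 0 && pIndex - (mask + 1) >= consumerIndex.get()) {
                return false; // slot not yet released and the queue is full
            }
            // Otherwise another thread made progress; retry.
        }
    }

    E poll() {
        while (true) {
            long cIndex = consumerIndex.get();
            int slot = (int) (cIndex & mask);
            long delta = sequences.get(slot) - (cIndex + 1);
            if (delta == 0) {
                // Element is visible; try to claim it.
                if (consumerIndex.compareAndSet(cIndex, cIndex + 1)) {
                    E e = elements.get(slot);
                    elements.set(slot, null);
                    sequences.set(slot, cIndex + mask + 1); // free slot for next lap
                    return e;
                }
            } else if (delta < 0 && cIndex == producerIndex.get()) {
                // Next element is not visible AND no producer has claimed it:
                // only now is it safe to report "empty" by returning null.
                return null;
            }
            // A producer has claimed the slot but not yet published; retry.
        }
    }
}
```

The second branch in poll is the point of the note: a slot that is not yet visible may belong to a producer that has claimed the index but not finished writing, so the sketch returns null only when the producer index confirms that nothing was claimed.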
peek
Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
- Returns:
a message from the queue if one is available, null iff empty
size
public int size()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
- Specified by:
size in interface Collection<E>
- Specified by:
size in interface MessagePassingQueue<E>
- Specified by:
size in class AbstractCollection<E>
- Returns:
number of messages in the queue, between 0 and queue capacity or Integer.MAX_VALUE if not bounded
isEmpty
public boolean isEmpty()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
- Specified by:
isEmpty in interface Collection<E>
- Specified by:
isEmpty in interface MessagePassingQueue<E>
- Overrides:
isEmpty in class AbstractCollection<E>
- Returns:
true if empty, false otherwise
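The "best effort" wording for size() and isEmpty() can be made concrete with a sketch. The documentation does not show how this class computes either value, so the following is an assumption: one common approach for queues driven by a producer counter and a consumer counter, written here with plain AtomicLong fields in a hypothetical SizeEstimateSketch class.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a best-effort size estimate from two counters.
final class SizeEstimateSketch {
    final AtomicLong producerIndex = new AtomicLong();
    final AtomicLong consumerIndex = new AtomicLong();

    int size() {
        long after = consumerIndex.get();
        while (true) {
            long before = after;
            long currentProducerIndex = producerIndex.get();
            after = consumerIndex.get();
            // Accept the reading only if the consumer index did not move
            // while the producer index was read; otherwise try again.
            if (before == after) {
                return (int) (currentProducerIndex - after);
            }
        }
    }

    boolean isEmpty() {
        // A single comparison; the answer may be stale by the time it is used.
        return consumerIndex.get() == producerIndex.get();
    }
}
```

Even with the re-read, the result describes a moment that has already passed once the method returns, which is why callers should treat both methods as hints under concurrent use.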