Package rx.internal.util
Class IndexedRingBuffer<E>
- java.lang.Object
-
- rx.internal.util.IndexedRingBuffer<E>
-
- Type Parameters:
E
-
- All Implemented Interfaces:
Subscription
public final class IndexedRingBuffer<E> extends java.lang.Object implements Subscription
Add/Remove without object allocation (after initial construction).
This is meant for hundreds or single-digit thousands of elements that need to be rapidly added and randomly or sequentially removed while avoiding object allocation.
On Intel Core i7, 2.3GHz, Mac Java 8:
- adds per second single-threaded => ~32,598,500 for 100
- adds per second single-threaded => ~23,200,000 for 10,000
- adds + removes per second single-threaded => 15,562,100 for 100
- adds + removes per second single-threaded => 8,760,000 for 10,000
Benchmark                                               (size)   Mode  Samples       Score  Score error  Units
r.i.IndexedRingBufferPerf.indexedRingBufferAdd             100  thrpt        5  263571.721     9856.994  ops/s
r.i.IndexedRingBufferPerf.indexedRingBufferAdd           10000  thrpt        5    1763.417      211.998  ops/s
r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove       100  thrpt        5  139850.115    17143.705  ops/s
r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove     10000  thrpt        5     809.982       72.931  ops/s
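The allocation-free add/remove behaviour described above can be illustrated with a small stand-alone sketch. This is not the RxJava implementation: the class name IndexReuseSketch, the fixed capacity, and the single-threaded design are assumptions made for illustration only. It shows the general idea of handing out an index on add and recycling that index after remove, using preallocated arrays so that steady-state operations create no new objects.

```java
/** Hypothetical, single-threaded illustration of index reuse; not the RxJava class. */
final class IndexReuseSketch<E> {
    private final Object[] elements; // preallocated element slots
    private final int[] freed;       // preallocated stack of removed indexes
    private int freedCount = 0;      // number of indexes currently on the stack
    private int nextIndex = 0;       // next never-used slot

    IndexReuseSketch(int capacity) {
        elements = new Object[capacity];
        freed = new int[capacity];
    }

    /** Stores e and returns the slot index so the caller can remove it later. */
    int add(E e) {
        int i;
        if (freedCount > 0) {
            i = freed[--freedCount]; // reuse a previously removed slot first
        } else {
            if (nextIndex == elements.length) {
                throw new IllegalStateException("buffer full");
            }
            i = nextIndex++;
        }
        elements[i] = e;
        return i;
    }

    /** Clears the slot, remembers its index for reuse, and returns the old element (or null). */
    @SuppressWarnings("unchecked")
    E remove(int index) {
        E e = (E) elements[index];
        if (e != null) {
            elements[index] = null;
            freed[freedCount++] = index;
        }
        return e;
    }
}
```

In this sketch, removing an element and then adding another returns the same index again, which is the property that lets a caller hold on to a plain int as a removal handle.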
-
-
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
(package private) static class
IndexedRingBuffer.ElementSection<E>
(package private) static class
IndexedRingBuffer.IndexSection
-
Field Summary
Fields
Modifier and Type    Field    Description
(package private) static int
defaultSize
private IndexedRingBuffer.ElementSection<E>
elements
(package private) java.util.concurrent.atomic.AtomicInteger
index
private static ObjectPool<IndexedRingBuffer<?>>
POOL
private IndexedRingBuffer.IndexSection
removed
(package private) java.util.concurrent.atomic.AtomicInteger
removedIndex
(package private) static int
SIZE
-
Constructor Summary
Constructors
Constructor    Description
IndexedRingBuffer()
-
Method Summary
Modifier and Type    Method    Description
int
add(E e)
Add an element and return the index where it was added to allow removal.
int
forEach(Func1<? super E,java.lang.Boolean> action)
int
forEach(Func1<? super E,java.lang.Boolean> action, int startIndex)
Loop through each element in the buffer and call a specific function.
private int
forEach(Func1<? super E,java.lang.Boolean> action, int startIndex, int endIndex)
private IndexedRingBuffer.ElementSection<E>
getElementSection(int index)
private int
getIndexForAdd()
private int
getIndexFromPreviouslyRemoved()
Returns -1 if there is no previously removed index to reuse, or an index of 0 or greater if one should be used.
private IndexedRingBuffer.IndexSection
getIndexSection(int index)
static <T> IndexedRingBuffer<T>
getInstance()
boolean
isUnsubscribed()
Indicates whether this Subscription is currently unsubscribed.
private void
pushRemovedIndex(int elementIndex)
void
releaseToPool()
This resets the arrays, nulls out references and returns it to the pool.
E
remove(int index)
void
unsubscribe()
Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
-
-
-
Field Detail
-
elements
private final IndexedRingBuffer.ElementSection<E> elements
-
removed
private final IndexedRingBuffer.IndexSection removed
-
index
final java.util.concurrent.atomic.AtomicInteger index
-
removedIndex
final java.util.concurrent.atomic.AtomicInteger removedIndex
-
POOL
private static final ObjectPool<IndexedRingBuffer<?>> POOL
-
defaultSize
static int defaultSize
-
SIZE
static final int SIZE
-
-
Method Detail
-
getInstance
public static <T> IndexedRingBuffer<T> getInstance()
-
releaseToPool
public void releaseToPool()
This resets the arrays, nulls out references and returns it to the pool. This extra CPU cost is far smaller than the object allocation cost of not pooling.
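The trade-off described here (pay a small reset cost on release rather than allocate a fresh buffer each time) can be sketched as follows. The class PooledBufferSketch, its fields, and the use of java.util.ArrayDeque as the pool are all hypothetical stand-ins; the real class uses a private ObjectPool whose behaviour is not documented on this page, and this sketch is single-threaded only.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

/** Hypothetical, single-threaded illustration of reset-and-pool; not the RxJava class. */
final class PooledBufferSketch {
    // Stand-in for the private static POOL field; assumed to hold idle instances.
    private static final ArrayDeque<PooledBufferSketch> POOL = new ArrayDeque<>();

    final Object[] slots = new Object[8]; // allocated once per instance
    int count;

    private PooledBufferSketch() {}

    /** Returns a pooled instance if one is idle, otherwise allocates a new one. */
    static PooledBufferSketch getInstance() {
        PooledBufferSketch pooled = POOL.poll();
        return pooled != null ? pooled : new PooledBufferSketch();
    }

    /** Nulls out references, resets counters, and hands the instance back to the pool. */
    void releaseToPool() {
        Arrays.fill(slots, null); // drop references so pooled buffers do not retain elements
        count = 0;
        POOL.offer(this);
    }
}
```

A caller following this pattern must not touch the buffer after releasing it, since the next getInstance() call may hand the same object to someone else.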
-
unsubscribe
public void unsubscribe()
Description copied from interface: Subscription
Stops the receipt of notifications on the Subscriber that was registered when this Subscription was received.
This allows unregistering a Subscriber before it has finished receiving all events (i.e. before onCompleted is called).
- Specified by:
unsubscribe in interface Subscription
-
add
public int add(E e)
Add an element and return the index where it was added to allow removal.
- Parameters:
e - the element to add
- Returns:
the index where the element was added
-
remove
public E remove(int index)
-
getIndexSection
private IndexedRingBuffer.IndexSection getIndexSection(int index)
-
getElementSection
private IndexedRingBuffer.ElementSection<E> getElementSection(int index)
-
getIndexForAdd
private int getIndexForAdd()
-
getIndexFromPreviouslyRemoved
private int getIndexFromPreviouslyRemoved()
Returns -1 if there is no previously removed index to reuse, or an index of 0 or greater if one should be used.
-
pushRemovedIndex
private void pushRemovedIndex(int elementIndex)
-
isUnsubscribed
public boolean isUnsubscribed()
Description copied from interface: Subscription
Indicates whether this Subscription is currently unsubscribed.
- Specified by:
isUnsubscribed in interface Subscription
- Returns:
true if this Subscription is currently unsubscribed, false otherwise
-
forEach
public int forEach(Func1<? super E,java.lang.Boolean> action, int startIndex)
Loop through each element in the buffer and call a specific function.
- Parameters:
action - function that processes each item and returns true if it wants to continue to the next
startIndex - the index at which the loop should start
- Returns:
the next index to process, or the last index seen if the loop exited early
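The contract of the action callback (return true to continue, false to stop) can be illustrated with a stand-alone sketch. It is an assumption-laden simplification: ForEachSketch is a hypothetical helper over a plain array, java.util.function.Function stands in for Func1, and the sketch returns the array length once every element has been visited. How the real method treats the end of the buffer is not described on this page.

```java
import java.util.function.Function;

/** Hypothetical illustration of the early-exit contract; not the RxJava implementation. */
final class ForEachSketch {
    /**
     * Visits non-null items from startIndex onward.
     * Returns the index at which the action asked to stop,
     * or items.length if every item was processed (an assumption of this sketch).
     */
    static <E> int forEach(E[] items, Function<? super E, Boolean> action, int startIndex) {
        for (int i = startIndex; i < items.length; i++) {
            E item = items[i];
            if (item == null) {
                continue;             // empty slot, for example a removed element
            }
            if (!action.apply(item)) {
                return i;             // early exit: report the last index seen
            }
        }
        return items.length;
    }
}
```

Returning an index rather than void lets a caller resume a later pass from where the previous one stopped, by feeding the returned value back in as startIndex.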
-
-