Skip to content

Commit 58a9d34

Browse files
Restore use of SpmcArrayQueue in RxRingBuffer
- Modification of SpmcArrayQueue with fix from JCTools/JCTools#21 - Restore RxRingBuffer to use SpmcArrayQueue - this reduces object allocation significantly (see pull request for screenshots)
1 parent eedeeb9 commit 58a9d34

File tree

2 files changed

+32
-16
lines changed

2 files changed

+32
-16
lines changed

rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import rx.Subscription;
2222
import rx.exceptions.MissingBackpressureException;
2323
import rx.internal.operators.NotificationLite;
24+
import rx.internal.util.unsafe.SpmcArrayQueue;
25+
import rx.internal.util.unsafe.SpscArrayQueue;
2426
import rx.internal.util.unsafe.UnsafeAccess;
2527

2628
/**
@@ -31,28 +33,16 @@ public class RxRingBuffer implements Subscription {
3133

3234
public static RxRingBuffer getSpscInstance() {
3335
if (UnsafeAccess.isUnsafeAvailable()) {
34-
// using SynchronizedQueue until issues are solved with SpscArrayQueue offer rejection
35-
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
36-
// BackpressureException when using SpscArrayQueue
37-
// return new RxRingBuffer(SPSC_POOL, SIZE); // this is the one we were trying to use
38-
// return new RxRingBuffer(new SpscArrayQueue<Object>(SIZE), SIZE);
39-
// the performance of this is sufficient (actually faster in some cases)
40-
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
36+
// TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
37+
return new RxRingBuffer(SPMC_POOL, SIZE);
4138
} else {
4239
return new RxRingBuffer();
4340
}
4441
}
4542

4643
public static RxRingBuffer getSpmcInstance() {
4744
if (UnsafeAccess.isUnsafeAvailable()) {
48-
// using SynchronizedQueue until issues are solved with SpmcArrayQueue offer rejection
49-
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
50-
// BackpressureException when using SpmcArrayQueue/MpmcArrayQueue
51-
// return new RxRingBuffer(SPMC_POOL, SIZE); // this is the one we were trying to use
52-
// return new RxRingBuffer(new SpmcArrayQueue<Object>(SIZE), SIZE);
53-
// return new RxRingBuffer(new MpmcArrayQueue<Object>(SIZE), SIZE);
54-
// the performance of this is sufficient (actually faster in some cases)
55-
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
45+
return new RxRingBuffer(SPMC_POOL, SIZE);
5646
} else {
5747
return new RxRingBuffer();
5848
}
@@ -170,6 +160,24 @@ public static RxRingBuffer getSpmcInstance() {
170160

171161
public static final int SIZE = 1024;
172162

163+
private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {
164+
165+
@Override
166+
protected SpscArrayQueue<Object> createObject() {
167+
return new SpscArrayQueue<Object>(SIZE);
168+
}
169+
170+
};
171+
172+
private static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {
173+
174+
@Override
175+
protected SpmcArrayQueue<Object> createObject() {
176+
return new SpmcArrayQueue<Object>(SIZE);
177+
}
178+
179+
};
180+
173181
private RxRingBuffer(Queue<Object> queue, int size) {
174182
this.queue = queue;
175183
this.pool = null;

rxjava-core/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,15 @@ public boolean offer(final E e) {
135135
final long currProducerIndex = lvProducerIndex();
136136
final long offset = calcElementOffset(currProducerIndex);
137137
if (null != lvElement(lb, offset)) {
138-
return false;
138+
// strict check as per https://github.com/JCTools/JCTools/issues/21#issuecomment-50204120
139+
int size = (int) (currProducerIndex - lvConsumerIndex());
140+
if (size == capacity) {
141+
return false;
142+
}
143+
else {
144+
// spin wait for slot to clear, buggers wait freedom
145+
while (null != lvElement(lb, offset));
146+
}
139147
}
140148
spElement(lb, offset, e);
141149
// single producer, so store ordered is valid. It is also required to correctly publish the element

0 commit comments

Comments
 (0)