Skip to content

Commit f5b02fa

Browse files
Merge pull request ReactiveX#1284 from benjchristensen/util-with-mpsc
Manual merge of Lock-free, MPSC-queue based
2 parents 798fa7e + 5b5d99f commit f5b02fa

File tree

3 files changed

+188
-0
lines changed

3 files changed

+188
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20+
21+
/**
22+
* A multiple-producer single consumer queue implementation with padded reference
23+
* to tail to avoid cache-line thrashing.
24+
* Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a> but using AtomicReferenceFieldUpdater
25+
* instead of Unsafe.
26+
* @param <E> the element type
27+
*/
28+
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
29+
@SuppressWarnings(value = "rawtypes")
30+
static final AtomicReferenceFieldUpdater<PaddedNode, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
31+
/** */
32+
private static final long serialVersionUID = 1L;
33+
/** The padded tail reference. */
34+
final PaddedNode<E> tail;
35+
/**
36+
* Initializes the empty queue.
37+
*/
38+
public MpscPaddedQueue() {
39+
Node<E> first = new Node<E>(null);
40+
tail = new PaddedNode<E>();
41+
tail.tail = first;
42+
set(first);
43+
}
44+
/**
45+
* Offer a new value.
46+
* @param v the value to offer
47+
*/
48+
public void offer(E v) {
49+
Node<E> n = new Node<E>(v);
50+
getAndSet(n).set(n);
51+
}
52+
53+
/**
54+
* @return Poll a value from the head of the queue or return null if the queue is empty.
55+
*/
56+
public E poll() {
57+
Node<E> n = peekNode();
58+
if (n == null) {
59+
return null;
60+
}
61+
E v = n.value;
62+
n.value = null; // do not retain this value as the node still stays in the queue
63+
TAIL_UPDATER.lazySet(tail, n);
64+
return v;
65+
}
66+
/**
67+
* Check if there is a node available without changing anything.
68+
*/
69+
private Node<E> peekNode() {
70+
for (;;) {
71+
@SuppressWarnings(value = "unchecked")
72+
Node<E> t = TAIL_UPDATER.get(tail);
73+
Node<E> n = t.get();
74+
if (n != null || get() == t) {
75+
return n;
76+
}
77+
}
78+
}
79+
/**
80+
* Clears the queue.
81+
*/
82+
public void clear() {
83+
for (;;) {
84+
if (poll() == null) {
85+
break;
86+
}
87+
}
88+
}
89+
/** Class that contains a Node reference padded around to fit a typical cache line. */
90+
static final class PaddedNode<E> {
91+
/** Padding, public to prevent optimizing it away. */
92+
public int p1;
93+
volatile Node<E> tail;
94+
/** Padding, public to prevent optimizing it away. */
95+
public long p2;
96+
/** Padding, public to prevent optimizing it away. */
97+
public long p3;
98+
/** Padding, public to prevent optimizing it away. */
99+
public long p4;
100+
/** Padding, public to prevent optimizing it away. */
101+
public long p5;
102+
/** Padding, public to prevent optimizing it away. */
103+
public long p6;
104+
}
105+
/**
106+
* Regular node with value and reference to the next node.
107+
*/
108+
static final class Node<E> {
109+
110+
E value;
111+
@SuppressWarnings(value = "rawtypes")
112+
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
113+
volatile Node<E> tail;
114+
115+
public Node(E value) {
116+
this.value = value;
117+
}
118+
119+
public void set(Node<E> newTail) {
120+
TAIL_UPDATER.lazySet(this, newTail);
121+
}
122+
123+
@SuppressWarnings(value = "unchecked")
124+
public Node<E> get() {
125+
return TAIL_UPDATER.get(this);
126+
}
127+
}
128+
129+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
/**
21+
* An AtomicInteger with extra fields to pad it out to fit a typical cache line.
22+
*/
23+
public final class PaddedAtomicInteger extends AtomicInteger {
24+
private static final long serialVersionUID = 1L;
25+
/** Padding, public to prevent optimizing it away. */
26+
public int p1;
27+
/** Padding, public to prevent optimizing it away. */
28+
public int p2;
29+
/** Padding, public to prevent optimizing it away. */
30+
public int p3;
31+
/** Padding, public to prevent optimizing it away. */
32+
public int p4;
33+
/** Padding, public to prevent optimizing it away. */
34+
public int p5;
35+
/** Padding, public to prevent optimizing it away. */
36+
public int p6;
37+
/** Padding, public to prevent optimizing it away. */
38+
public int p7;
39+
/** Padding, public to prevent optimizing it away. */
40+
public int p8;
41+
/** Padding, public to prevent optimizing it away. */
42+
public int p9;
43+
/** Padding, public to prevent optimizing it away. */
44+
public int p10;
45+
/** Padding, public to prevent optimizing it away. */
46+
public int p11;
47+
/** Padding, public to prevent optimizing it away. */
48+
public int p12;
49+
/** Padding, public to prevent optimizing it away. */
50+
public int p13;
51+
/** @return prevents optimizing away the fields, most likely. */
52+
public int noopt() {
53+
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
54+
}
55+
56+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
This `rx.internal.*` package is for internal use only. Any code here can change at any time and is not considered part of the public API, even if the classes are `public` so as to be used from other packages within `rx.*`.
2+
3+
If you depend on these classes, your code may break in any future RxJava release, even if it's just a patch release (major.minor.patch).

0 commit comments

Comments
 (0)