Skip to content

Commit 5b5d99f

Browse files
Move MPSC Queue to rx.internal.util
1 parent 397b1d3 commit 5b5d99f

File tree

5 files changed

+190
-187
lines changed

5 files changed

+190
-187
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,129 +1,129 @@
1-
/**
2-
* Copyright 2014 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.observers;
17-
18-
import java.util.concurrent.atomic.AtomicReference;
19-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20-
21-
/**
22-
* A multiple-producer single consumer queue implementation with padded reference
23-
* to tail to avoid cache-line thrashing.
24-
* Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a> but using AtomicReferenceFieldUpdater
25-
* instead of Unsafe.
26-
* @param <E> the element type
27-
*/
28-
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
29-
@SuppressWarnings(value = "rawtypes")
30-
static final AtomicReferenceFieldUpdater<PaddedNode, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
31-
/** */
32-
private static final long serialVersionUID = 1L;
33-
/** The padded tail reference. */
34-
final PaddedNode<E> tail;
35-
/**
36-
* Initializes the empty queue.
37-
*/
38-
public MpscPaddedQueue() {
39-
Node<E> first = new Node<E>(null);
40-
tail = new PaddedNode<E>();
41-
tail.tail = first;
42-
set(first);
43-
}
44-
/**
45-
* Offer a new value.
46-
* @param v the value to offer
47-
*/
48-
public void offer(E v) {
49-
Node<E> n = new Node<E>(v);
50-
getAndSet(n).set(n);
51-
}
52-
53-
/**
54-
* @return Poll a value from the head of the queue or return null if the queue is empty.
55-
*/
56-
public E poll() {
57-
Node<E> n = peekNode();
58-
if (n == null) {
59-
return null;
60-
}
61-
E v = n.value;
62-
n.value = null; // do not retain this value as the node still stays in the queue
63-
TAIL_UPDATER.lazySet(tail, n);
64-
return v;
65-
}
66-
/**
67-
* Check if there is a node available without changing anything.
68-
*/
69-
private Node<E> peekNode() {
70-
for (;;) {
71-
@SuppressWarnings(value = "unchecked")
72-
Node<E> t = TAIL_UPDATER.get(tail);
73-
Node<E> n = t.get();
74-
if (n != null || get() == t) {
75-
return n;
76-
}
77-
}
78-
}
79-
/**
80-
* Clears the queue.
81-
*/
82-
public void clear() {
83-
for (;;) {
84-
if (poll() == null) {
85-
break;
86-
}
87-
}
88-
}
89-
/** Class that contains a Node reference padded around to fit a typical cache line. */
90-
static final class PaddedNode<E> {
91-
/** Padding, public to prevent optimizing it away. */
92-
public int p1;
93-
volatile Node<E> tail;
94-
/** Padding, public to prevent optimizing it away. */
95-
public long p2;
96-
/** Padding, public to prevent optimizing it away. */
97-
public long p3;
98-
/** Padding, public to prevent optimizing it away. */
99-
public long p4;
100-
/** Padding, public to prevent optimizing it away. */
101-
public long p5;
102-
/** Padding, public to prevent optimizing it away. */
103-
public long p6;
104-
}
105-
/**
106-
* Regular node with value and reference to the next node.
107-
*/
108-
static final class Node<E> {
109-
110-
E value;
111-
@SuppressWarnings(value = "rawtypes")
112-
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
113-
volatile Node<E> tail;
114-
115-
public Node(E value) {
116-
this.value = value;
117-
}
118-
119-
public void set(Node<E> newTail) {
120-
TAIL_UPDATER.lazySet(this, newTail);
121-
}
122-
123-
@SuppressWarnings(value = "unchecked")
124-
public Node<E> get() {
125-
return TAIL_UPDATER.get(this);
126-
}
127-
}
128-
129-
}
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20+
21+
/**
22+
* A multiple-producer single consumer queue implementation with padded reference
23+
* to tail to avoid cache-line thrashing.
24+
* Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a> but using AtomicReferenceFieldUpdater
25+
* instead of Unsafe.
26+
* @param <E> the element type
27+
*/
28+
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
29+
@SuppressWarnings(value = "rawtypes")
30+
static final AtomicReferenceFieldUpdater<PaddedNode, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
31+
/** */
32+
private static final long serialVersionUID = 1L;
33+
/** The padded tail reference. */
34+
final PaddedNode<E> tail;
35+
/**
36+
* Initializes the empty queue.
37+
*/
38+
public MpscPaddedQueue() {
39+
Node<E> first = new Node<E>(null);
40+
tail = new PaddedNode<E>();
41+
tail.tail = first;
42+
set(first);
43+
}
44+
/**
45+
* Offer a new value.
46+
* @param v the value to offer
47+
*/
48+
public void offer(E v) {
49+
Node<E> n = new Node<E>(v);
50+
getAndSet(n).set(n);
51+
}
52+
53+
/**
54+
* @return Poll a value from the head of the queue or return null if the queue is empty.
55+
*/
56+
public E poll() {
57+
Node<E> n = peekNode();
58+
if (n == null) {
59+
return null;
60+
}
61+
E v = n.value;
62+
n.value = null; // do not retain this value as the node still stays in the queue
63+
TAIL_UPDATER.lazySet(tail, n);
64+
return v;
65+
}
66+
/**
67+
* Check if there is a node available without changing anything.
68+
*/
69+
private Node<E> peekNode() {
70+
for (;;) {
71+
@SuppressWarnings(value = "unchecked")
72+
Node<E> t = TAIL_UPDATER.get(tail);
73+
Node<E> n = t.get();
74+
if (n != null || get() == t) {
75+
return n;
76+
}
77+
}
78+
}
79+
/**
80+
* Clears the queue.
81+
*/
82+
public void clear() {
83+
for (;;) {
84+
if (poll() == null) {
85+
break;
86+
}
87+
}
88+
}
89+
/** Class that contains a Node reference padded around to fit a typical cache line. */
90+
static final class PaddedNode<E> {
91+
/** Padding, public to prevent optimizing it away. */
92+
public int p1;
93+
volatile Node<E> tail;
94+
/** Padding, public to prevent optimizing it away. */
95+
public long p2;
96+
/** Padding, public to prevent optimizing it away. */
97+
public long p3;
98+
/** Padding, public to prevent optimizing it away. */
99+
public long p4;
100+
/** Padding, public to prevent optimizing it away. */
101+
public long p5;
102+
/** Padding, public to prevent optimizing it away. */
103+
public long p6;
104+
}
105+
/**
106+
* Regular node with value and reference to the next node.
107+
*/
108+
static final class Node<E> {
109+
110+
E value;
111+
@SuppressWarnings(value = "rawtypes")
112+
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
113+
volatile Node<E> tail;
114+
115+
public Node(E value) {
116+
this.value = value;
117+
}
118+
119+
public void set(Node<E> newTail) {
120+
TAIL_UPDATER.lazySet(this, newTail);
121+
}
122+
123+
@SuppressWarnings(value = "unchecked")
124+
public Node<E> get() {
125+
return TAIL_UPDATER.get(this);
126+
}
127+
}
128+
129+
}
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,56 @@
1-
/**
2-
* Copyright 2014 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.observers;
17-
18-
import java.util.concurrent.atomic.AtomicInteger;
19-
20-
/**
21-
* An AtomicInteger with extra fields to pad it out to fit a typical cache line.
22-
*/
23-
public final class PaddedAtomicInteger extends AtomicInteger {
24-
private static final long serialVersionUID = 1L;
25-
/** Padding, public to prevent optimizing it away. */
26-
public int p1;
27-
/** Padding, public to prevent optimizing it away. */
28-
public int p2;
29-
/** Padding, public to prevent optimizing it away. */
30-
public int p3;
31-
/** Padding, public to prevent optimizing it away. */
32-
public int p4;
33-
/** Padding, public to prevent optimizing it away. */
34-
public int p5;
35-
/** Padding, public to prevent optimizing it away. */
36-
public int p6;
37-
/** Padding, public to prevent optimizing it away. */
38-
public int p7;
39-
/** Padding, public to prevent optimizing it away. */
40-
public int p8;
41-
/** Padding, public to prevent optimizing it away. */
42-
public int p9;
43-
/** Padding, public to prevent optimizing it away. */
44-
public int p10;
45-
/** Padding, public to prevent optimizing it away. */
46-
public int p11;
47-
/** Padding, public to prevent optimizing it away. */
48-
public int p12;
49-
/** Padding, public to prevent optimizing it away. */
50-
public int p13;
51-
/** @return prevents optimizing away the fields, most likely. */
52-
public int noopt() {
53-
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
54-
}
55-
56-
}
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
/**
21+
* An AtomicInteger with extra fields to pad it out to fit a typical cache line.
22+
*/
23+
public final class PaddedAtomicInteger extends AtomicInteger {
24+
private static final long serialVersionUID = 1L;
25+
/** Padding, public to prevent optimizing it away. */
26+
public int p1;
27+
/** Padding, public to prevent optimizing it away. */
28+
public int p2;
29+
/** Padding, public to prevent optimizing it away. */
30+
public int p3;
31+
/** Padding, public to prevent optimizing it away. */
32+
public int p4;
33+
/** Padding, public to prevent optimizing it away. */
34+
public int p5;
35+
/** Padding, public to prevent optimizing it away. */
36+
public int p6;
37+
/** Padding, public to prevent optimizing it away. */
38+
public int p7;
39+
/** Padding, public to prevent optimizing it away. */
40+
public int p8;
41+
/** Padding, public to prevent optimizing it away. */
42+
public int p9;
43+
/** Padding, public to prevent optimizing it away. */
44+
public int p10;
45+
/** Padding, public to prevent optimizing it away. */
46+
public int p11;
47+
/** Padding, public to prevent optimizing it away. */
48+
public int p12;
49+
/** Padding, public to prevent optimizing it away. */
50+
public int p13;
51+
/** @return prevents optimizing away the fields, most likely. */
52+
public int noopt() {
53+
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
54+
}
55+
56+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
This `rx.internal.*` package is for internal use only. Any code here can change at any time and is not considered part of the public API, even if the classes are `public` so as to be used from other packages within `rx.*`.
2+
3+
If you depend on these classes, your code may break in any future RxJava release, even if it's just a patch release (major.minor.patch).

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,4 +206,4 @@ void drainQueue(FastList list) {
206206
}
207207
}
208208
}
209-
}
209+
}

rxjava-core/src/main/java/rx/operators/NotificationLite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,4 @@ public T getValue(Object n) {
182182
public Throwable getError(Object n) {
183183
return ((OnErrorSentinel) n).e;
184184
}
185-
}
185+
}

0 commit comments

Comments
 (0)