Skip to content

Commit e6824c1

Browse files
mostroverkhovakarnokd
authored andcommitted
1.X: UnicastSubject fail-fast and delay-error behavior (ReactiveX#5195)
* 1.X: unicastSubject does not replay onNext calls made prior to subscription if onError is also called prior to subscription. * cache delayError field in local variable
1 parent 0f1542d commit e6824c1

File tree

2 files changed

+153
-10
lines changed

2 files changed

+153
-10
lines changed

src/main/java/rx/subjects/UnicastSubject.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public final class UnicastSubject<T> extends Subject<T, T> {
4949
public static <T> UnicastSubject<T> create() {
5050
return create(16);
5151
}
52+
5253
/**
5354
* Constructs an empty UnicastSubject instance with a capacity hint.
5455
* <p>The capacity hint determines the internal queue's island size: the larger
@@ -59,7 +60,18 @@ public static <T> UnicastSubject<T> create() {
5960
* @return the created BufferUntilSubscriber instance
6061
*/
6162
public static <T> UnicastSubject<T> create(int capacityHint) {
62-
State<T> state = new State<T>(capacityHint, null);
63+
State<T> state = new State<T>(capacityHint, false, null);
64+
return new UnicastSubject<T>(state);
65+
}
66+
67+
/**
68+
* Constructs an empty UnicastSubject instance with the default capacity hint of 16 elements.
69+
*
70+
* @param delayError deliver pending next events before error.
71+
* @return the created UnicastSubject instance
72+
*/
73+
public static <T> UnicastSubject<T> create(boolean delayError) {
74+
State<T> state = new State<T>(16, delayError, null);
6375
return new UnicastSubject<T>(state);
6476
}
6577

@@ -78,7 +90,28 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
7890
* @return the created BufferUntilSubscriber instance
7991
*/
8092
public static <T> UnicastSubject<T> create(int capacityHint, Action0 onTerminated) {
81-
State<T> state = new State<T>(capacityHint, onTerminated);
93+
State<T> state = new State<T>(capacityHint, false, onTerminated);
94+
return new UnicastSubject<T>(state);
95+
}
96+
97+
/**
98+
* Constructs an empty UnicastSubject instance with a capacity hint, delay error
99+
* flag and Action0 instance to call if the subject reaches its terminal state
100+
* or the single Subscriber unsubscribes mid-sequence.
101+
* <p>The capacity hint determines the internal queue's island size: the larger
102+
* it is the less frequent allocation will happen if there is no subscriber
103+
* or the subscriber hasn't caught up.
104+
* @param <T> the input and output value type
105+
* @param capacityHint the capacity hint for the internal queue
106+
* @param onTerminated the optional callback to call when subject reaches its terminal state
107+
* or the single Subscriber unsubscribes mid-sequence. It will be called
108+
* at most once.
109+
* @param delayError flag indicating whether to deliver pending next events before error.
110+
* @return the created BufferUntilSubscriber instance
111+
*/
112+
public static <T> UnicastSubject<T> create(int capacityHint,
113+
Action0 onTerminated, boolean delayError) {
114+
State<T> state = new State<T>(capacityHint, delayError, onTerminated);
82115
return new UnicastSubject<T>(state);
83116
}
84117

@@ -119,6 +152,8 @@ static final class State<T> extends AtomicLong implements Producer, Observer<T>,
119152
final AtomicReference<Subscriber<? super T>> subscriber;
120153
/** The queue holding values until the subscriber arrives and catches up. */
121154
final Queue<Object> queue;
155+
/** Deliver pending next events before error. */
156+
final boolean delayError;
122157
/** Atomically set to true on terminal condition. */
123158
final AtomicReference<Action0> terminateOnce;
124159
/** In case the source emitted an error. */
@@ -137,10 +172,12 @@ static final class State<T> extends AtomicLong implements Producer, Observer<T>,
137172
* reduce allocation frequency
138173
* @param onTerminated the action to call when the subject reaches its terminal state or
139174
* the single subscriber unsubscribes.
175+
* @param delayError deliver pending next events before error.
140176
*/
141-
public State(int capacityHint, Action0 onTerminated) {
177+
public State(int capacityHint, boolean delayError, Action0 onTerminated) {
142178
this.subscriber = new AtomicReference<Subscriber<? super T>>();
143179
this.terminateOnce = onTerminated != null ? new AtomicReference<Action0>(onTerminated) : null;
180+
this.delayError = delayError;
144181

145182
Queue<Object> q;
146183
if (capacityHint > 1) {
@@ -266,14 +303,14 @@ void replay() {
266303
emitting = true;
267304
}
268305
Queue<Object> q = queue;
306+
boolean delayError = this.delayError;
269307
for (;;) {
270308
Subscriber<? super T> s = subscriber.get();
271309
boolean unlimited = false;
272310
if (s != null) {
273311
boolean d = done;
274312
boolean empty = q.isEmpty();
275-
276-
if (checkTerminated(d, empty, s)) {
313+
if (checkTerminated(d, empty, delayError, s)) {
277314
return;
278315
}
279316
long r = get();
@@ -284,7 +321,7 @@ void replay() {
284321
d = done;
285322
Object v = q.poll();
286323
empty = v == null;
287-
if (checkTerminated(d, empty, s)) {
324+
if (checkTerminated(d, empty, delayError, s)) {
288325
return;
289326
}
290327
if (empty) {
@@ -348,23 +385,28 @@ public boolean isUnsubscribed() {
348385
* an error happened or the source terminated and the queue is empty
349386
* @param done indicates the source has called onCompleted
350387
* @param empty indicates if there are no more source values in the queue
388+
* @param delayError indicates whether to deliver pending next events before error
351389
* @param s the target Subscriber to emit events to
352390
* @return true if this Subject reached a terminal state and the drain loop should quit
353391
*/
354-
boolean checkTerminated(boolean done, boolean empty, Subscriber<? super T> s) {
392+
boolean checkTerminated(boolean done, boolean empty, boolean delayError, Subscriber<? super T> s) {
355393
if (s.isUnsubscribed()) {
356394
queue.clear();
357395
return true;
358396
}
359397
if (done) {
360398
Throwable e = error;
361-
if (e != null) {
399+
if (e != null && !delayError) {
362400
queue.clear();
363401
s.onError(e);
364402
return true;
365-
} else
403+
}
366404
if (empty) {
367-
s.onCompleted();
405+
if (e != null) {
406+
s.onError(e);
407+
} else {
408+
s.onCompleted();
409+
}
368410
return true;
369411
}
370412
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package rx.subjects;
2+
3+
import org.junit.Test;
4+
import rx.functions.Action0;
5+
import rx.observers.TestSubscriber;
6+
7+
public class UnicastSubjectTest {
8+
9+
@Test
10+
public void testOneArgFactoryDelayError() throws Exception {
11+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
12+
UnicastSubject<Long> s = UnicastSubject.create(true);
13+
s.onNext(1L);
14+
s.onNext(2L);
15+
s.onError(new RuntimeException());
16+
s.subscribe(subscriber);
17+
subscriber.assertValueCount(2);
18+
subscriber.assertError(RuntimeException.class);
19+
}
20+
21+
@Test
22+
public void testOneArgFactoryNoDelayError() throws Exception {
23+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
24+
UnicastSubject<Long> s = UnicastSubject.create(false);
25+
s.onNext(1L);
26+
s.onNext(2L);
27+
s.onError(new RuntimeException());
28+
s.subscribe(subscriber);
29+
subscriber.assertValueCount(0);
30+
subscriber.assertError(RuntimeException.class);
31+
}
32+
33+
@Test
34+
public void testThreeArgsFactoryDelayError() throws Exception {
35+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
36+
UnicastSubject<Long> s = UnicastSubject.create(16, new NoopAction0(), true);
37+
s.onNext(1L);
38+
s.onNext(2L);
39+
s.onError(new RuntimeException());
40+
s.subscribe(subscriber);
41+
subscriber.assertValueCount(2);
42+
subscriber.assertError(RuntimeException.class);
43+
}
44+
45+
@Test
46+
public void testThreeArgsFactoryNoDelayError() throws Exception {
47+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
48+
UnicastSubject<Long> s = UnicastSubject.create(16, new NoopAction0(), false);
49+
s.onNext(1L);
50+
s.onNext(2L);
51+
s.onError(new RuntimeException());
52+
s.subscribe(subscriber);
53+
subscriber.assertValueCount(0);
54+
subscriber.assertError(RuntimeException.class);
55+
}
56+
57+
@Test
58+
public void testZeroArgsFactory() throws Exception {
59+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
60+
UnicastSubject<Long> s = UnicastSubject.create();
61+
s.onNext(1L);
62+
s.onNext(2L);
63+
s.onError(new RuntimeException());
64+
s.subscribe(subscriber);
65+
subscriber.assertValueCount(0);
66+
subscriber.assertError(RuntimeException.class);
67+
}
68+
69+
@Test
70+
public void testOneArgFactory() throws Exception {
71+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
72+
UnicastSubject<Long> s = UnicastSubject.create(16);
73+
s.onNext(1L);
74+
s.onNext(2L);
75+
s.onError(new RuntimeException());
76+
s.subscribe(subscriber);
77+
subscriber.assertValueCount(0);
78+
subscriber.assertError(RuntimeException.class);
79+
}
80+
81+
@Test
82+
public void testTwoArgsFactory() throws Exception {
83+
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
84+
UnicastSubject<Long> s = UnicastSubject.create(16, new NoopAction0());
85+
s.onNext(1L);
86+
s.onNext(2L);
87+
s.onError(new RuntimeException());
88+
s.subscribe(subscriber);
89+
subscriber.assertValueCount(0);
90+
subscriber.assertError(RuntimeException.class);
91+
}
92+
93+
94+
95+
private static final class NoopAction0 implements Action0 {
96+
97+
@Override
98+
public void call() {
99+
}
100+
}
101+
}

0 commit comments

Comments
 (0)