Skip to content

Commit dd52daf

Browse files
Merge pull request ReactiveX#1264 from akarnokd/ObserveOnScheduleUnsubscribe
ObserveOn scheduled unsubscription
2 parents 78c250b + e1b4348 commit dd52daf

File tree

2 files changed

+66
-1
lines changed

2 files changed

+66
-1
lines changed

rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
1819
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
1920

2021
import rx.Observable.Operator;
2122
import rx.Scheduler;
2223
import rx.Subscriber;
24+
import rx.Subscription;
2325
import rx.functions.Action0;
2426
import rx.schedulers.ImmediateScheduler;
2527
import rx.schedulers.TrampolineScheduler;
@@ -59,6 +61,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
5961
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
6062
final Subscriber<? super T> observer;
6163
private final Scheduler.Worker recursiveScheduler;
64+
private final ScheduledUnsubscribe scheduledUnsubscribe;
6265
final NotificationLite<T> on = NotificationLite.instance();
6366
/** Guarded by this. */
6467
private FastList queue = new FastList();
@@ -72,11 +75,15 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber
7275
super(subscriber);
7376
this.observer = subscriber;
7477
this.recursiveScheduler = scheduler.createWorker();
75-
subscriber.add(recursiveScheduler);
78+
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
79+
subscriber.add(scheduledUnsubscribe);
7680
}
7781

7882
@Override
7983
public void onNext(final T t) {
84+
if (scheduledUnsubscribe.isUnsubscribed()) {
85+
return;
86+
}
8087
synchronized (this) {
8188
queue.add(on.next(t));
8289
}
@@ -85,6 +92,9 @@ public void onNext(final T t) {
8592

8693
@Override
8794
public void onCompleted() {
95+
if (scheduledUnsubscribe.isUnsubscribed()) {
96+
return;
97+
}
8898
synchronized (this) {
8999
queue.add(on.completed());
90100
}
@@ -93,6 +103,9 @@ public void onCompleted() {
93103

94104
@Override
95105
public void onError(final Throwable e) {
106+
if (scheduledUnsubscribe.isUnsubscribed()) {
107+
return;
108+
}
96109
synchronized (this) {
97110
queue.add(on.error(e));
98111
}
@@ -153,4 +166,32 @@ public void add(Object o) {
153166
size = s + 1;
154167
}
155168
}
169+
static final class ScheduledUnsubscribe implements Subscription {
170+
final Scheduler.Worker worker;
171+
volatile int once;
172+
static final AtomicIntegerFieldUpdater<ScheduledUnsubscribe> ONCE_UPDATER
173+
= AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once");
174+
175+
public ScheduledUnsubscribe(Scheduler.Worker worker) {
176+
this.worker = worker;
177+
}
178+
179+
@Override
180+
public boolean isUnsubscribed() {
181+
return once != 0;
182+
}
183+
184+
@Override
185+
public void unsubscribe() {
186+
if (ONCE_UPDATER.getAndSet(this, 1) == 0) {
187+
worker.schedule(new Action0() {
188+
@Override
189+
public void call() {
190+
worker.unsubscribe();
191+
}
192+
});
193+
}
194+
}
195+
196+
}
156197
}

rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@
3333

3434
import org.junit.Test;
3535
import org.mockito.InOrder;
36+
import static org.mockito.Matchers.anyInt;
3637
import org.mockito.invocation.InvocationOnMock;
3738
import org.mockito.stubbing.Answer;
3839

3940
import rx.Observable;
4041
import rx.Observer;
4142
import rx.Scheduler;
43+
import rx.exceptions.TestException;
4244
import rx.functions.Action0;
4345
import rx.functions.Action1;
4446
import rx.functions.Func1;
@@ -365,4 +367,26 @@ private static int randomIntFrom0to100() {
365367
x ^= (x << 4);
366368
return Math.abs((int) x % 100);
367369
}
370+
371+
@Test
372+
public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
373+
TestScheduler testScheduler = new TestScheduler();
374+
375+
Observable<Integer> source = Observable.concat(Observable.<Integer>error(new TestException()), Observable.just(1));
376+
377+
378+
@SuppressWarnings("unchecked")
379+
Observer<Integer> o = mock(Observer.class);
380+
InOrder inOrder = inOrder(o);
381+
382+
source.observeOn(testScheduler).subscribe(o);
383+
384+
inOrder.verify(o, never()).onError(any(TestException.class));
385+
386+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
387+
388+
inOrder.verify(o).onError(any(TestException.class));
389+
inOrder.verify(o, never()).onNext(anyInt());
390+
inOrder.verify(o, never()).onCompleted();
391+
}
368392
}

0 commit comments

Comments
 (0)