Skip to content

Commit ac74a79

Browse files
BugFix: Unsubscribing does not work when using subscribeOn(Schedulers.newThread())
I believe this fixes ReactiveX#431
1 parent 2e2ab16 commit ac74a79

File tree

3 files changed

+106
-11
lines changed

3 files changed

+106
-11
lines changed

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void run() {
5757
}
5858
}, initialDelay, period, unit);
5959

60-
subscriptions.add(Subscriptions.create(f));
60+
subscriptions.add(Subscriptions.from(f));
6161
return subscriptions;
6262

6363
} else {
@@ -84,7 +84,7 @@ public void run() {
8484
}
8585
}, delayTime, unit);
8686
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
87-
subscription.add(Subscriptions.create(f));
87+
subscription.add(Subscriptions.from(f));
8888
} else {
8989
// we are not a ScheduledExecutorService so can't directly schedule
9090
if (delayTime == 0) {
@@ -106,7 +106,7 @@ public void run() {
106106
}
107107
}, delayTime, unit);
108108
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
109-
subscription.add(Subscriptions.create(f));
109+
subscription.add(Subscriptions.from(f));
110110
}
111111
}
112112
return subscription;
@@ -134,7 +134,7 @@ public void run() {
134134
// we are an ExecutorService so get a Future back that supports unsubscribe
135135
Future<?> f = ((ExecutorService) executor).submit(r);
136136
// add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
137-
subscription.add(Subscriptions.create(f));
137+
subscription.add(Subscriptions.from(f));
138138
} else {
139139
// we are the lowest common denominator so can't unsubscribe once we execute
140140
executor.execute(r);

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,22 @@ public Thread newThread(Runnable r) {
5858
}
5959

6060
@Override
61-
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
61+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
62+
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
63+
// all subscriptions that may need to be unsubscribed
64+
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
65+
6266
final Scheduler _scheduler = this;
63-
return Subscriptions.from(executor.submit(new Runnable() {
67+
subscription.add(Subscriptions.from(executor.submit(new Runnable() {
6468

6569
@Override
6670
public void run() {
67-
action.call(_scheduler, state);
71+
Subscription s = discardableAction.call(_scheduler);
72+
subscription.add(s);
6873
}
69-
}));
74+
})));
75+
76+
return subscription;
7077
}
7178

7279
@Override
@@ -89,15 +96,15 @@ public void run() {
8996
}, delayTime, unit);
9097

9198
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
92-
subscription.add(Subscriptions.create(f));
99+
subscription.add(Subscriptions.from(f));
93100

94101
return subscription;
95102
}
96103

97104
}
98105

99106
@Override
100-
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
107+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
101108
EventLoopScheduler s = new EventLoopScheduler();
102109
return s.schedule(state, action);
103110
}
@@ -122,7 +129,7 @@ public void run() {
122129
}, delay, unit);
123130

124131
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
125-
subscription.add(Subscriptions.create(f));
132+
subscription.add(Subscriptions.from(f));
126133

127134
return subscription;
128135
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package rx.concurrency;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import org.junit.Test;
10+
11+
import rx.Observable;
12+
import rx.Observer;
13+
import rx.Scheduler;
14+
import rx.operators.SafeObservableSubscription;
15+
import rx.util.functions.Func1;
16+
17+
public class SchedulerUnsubscribeTest {
18+
19+
/**
20+
* Bug report: https://github.com/Netflix/RxJava/issues/431
21+
*/
22+
@Test
23+
public void testUnsubscribeOfNewThread() throws InterruptedException {
24+
testUnSubscribeForScheduler(Schedulers.newThread());
25+
}
26+
27+
@Test
28+
public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException {
29+
testUnSubscribeForScheduler(Schedulers.threadPoolForIO());
30+
}
31+
32+
@Test
33+
public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException {
34+
testUnSubscribeForScheduler(Schedulers.threadPoolForComputation());
35+
}
36+
37+
@Test
38+
public void testUnsubscribeOfCurrentThread() throws InterruptedException {
39+
testUnSubscribeForScheduler(Schedulers.currentThread());
40+
}
41+
42+
public void testUnSubscribeForScheduler(Scheduler scheduler) throws InterruptedException {
43+
44+
final AtomicInteger countReceived = new AtomicInteger();
45+
final AtomicInteger countGenerated = new AtomicInteger();
46+
final SafeObservableSubscription s = new SafeObservableSubscription();
47+
final CountDownLatch latch = new CountDownLatch(1);
48+
49+
s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS)
50+
.map(new Func1<Long, Long>() {
51+
@Override
52+
public Long call(Long aLong) {
53+
System.out.println("generated " + aLong);
54+
countGenerated.incrementAndGet();
55+
return aLong;
56+
}
57+
})
58+
.subscribeOn(scheduler)
59+
.observeOn(Schedulers.currentThread())
60+
.subscribe(new Observer<Long>() {
61+
@Override
62+
public void onCompleted() {
63+
System.out.println("--- completed");
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
System.out.println("--- onError");
69+
}
70+
71+
@Override
72+
public void onNext(Long args) {
73+
if (countReceived.incrementAndGet() == 2) {
74+
s.unsubscribe();
75+
latch.countDown();
76+
}
77+
System.out.println("==> Received " + args);
78+
}
79+
}));
80+
81+
latch.await(1000, TimeUnit.MILLISECONDS);
82+
83+
System.out.println("----------- it thinks it is finished ------------------ ");
84+
Thread.sleep(100);
85+
86+
assertEquals(2, countGenerated.get());
87+
}
88+
}

0 commit comments

Comments
 (0)