Skip to content

Commit c8e22d8

Browse files
yshrsmzakarnokd
authored andcommitted
add unsubscribeOn to Single type (ReactiveX#5138) (ReactiveX#5146)
1 parent 2a0427b commit c8e22d8

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,6 +2808,50 @@ public final Single<T> delaySubscription(Observable<?> other) {
28082808
return create(new SingleOnSubscribeDelaySubscriptionOther<T>(this, other));
28092809
}
28102810

2811+
/**
2812+
* Returns a Single which makes sure when a subscriber cancels the subscription,
2813+
* the dispose is called on the specified scheduler
2814+
* @param scheduler the target scheduler where to execute the cancellation
2815+
* @return the new Single instance
2816+
*/
2817+
public final Single<T> unsubscribeOn(final Scheduler scheduler) {
2818+
return create(new OnSubscribe<T>() {
2819+
@Override
2820+
public void call(final SingleSubscriber<? super T> t) {
2821+
final SingleSubscriber<T> single = new SingleSubscriber<T>() {
2822+
@Override
2823+
public void onSuccess(T value) {
2824+
t.onSuccess(value);
2825+
}
2826+
2827+
@Override
2828+
public void onError(Throwable error) {
2829+
t.onError(error);
2830+
}
2831+
};
2832+
2833+
t.add(Subscriptions.create(new Action0() {
2834+
@Override
2835+
public void call() {
2836+
final Scheduler.Worker w = scheduler.createWorker();
2837+
w.schedule(new Action0() {
2838+
@Override
2839+
public void call() {
2840+
try {
2841+
single.unsubscribe();
2842+
} finally {
2843+
w.unsubscribe();
2844+
}
2845+
}
2846+
});
2847+
}
2848+
}));
2849+
2850+
Single.this.subscribe(single);
2851+
}
2852+
});
2853+
}
2854+
28112855
// -------------------------------------------------------------------------
28122856
// Fluent test support, super handy and reduces test preparation boilerplate
28132857
// -------------------------------------------------------------------------

src/test/java/rx/SingleTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import rx.Single.OnSubscribe;
2929
import rx.exceptions.*;
3030
import rx.functions.*;
31+
import rx.internal.util.RxThreadFactory;
3132
import rx.observers.*;
3233
import rx.plugins.RxJavaHooks;
3334
import rx.schedulers.*;
@@ -2230,4 +2231,65 @@ public void call(Throwable t) {
22302231

22312232
assertEquals(1, calls[0]);
22322233
}
2234+
2235+
@Test
2236+
public void unsubscribeOnSuccess() throws InterruptedException {
2237+
final AtomicReference<String> name = new AtomicReference<String>();
2238+
2239+
final CountDownLatch cdl = new CountDownLatch(1);
2240+
2241+
TestSubscriber<Integer> ts = TestSubscriber.create();
2242+
2243+
Single.fromCallable(new Callable<Integer>() {
2244+
@Override
2245+
public Integer call() throws Exception {
2246+
return 1;
2247+
}
2248+
})
2249+
.doOnUnsubscribe(new Action0() {
2250+
@Override
2251+
public void call() {
2252+
name.set(Thread.currentThread().getName());
2253+
cdl.countDown();
2254+
}
2255+
})
2256+
.subscribeOn(Schedulers.io())
2257+
.unsubscribeOn(Schedulers.computation())
2258+
.subscribe(ts);
2259+
2260+
cdl.await();
2261+
2262+
ts.awaitTerminalEvent();
2263+
ts.assertReceivedOnNext(Arrays.asList(1));
2264+
2265+
assertTrue(name.get().startsWith("RxComputation"));
2266+
}
2267+
2268+
@Test
2269+
public void unsubscribeOnError() throws InterruptedException {
2270+
final AtomicReference<String> name = new AtomicReference<String>();
2271+
2272+
final CountDownLatch cdl = new CountDownLatch(1);
2273+
2274+
TestSubscriber<Integer> ts = TestSubscriber.create();
2275+
2276+
Single.<Integer>error(new RuntimeException())
2277+
.doOnUnsubscribe(new Action0() {
2278+
@Override
2279+
public void call() {
2280+
name.set(Thread.currentThread().getName());
2281+
cdl.countDown();
2282+
}
2283+
})
2284+
.subscribeOn(Schedulers.io())
2285+
.unsubscribeOn(Schedulers.computation())
2286+
.subscribe(ts);
2287+
2288+
cdl.await();
2289+
2290+
ts.awaitTerminalEvent();
2291+
ts.assertError(RuntimeException.class);
2292+
2293+
assertTrue(name.get().startsWith("RxComputation"));
2294+
}
22332295
}

0 commit comments

Comments
 (0)