Skip to content

Commit 522ce79

Browse files
committed
Fix the issue that Sample doesn't call 'unsubscribe'
1 parent dd73c15 commit 522ce79

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

src/main/java/rx/internal/operators/OperatorSampleWithTime.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
5050
child.add(worker);
5151

5252
SamplerSubscriber<T> sampler = new SamplerSubscriber<T>(s);
53+
child.add(sampler);
5354
worker.schedulePeriodically(sampler, time, time, unit);
5455

5556
return sampler;

src/test/java/rx/internal/operators/OperatorSampleTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,12 @@
2828
import org.junit.Test;
2929
import org.mockito.InOrder;
3030

31-
import rx.Observable;
31+
import rx.*;
3232
import rx.Observable.OnSubscribe;
33-
import rx.Observer;
34-
import rx.Scheduler;
35-
import rx.Subscriber;
3633
import rx.functions.Action0;
3734
import rx.schedulers.TestScheduler;
3835
import rx.subjects.PublishSubject;
36+
import rx.subscriptions.Subscriptions;
3937

4038
public class OperatorSampleTest {
4139
private TestScheduler scheduler;
@@ -271,4 +269,19 @@ public void sampleWithSamplerThrows() {
271269
inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class));
272270
verify(observer, never()).onCompleted();
273271
}
272+
273+
@Test
274+
public void testSampleUnsubscribe() {
275+
final Subscription s = mock(Subscription.class);
276+
Observable<Integer> o = Observable.create(
277+
new OnSubscribe<Integer>() {
278+
@Override
279+
public void call(Subscriber<? super Integer> subscriber) {
280+
subscriber.add(s);
281+
}
282+
}
283+
);
284+
o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().unsubscribe();
285+
verify(s).unsubscribe();
286+
}
274287
}

0 commit comments

Comments
 (0)