Skip to content

Commit d7d0a33

Browse files
pjastrzakarnokd
authored andcommitted
2.x: Fix Completable.andThen(Completable) not running on observeOn scheduler. (ReactiveX#6362)
* 2.x: Fix Completable.andThen(Completable) not running on scheduler defined with observeOn. * 2.x: Fix Completable.andThen(Completable) not running on scheduler defined with observeOn. * 2.x: Fix Completable.andThen(Completable) not running on scheduler defined with observeOn.
1 parent 090f99e commit d7d0a33

File tree

4 files changed

+297
-45
lines changed

4 files changed

+297
-45
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,7 +1171,8 @@ public final <T> Maybe<T> andThen(MaybeSource<T> next) {
11711171
@CheckReturnValue
11721172
@SchedulerSupport(SchedulerSupport.NONE)
11731173
public final Completable andThen(CompletableSource next) {
1174-
return concatWith(next);
1174+
ObjectHelper.requireNonNull(next, "next is null");
1175+
return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, next));
11751176
}
11761177

11771178
/**
@@ -1357,7 +1358,7 @@ public final Completable compose(CompletableTransformer transformer) {
13571358
@SchedulerSupport(SchedulerSupport.NONE)
13581359
public final Completable concatWith(CompletableSource other) {
13591360
ObjectHelper.requireNonNull(other, "other is null");
1360-
return concatArray(this, other);
1361+
return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, other));
13611362
}
13621363

13631364
/**
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
public final class CompletableAndThenCompletable extends Completable {
23+
24+
final CompletableSource source;
25+
26+
final CompletableSource next;
27+
28+
public CompletableAndThenCompletable(CompletableSource source, CompletableSource next) {
29+
this.source = source;
30+
this.next = next;
31+
}
32+
33+
@Override
34+
protected void subscribeActual(CompletableObserver observer) {
35+
source.subscribe(new SourceObserver(observer, next));
36+
}
37+
38+
static final class SourceObserver
39+
extends AtomicReference<Disposable>
40+
implements CompletableObserver, Disposable {
41+
42+
private static final long serialVersionUID = -4101678820158072998L;
43+
44+
final CompletableObserver actualObserver;
45+
46+
final CompletableSource next;
47+
48+
SourceObserver(CompletableObserver actualObserver, CompletableSource next) {
49+
this.actualObserver = actualObserver;
50+
this.next = next;
51+
}
52+
53+
@Override
54+
public void onSubscribe(Disposable d) {
55+
if (DisposableHelper.setOnce(this, d)) {
56+
actualObserver.onSubscribe(this);
57+
}
58+
}
59+
60+
@Override
61+
public void onError(Throwable e) {
62+
actualObserver.onError(e);
63+
}
64+
65+
@Override
66+
public void onComplete() {
67+
next.subscribe(new NextObserver(this, actualObserver));
68+
}
69+
70+
@Override
71+
public void dispose() {
72+
DisposableHelper.dispose(this);
73+
}
74+
75+
@Override
76+
public boolean isDisposed() {
77+
return DisposableHelper.isDisposed(get());
78+
}
79+
}
80+
81+
static final class NextObserver implements CompletableObserver {
82+
83+
final AtomicReference<Disposable> parent;
84+
85+
final CompletableObserver downstream;
86+
87+
public NextObserver(AtomicReference<Disposable> parent, CompletableObserver downstream) {
88+
this.parent = parent;
89+
this.downstream = downstream;
90+
}
91+
92+
@Override
93+
public void onSubscribe(Disposable d) {
94+
DisposableHelper.replace(parent, d);
95+
}
96+
97+
@Override
98+
public void onComplete() {
99+
downstream.onComplete();
100+
}
101+
102+
@Override
103+
public void onError(Throwable e) {
104+
downstream.onError(e);
105+
}
106+
}
107+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
19+
import org.junit.Test;
20+
21+
import io.reactivex.*;
22+
import io.reactivex.exceptions.TestException;
23+
import io.reactivex.functions.Action;
24+
import io.reactivex.observers.TestObserver;
25+
import io.reactivex.schedulers.Schedulers;
26+
27+
import static org.junit.Assert.*;
28+
29+
public class CompletableAndThenCompletableabTest {
30+
@Test(expected = NullPointerException.class)
31+
public void andThenCompletableCompleteNull() {
32+
Completable.complete()
33+
.andThen((Completable) null);
34+
}
35+
36+
@Test
37+
public void andThenCompletableCompleteComplete() {
38+
Completable.complete()
39+
.andThen(Completable.complete())
40+
.test()
41+
.assertComplete();
42+
}
43+
44+
@Test
45+
public void andThenCompletableCompleteError() {
46+
Completable.complete()
47+
.andThen(Completable.error(new TestException("test")))
48+
.test()
49+
.assertNotComplete()
50+
.assertNoValues()
51+
.assertError(TestException.class)
52+
.assertErrorMessage("test");
53+
}
54+
55+
@Test
56+
public void andThenCompletableCompleteNever() {
57+
Completable.complete()
58+
.andThen(Completable.never())
59+
.test()
60+
.assertNoValues()
61+
.assertNoErrors()
62+
.assertNotComplete();
63+
}
64+
65+
@Test
66+
public void andThenCompletableErrorComplete() {
67+
Completable.error(new TestException("bla"))
68+
.andThen(Completable.complete())
69+
.test()
70+
.assertNotComplete()
71+
.assertNoValues()
72+
.assertError(TestException.class)
73+
.assertErrorMessage("bla");
74+
}
75+
76+
@Test
77+
public void andThenCompletableErrorNever() {
78+
Completable.error(new TestException("bla"))
79+
.andThen(Completable.never())
80+
.test()
81+
.assertNotComplete()
82+
.assertNoValues()
83+
.assertError(TestException.class)
84+
.assertErrorMessage("bla");
85+
}
86+
87+
@Test
88+
public void andThenCompletableErrorError() {
89+
Completable.error(new TestException("error1"))
90+
.andThen(Completable.error(new TestException("error2")))
91+
.test()
92+
.assertNotComplete()
93+
.assertNoValues()
94+
.assertError(TestException.class)
95+
.assertErrorMessage("error1");
96+
}
97+
98+
@Test
99+
public void andThenCanceled() {
100+
final AtomicInteger completableRunCount = new AtomicInteger();
101+
Completable.fromRunnable(new Runnable() {
102+
@Override
103+
public void run() {
104+
completableRunCount.incrementAndGet();
105+
}
106+
})
107+
.andThen(Completable.complete())
108+
.test(true)
109+
.assertEmpty();
110+
assertEquals(1, completableRunCount.get());
111+
}
112+
113+
@Test
114+
public void andThenFirstCancels() {
115+
final TestObserver<Void> to = new TestObserver<Void>();
116+
Completable.fromRunnable(new Runnable() {
117+
@Override
118+
public void run() {
119+
to.cancel();
120+
}
121+
})
122+
.andThen(Completable.complete())
123+
.subscribe(to);
124+
to
125+
.assertNotComplete()
126+
.assertNoErrors();
127+
}
128+
129+
@Test
130+
public void andThenSecondCancels() {
131+
final TestObserver<Void> to = new TestObserver<Void>();
132+
Completable.complete()
133+
.andThen(Completable.fromRunnable(new Runnable() {
134+
@Override
135+
public void run() {
136+
to.cancel();
137+
}
138+
}))
139+
.subscribe(to);
140+
to
141+
.assertNotComplete()
142+
.assertNoErrors();
143+
}
144+
145+
@Test
146+
public void andThenDisposed() {
147+
TestHelper.checkDisposed(Completable.complete()
148+
.andThen(Completable.complete()));
149+
}
150+
151+
@Test
152+
public void andThenNoInterrupt() throws InterruptedException {
153+
for (int k = 0; k < 100; k++) {
154+
final int count = 10;
155+
final CountDownLatch latch = new CountDownLatch(count);
156+
final boolean[] interrupted = {false};
157+
158+
for (int i = 0; i < count; i++) {
159+
Completable.complete()
160+
.subscribeOn(Schedulers.io())
161+
.observeOn(Schedulers.io())
162+
.andThen(Completable.fromAction(new Action() {
163+
@Override
164+
public void run() throws Exception {
165+
try {
166+
Thread.sleep(30);
167+
} catch (InterruptedException e) {
168+
System.out.println("Interrupted! " + Thread.currentThread());
169+
interrupted[0] = true;
170+
}
171+
}
172+
}))
173+
.subscribe(new Action() {
174+
@Override
175+
public void run() throws Exception {
176+
latch.countDown();
177+
}
178+
});
179+
}
180+
181+
latch.await();
182+
assertFalse("The second Completable was interrupted!", interrupted[0]);
183+
}
184+
}
185+
}

src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,9 @@
1313

1414
package io.reactivex.internal.operators.completable;
1515

16-
import io.reactivex.Completable;
17-
import io.reactivex.Maybe;
18-
import io.reactivex.functions.Action;
19-
import io.reactivex.schedulers.Schedulers;
20-
21-
import java.util.concurrent.CountDownLatch;
22-
2316
import org.junit.Test;
24-
import static org.junit.Assert.*;
17+
18+
import io.reactivex.*;
2519

2620
public class CompletableAndThenTest {
2721
@Test(expected = NullPointerException.class)
@@ -69,39 +63,4 @@ public void andThenMaybeError() {
6963
.assertError(RuntimeException.class)
7064
.assertErrorMessage("bla");
7165
}
72-
73-
@Test
74-
public void andThenNoInterrupt() throws InterruptedException {
75-
for (int k = 0; k < 100; k++) {
76-
final int count = 10;
77-
final CountDownLatch latch = new CountDownLatch(count);
78-
final boolean[] interrupted = { false };
79-
80-
for (int i = 0; i < count; i++) {
81-
Completable.complete()
82-
.subscribeOn(Schedulers.io())
83-
.observeOn(Schedulers.io())
84-
.andThen(Completable.fromAction(new Action() {
85-
@Override
86-
public void run() throws Exception {
87-
try {
88-
Thread.sleep(30);
89-
} catch (InterruptedException e) {
90-
System.out.println("Interrupted! " + Thread.currentThread());
91-
interrupted[0] = true;
92-
}
93-
}
94-
}))
95-
.subscribe(new Action() {
96-
@Override
97-
public void run() throws Exception {
98-
latch.countDown();
99-
}
100-
});
101-
}
102-
103-
latch.await();
104-
assertFalse("The second Completable was interrupted!", interrupted[0]);
105-
}
106-
}
10766
}

0 commit comments

Comments
 (0)