Skip to content

Commit 4a32963

Browse files
jbarr21akarnokd
authored andcommitted
2.x: fix Observable.zip to dispose eagerly (ReactiveX#5121)
1 parent 9a342fd commit 4a32963

File tree

2 files changed

+54
-8
lines changed

2 files changed

+54
-8
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
115115
public void dispose() {
116116
if (!cancelled) {
117117
cancelled = true;
118+
cancelSources();
118119
if (getAndIncrement() == 0) {
119120
clear();
120121
}
@@ -126,9 +127,19 @@ public boolean isDisposed() {
126127
return cancelled;
127128
}
128129

129-
void clear() {
130+
void cancel() {
131+
clear();
132+
cancelSources();
133+
}
134+
135+
void cancelSources() {
130136
for (ZipObserver<?, ?> zs : observers) {
131137
zs.dispose();
138+
}
139+
}
140+
141+
void clear() {
142+
for (ZipObserver<?, ?> zs : observers) {
132143
zs.queue.clear();
133144
}
134145
}
@@ -168,7 +179,7 @@ public void drain() {
168179
if (z.done && !delayError) {
169180
Throwable ex = z.error;
170181
if (ex != null) {
171-
clear();
182+
cancel();
172183
a.onError(ex);
173184
return;
174185
}
@@ -186,7 +197,7 @@ public void drain() {
186197
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
187198
} catch (Throwable ex) {
188199
Exceptions.throwIfFatal(ex);
189-
clear();
200+
cancel();
190201
a.onError(ex);
191202
return;
192203
}
@@ -205,15 +216,15 @@ public void drain() {
205216

206217
boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean delayError, ZipObserver<?, ?> source) {
207218
if (cancelled) {
208-
clear();
219+
cancel();
209220
return true;
210221
}
211222

212223
if (d) {
213224
if (delayError) {
214225
if (empty) {
215226
Throwable e = source.error;
216-
clear();
227+
cancel();
217228
if (e != null) {
218229
a.onError(e);
219230
} else {
@@ -224,12 +235,12 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean
224235
} else {
225236
Throwable e = source.error;
226237
if (e != null) {
227-
clear();
238+
cancel();
228239
a.onError(e);
229240
return true;
230241
} else
231242
if (empty) {
232-
clear();
243+
cancel();
233244
a.onComplete();
234245
return true;
235246
}

src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1392,4 +1392,39 @@ public List<Object> apply(Object t1, Object t2) throws Exception {
13921392
assertTrue(list.toString(), list.contains("RxSi"));
13931393
assertTrue(list.toString(), list.contains("RxCo"));
13941394
}
1395-
}}
1395+
}
1396+
1397+
@Test
1398+
public void eagerDispose() {
1399+
final PublishSubject<Integer> ps1 = PublishSubject.create();
1400+
final PublishSubject<Integer> ps2 = PublishSubject.create();
1401+
1402+
TestObserver<Integer> ts = new TestObserver<Integer>() {
1403+
@Override
1404+
public void onNext(Integer t) {
1405+
super.onNext(t);
1406+
cancel();
1407+
if (ps1.hasObservers()) {
1408+
onError(new IllegalStateException("ps1 not disposed"));
1409+
} else
1410+
if (ps2.hasObservers()) {
1411+
onError(new IllegalStateException("ps2 not disposed"));
1412+
} else {
1413+
onComplete();
1414+
}
1415+
}
1416+
};
1417+
1418+
Observable.zip(ps1, ps2, new BiFunction<Integer, Integer, Integer>() {
1419+
@Override
1420+
public Integer apply(Integer t1, Integer t2) throws Exception {
1421+
return t1 + t2;
1422+
}
1423+
})
1424+
.subscribe(ts);
1425+
1426+
ps1.onNext(1);
1427+
ps2.onNext(2);
1428+
ts.assertResult(3);
1429+
}
1430+
}

0 commit comments

Comments
 (0)