Skip to content

Commit e2b1d2f

Browse files
sobersantaderar
andauthored
Removed unnecessary upstream.cancel() call for casually finished upstream sequences. (ReactiveX#6992)
* no upstream.cancel() in FlowablePublishMulticast when the sequence is finished normally via onComplete/onError from upstream; minor code cleanup - unnecessary Disposable implementation to avoid method name clash * cleanup in FlowablePublishFunctionTest: refactored anonymous classes to lambdas * reduced visibility for dispose() and isDisposed() in the inner MulticastProcessor Co-authored-by: derar <[email protected]>
1 parent ca222c2 commit e2b1d2f

File tree

2 files changed

+93
-182
lines changed

2 files changed

+93
-182
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.reactivestreams.*;
2323

2424
import io.reactivex.rxjava3.core.*;
25-
import io.reactivex.rxjava3.disposables.Disposable;
2625
import io.reactivex.rxjava3.exceptions.*;
2726
import io.reactivex.rxjava3.functions.Function;
2827
import io.reactivex.rxjava3.internal.fuseable.*;
@@ -124,7 +123,7 @@ public void cancel() {
124123
}
125124
}
126125

127-
static final class MulticastProcessor<T> extends Flowable<T> implements FlowableSubscriber<T>, Disposable {
126+
static final class MulticastProcessor<T> extends Flowable<T> implements FlowableSubscriber<T> {
128127

129128
@SuppressWarnings("rawtypes")
130129
static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];
@@ -192,19 +191,19 @@ public void onSubscribe(Subscription s) {
192191
}
193192
}
194193

195-
@Override
196-
public void dispose() {
197-
SubscriptionHelper.cancel(upstream);
198-
if (wip.getAndIncrement() == 0) {
199-
SimpleQueue<T> q = queue;
200-
if (q != null) {
201-
q.clear();
194+
void dispose() {
195+
if (!done) {
196+
SubscriptionHelper.cancel(upstream);
197+
if (wip.getAndIncrement() == 0) {
198+
SimpleQueue<T> q = queue;
199+
if (q != null) {
200+
q.clear();
201+
}
202202
}
203203
}
204204
}
205205

206-
@Override
207-
public boolean isDisposed() {
206+
boolean isDisposed() {
208207
return upstream.get() == SubscriptionHelper.CANCELLED;
209208
}
210209

0 commit comments

Comments
 (0)