Skip to content

Commit 47b513d

Browse files
authored
3.x: Fix replay() not resetting when the connection is disposed (ReactiveX#6921)
1 parent b34e5f6 commit 47b513d

File tree

8 files changed

+368
-39
lines changed

8 files changed

+368
-39
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void connect(Consumer<? super Disposable> connection) {
179179
}
180180

181181
// create a new subscriber-to-source
182-
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf);
182+
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf, current);
183183
// try setting it as the current subscriber-to-source
184184
if (!current.compareAndSet(ps, u)) {
185185
// did not work, perhaps a new subscriber arrived
@@ -249,9 +249,13 @@ static final class ReplaySubscriber<T>
249249
/** Tracks the amount already requested from the upstream. */
250250
long requestedFromUpstream;
251251

252+
/** The current connection. */
253+
final AtomicReference<ReplaySubscriber<T>> current;
254+
252255
@SuppressWarnings("unchecked")
253-
ReplaySubscriber(ReplayBuffer<T> buffer) {
256+
ReplaySubscriber(ReplayBuffer<T> buffer, AtomicReference<ReplaySubscriber<T>> current) {
254257
this.buffer = buffer;
258+
this.current = current;
255259
this.management = new AtomicInteger();
256260
this.subscribers = new AtomicReference<>(EMPTY);
257261
this.shouldConnect = new AtomicBoolean();
@@ -266,9 +270,7 @@ public boolean isDisposed() {
266270
@Override
267271
public void dispose() {
268272
subscribers.set(TERMINATED);
269-
// unlike OperatorPublish, we can't null out the terminated so
270-
// late subscribers can still get replay
271-
// current.compareAndSet(ReplaySubscriber.this, null);
273+
current.compareAndSet(ReplaySubscriber.this, null);
272274
// we don't care if it fails because it means the current has
273275
// been replaced in the meantime
274276
SubscriptionHelper.cancel(this);
@@ -1198,7 +1200,7 @@ public void subscribe(Subscriber<? super T> child) {
11981200
return;
11991201
}
12001202
// create a new subscriber to source
1201-
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf);
1203+
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf, curr);
12021204
// let's try setting it as the current subscriber-to-source
12031205
if (!curr.compareAndSet(null, u)) {
12041206
// didn't work, maybe someone else did it or the current subscriber

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public void connect(Consumer<? super Disposable> connection) {
174174
// create a new subscriber-to-source
175175
ReplayBuffer<T> buf = bufferFactory.call();
176176

177-
ReplayObserver<T> u = new ReplayObserver<>(buf);
177+
ReplayObserver<T> u = new ReplayObserver<>(buf, current);
178178
// try setting it as the current subscriber-to-source
179179
if (!current.compareAndSet(ps, u)) {
180180
// did not work, perhaps a new subscriber arrived
@@ -240,8 +240,12 @@ static final class ReplayObserver<T>
240240
*/
241241
final AtomicBoolean shouldConnect;
242242

243-
ReplayObserver(ReplayBuffer<T> buffer) {
243+
/** The current connection. */
244+
final AtomicReference<ReplayObserver<T>> current;
245+
246+
ReplayObserver(ReplayBuffer<T> buffer, AtomicReference<ReplayObserver<T>> current) {
244247
this.buffer = buffer;
248+
this.current = current;
245249

246250
this.observers = new AtomicReference<>(EMPTY);
247251
this.shouldConnect = new AtomicBoolean();
@@ -255,9 +259,7 @@ public boolean isDisposed() {
255259
@Override
256260
public void dispose() {
257261
observers.set(TERMINATED);
258-
// unlike OperatorPublish, we can't null out the terminated so
259-
// late observers can still get replay
260-
// current.compareAndSet(ReplayObserver.this, null);
262+
current.compareAndSet(ReplayObserver.this, null);
261263
// we don't care if it fails because it means the current has
262264
// been replaced in the meantime
263265
DisposableHelper.dispose(this);
@@ -1004,7 +1006,7 @@ public void subscribe(Observer<? super T> child) {
10041006
// create a new subscriber to source
10051007
ReplayBuffer<T> buf = bufferFactory.call();
10061008

1007-
ReplayObserver<T> u = new ReplayObserver<>(buf);
1009+
ReplayObserver<T> u = new ReplayObserver<>(buf, curr);
10081010
// let's try setting it as the current subscriber-to-source
10091011
if (!curr.compareAndSet(null, u)) {
10101012
// didn't work, maybe someone else did it or the current subscriber

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,4 +1776,31 @@ public void onError(Throwable t) {
17761776

17771777
ts1.assertEmpty();
17781778
}
1779+
1780+
@Test
1781+
public void disposeNoNeedForReset() {
1782+
PublishProcessor<Integer> pp = PublishProcessor.create();
1783+
1784+
ConnectableFlowable<Integer> cf = pp.publish();
1785+
1786+
TestSubscriber<Integer> ts = cf.test();
1787+
1788+
Disposable d = cf.connect();
1789+
1790+
pp.onNext(1);
1791+
1792+
d.dispose();
1793+
1794+
ts = cf.test();
1795+
1796+
ts.assertEmpty();
1797+
1798+
cf.connect();
1799+
1800+
ts.assertEmpty();
1801+
1802+
pp.onNext(2);
1803+
1804+
ts.assertValuesOnly(2);
1805+
}
17791806
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayEagerTruncateTest.java

Lines changed: 81 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1923,19 +1923,6 @@ public ReplayBuffer<Integer> get() throws Exception {
19231923
.assertFailure(TestException.class);
19241924
}
19251925

1926-
@Test
1927-
public void currentDisposedWhenConnecting() {
1928-
FlowableReplay<Integer> fr = (FlowableReplay<Integer>)FlowableReplay.create(Flowable.<Integer>never(), 16, true);
1929-
fr.connect();
1930-
1931-
fr.current.get().dispose();
1932-
assertTrue(fr.current.get().isDisposed());
1933-
1934-
fr.connect();
1935-
1936-
assertFalse(fr.current.get().isDisposed());
1937-
}
1938-
19391926
@Test
19401927
public void noBoundedRetentionViaThreadLocal() throws Exception {
19411928
Flowable<byte[]> source = Flowable.range(1, 200)
@@ -2275,4 +2262,85 @@ public void timeAndSizeNoTerminalTruncationOnTimechange() {
22752262
.assertComplete()
22762263
.assertNoErrors();
22772264
}
2265+
2266+
@Test
2267+
public void disposeNoNeedForResetSizeBound() {
2268+
PublishProcessor<Integer> pp = PublishProcessor.create();
2269+
2270+
ConnectableFlowable<Integer> cf = pp.replay(10, true);
2271+
2272+
TestSubscriber<Integer> ts = cf.test();
2273+
2274+
Disposable d = cf.connect();
2275+
2276+
pp.onNext(1);
2277+
2278+
d.dispose();
2279+
2280+
ts = cf.test();
2281+
2282+
ts.assertEmpty();
2283+
2284+
cf.connect();
2285+
2286+
ts.assertEmpty();
2287+
2288+
pp.onNext(2);
2289+
2290+
ts.assertValuesOnly(2);
2291+
}
2292+
2293+
@Test
2294+
public void disposeNoNeedForResetTimeBound() {
2295+
PublishProcessor<Integer> pp = PublishProcessor.create();
2296+
2297+
ConnectableFlowable<Integer> cf = pp.replay(10, TimeUnit.MINUTES, Schedulers.single(), true);
2298+
2299+
TestSubscriber<Integer> ts = cf.test();
2300+
2301+
Disposable d = cf.connect();
2302+
2303+
pp.onNext(1);
2304+
2305+
d.dispose();
2306+
2307+
ts = cf.test();
2308+
2309+
ts.assertEmpty();
2310+
2311+
cf.connect();
2312+
2313+
ts.assertEmpty();
2314+
2315+
pp.onNext(2);
2316+
2317+
ts.assertValuesOnly(2);
2318+
}
2319+
2320+
@Test
2321+
public void disposeNoNeedForResetTimeAndSIzeBound() {
2322+
PublishProcessor<Integer> pp = PublishProcessor.create();
2323+
2324+
ConnectableFlowable<Integer> cf = pp.replay(10, 10, TimeUnit.MINUTES, Schedulers.single(), true);
2325+
2326+
TestSubscriber<Integer> ts = cf.test();
2327+
2328+
Disposable d = cf.connect();
2329+
2330+
pp.onNext(1);
2331+
2332+
d.dispose();
2333+
2334+
ts = cf.test();
2335+
2336+
ts.assertEmpty();
2337+
2338+
cf.connect();
2339+
2340+
ts.assertEmpty();
2341+
2342+
pp.onNext(2);
2343+
2344+
ts.assertValuesOnly(2);
2345+
}
22782346
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayTest.java

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1988,19 +1988,6 @@ public ReplayBuffer<Integer> get() throws Exception {
19881988
.assertFailure(TestException.class);
19891989
}
19901990

1991-
@Test
1992-
public void currentDisposedWhenConnecting() {
1993-
FlowableReplay<Integer> fr = (FlowableReplay<Integer>)FlowableReplay.create(Flowable.<Integer>never(), 16, false);
1994-
fr.connect();
1995-
1996-
fr.current.get().dispose();
1997-
assertTrue(fr.current.get().isDisposed());
1998-
1999-
fr.connect();
2000-
2001-
assertFalse(fr.current.get().isDisposed());
2002-
}
2003-
20041991
@Test
20051992
public void noBoundedRetentionViaThreadLocal() throws Exception {
20061993
Flowable<byte[]> source = Flowable.range(1, 200)
@@ -2191,4 +2178,112 @@ public void cancel() {
21912178

21922179
ts.assertResult();
21932180
}
2181+
2182+
@Test
2183+
public void disposeNoNeedForReset() {
2184+
PublishProcessor<Integer> pp = PublishProcessor.create();
2185+
2186+
ConnectableFlowable<Integer> cf = pp.replay();
2187+
2188+
TestSubscriber<Integer> ts = cf.test();
2189+
2190+
Disposable d = cf.connect();
2191+
2192+
pp.onNext(1);
2193+
2194+
d.dispose();
2195+
2196+
ts = cf.test();
2197+
2198+
ts.assertEmpty();
2199+
2200+
cf.connect();
2201+
2202+
ts.assertEmpty();
2203+
2204+
pp.onNext(2);
2205+
2206+
ts.assertValuesOnly(2);
2207+
}
2208+
2209+
@Test
2210+
public void disposeNoNeedForResetSizeBound() {
2211+
PublishProcessor<Integer> pp = PublishProcessor.create();
2212+
2213+
ConnectableFlowable<Integer> cf = pp.replay(10);
2214+
2215+
TestSubscriber<Integer> ts = cf.test();
2216+
2217+
Disposable d = cf.connect();
2218+
2219+
pp.onNext(1);
2220+
2221+
d.dispose();
2222+
2223+
ts = cf.test();
2224+
2225+
ts.assertEmpty();
2226+
2227+
cf.connect();
2228+
2229+
ts.assertEmpty();
2230+
2231+
pp.onNext(2);
2232+
2233+
ts.assertValuesOnly(2);
2234+
}
2235+
2236+
@Test
2237+
public void disposeNoNeedForResetTimeBound() {
2238+
PublishProcessor<Integer> pp = PublishProcessor.create();
2239+
2240+
ConnectableFlowable<Integer> cf = pp.replay(10, TimeUnit.MINUTES);
2241+
2242+
TestSubscriber<Integer> ts = cf.test();
2243+
2244+
Disposable d = cf.connect();
2245+
2246+
pp.onNext(1);
2247+
2248+
d.dispose();
2249+
2250+
ts = cf.test();
2251+
2252+
ts.assertEmpty();
2253+
2254+
cf.connect();
2255+
2256+
ts.assertEmpty();
2257+
2258+
pp.onNext(2);
2259+
2260+
ts.assertValuesOnly(2);
2261+
}
2262+
2263+
@Test
2264+
public void disposeNoNeedForResetTimeAndSIzeBound() {
2265+
PublishProcessor<Integer> pp = PublishProcessor.create();
2266+
2267+
ConnectableFlowable<Integer> cf = pp.replay(10, 10, TimeUnit.MINUTES);
2268+
2269+
TestSubscriber<Integer> ts = cf.test();
2270+
2271+
Disposable d = cf.connect();
2272+
2273+
pp.onNext(1);
2274+
2275+
d.dispose();
2276+
2277+
ts = cf.test();
2278+
2279+
ts.assertEmpty();
2280+
2281+
cf.connect();
2282+
2283+
ts.assertEmpty();
2284+
2285+
pp.onNext(2);
2286+
2287+
ts.assertValuesOnly(2);
2288+
}
21942289
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservablePublishTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,4 +866,31 @@ public void disposeResets() {
866866

867867
to.assertValuesOnly(1);
868868
}
869+
870+
@Test
871+
public void disposeNoNeedForReset() {
872+
PublishSubject<Integer> ps = PublishSubject.create();
873+
874+
ConnectableObservable<Integer> co = ps.publish();
875+
876+
TestObserver<Integer> to = co.test();
877+
878+
Disposable d = co.connect();
879+
880+
ps.onNext(1);
881+
882+
d.dispose();
883+
884+
to = co.test();
885+
886+
to.assertEmpty();
887+
888+
co.connect();
889+
890+
to.assertEmpty();
891+
892+
ps.onNext(2);
893+
894+
to.assertValuesOnly(2);
895+
}
869896
}

0 commit comments

Comments
 (0)