Skip to content

Commit 0f1542d

Browse files
authored
1.x: replay().refCount() avoid leaking items between connections (ReactiveX#5181)
* 1.x: replay().refCount() avoid leaking items between connections * Improve coverage of changes.
1 parent 8811831 commit 0f1542d

File tree

3 files changed

+177
-1
lines changed

3 files changed

+177
-1
lines changed

src/main/java/rx/internal/operators/OnSubscribeRefCount.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,13 @@ void cleanup() {
129129
// and set the subscriptionCount to 0
130130
lock.lock();
131131
try {
132+
132133
if (baseSubscription == currentBase) {
134+
// backdoor into the ConnectableObservable to cleanup and reset its state
135+
if (source instanceof Subscription) {
136+
((Subscription)source).unsubscribe();
137+
}
138+
133139
baseSubscription.unsubscribe();
134140
baseSubscription = new CompositeSubscription();
135141
subscriptionCount.set(0);
@@ -148,7 +154,13 @@ public void call() {
148154
lock.lock();
149155
try {
150156
if (baseSubscription == current) {
157+
151158
if (subscriptionCount.decrementAndGet() == 0) {
159+
// backdoor into the ConnectableObservable to cleanup and reset its state
160+
if (source instanceof Subscription) {
161+
((Subscription)source).unsubscribe();
162+
}
163+
152164
baseSubscription.unsubscribe();
153165
// need a new baseSubscription because once
154166
// unsubscribed stays that way

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import rx.schedulers.Timestamped;
2929
import rx.subscriptions.Subscriptions;
3030

31-
public final class OperatorReplay<T> extends ConnectableObservable<T> {
31+
public final class OperatorReplay<T> extends ConnectableObservable<T> implements Subscription {
3232
/** The source observable. */
3333
final Observable<? extends T> source;
3434
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
@@ -254,6 +254,17 @@ private OperatorReplay(OnSubscribe<T> onSubscribe, Observable<? extends T> sourc
254254
this.bufferFactory = bufferFactory;
255255
}
256256

257+
@Override
258+
public void unsubscribe() {
259+
current.lazySet(null);
260+
}
261+
262+
@Override
263+
public boolean isUnsubscribed() {
264+
ReplaySubscriber<T> ps = current.get();
265+
return ps == null || ps.isUnsubscribed();
266+
}
267+
257268
@Override
258269
public void connect(Action1<? super Subscription> connection) {
259270
boolean doConnect;

src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Matchers.any;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.lang.management.ManagementFactory;
2223
import java.util.*;
2324
import java.util.concurrent.*;
2425
import java.util.concurrent.atomic.*;
@@ -31,6 +32,7 @@
3132
import rx.Observable.OnSubscribe;
3233
import rx.Observer;
3334
import rx.functions.*;
35+
import rx.observables.ConnectableObservable;
3436
import rx.observers.*;
3537
import rx.schedulers.*;
3638
import rx.subjects.ReplaySubject;
@@ -611,4 +613,155 @@ public void call(Throwable t) {
611613
assertNotNull("First subscriber didn't get the error", err1);
612614
assertNotNull("Second subscriber didn't get the error", err2);
613615
}
616+
617+
Observable<Object> source;
618+
619+
@Test
620+
public void replayNoLeak() throws Exception {
621+
System.gc();
622+
Thread.sleep(100);
623+
624+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
625+
626+
source = Observable.fromCallable(new Callable<Object>() {
627+
@Override
628+
public Object call() throws Exception {
629+
return new byte[100 * 1000 * 1000];
630+
}
631+
})
632+
.replay(1)
633+
.refCount();
634+
635+
source.subscribe();
636+
637+
System.gc();
638+
Thread.sleep(100);
639+
640+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
641+
642+
source = null;
643+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
644+
}
645+
646+
@Test
647+
public void replayNoLeak2() throws Exception {
648+
System.gc();
649+
Thread.sleep(100);
650+
651+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
652+
653+
source = Observable.fromCallable(new Callable<Object>() {
654+
@Override
655+
public Object call() throws Exception {
656+
return new byte[100 * 1000 * 1000];
657+
}
658+
}).concatWith(Observable.never())
659+
.replay(1)
660+
.refCount();
661+
662+
Subscription s1 = source.subscribe();
663+
Subscription s2 = source.subscribe();
664+
665+
s1.unsubscribe();
666+
s2.unsubscribe();
667+
668+
s1 = null;
669+
s2 = null;
670+
671+
System.gc();
672+
Thread.sleep(100);
673+
674+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
675+
676+
source = null;
677+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
678+
}
679+
680+
static final class ExceptionData extends Exception {
681+
private static final long serialVersionUID = -6763898015338136119L;
682+
683+
public final Object data;
684+
685+
public ExceptionData(Object data) {
686+
this.data = data;
687+
}
688+
}
689+
690+
@Test
691+
public void publishNoLeak() throws Exception {
692+
System.gc();
693+
Thread.sleep(100);
694+
695+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
696+
697+
source = Observable.fromCallable(new Callable<Object>() {
698+
@Override
699+
public Object call() throws Exception {
700+
throw new ExceptionData(new byte[100 * 1000 * 1000]);
701+
}
702+
})
703+
.publish()
704+
.refCount();
705+
706+
Action1<Throwable> err = Actions.empty();
707+
source.subscribe(Actions.empty(), err);
708+
709+
System.gc();
710+
Thread.sleep(100);
711+
712+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
713+
714+
source = null;
715+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
716+
}
717+
718+
@Test
719+
public void publishNoLeak2() throws Exception {
720+
System.gc();
721+
Thread.sleep(100);
722+
723+
long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
724+
725+
source = Observable.fromCallable(new Callable<Object>() {
726+
@Override
727+
public Object call() throws Exception {
728+
return new byte[100 * 1000 * 1000];
729+
}
730+
}).concatWith(Observable.never())
731+
.publish()
732+
.refCount();
733+
734+
Subscription s1 = source.test(0);
735+
Subscription s2 = source.test(0);
736+
737+
s1.unsubscribe();
738+
s2.unsubscribe();
739+
740+
s1 = null;
741+
s2 = null;
742+
743+
System.gc();
744+
Thread.sleep(100);
745+
746+
long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
747+
748+
source = null;
749+
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
750+
}
751+
752+
@Test
753+
public void replayIsUnsubscribed() {
754+
ConnectableObservable<Integer> co = Observable.just(1)
755+
.replay();
756+
757+
assertTrue(((Subscription)co).isUnsubscribed());
758+
759+
Subscription s = co.connect();
760+
761+
assertFalse(((Subscription)co).isUnsubscribed());
762+
763+
s.unsubscribe();
764+
765+
assertTrue(((Subscription)co).isUnsubscribed());
766+
}
614767
}

0 commit comments

Comments
 (0)