Skip to content

Commit a5ca8c7

Browse files
author
Brian Attwell
committed
BoundedReplayBuffer should not leak memory.
Problem: When `BoundedReplayBuffer` evicts an item to make room for a new item it still contains a strong reference to the evicted reference. This diff adds a new unit test to verify this behavior. Given this unit test needs to look under the hood of `BoundedReplayBuffer` in order to ensure no memory leaks exist, writing this test requires a new package private method. This diff fixes the behavior.
1 parent 85e0ea5 commit a5ca8c7

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,13 @@ final void removeFirst() {
616616
size--;
617617
// can't just move the head because it would retain the very first value
618618
// can't null out the head's value because of late replayers would see null
619-
setFirst(next);
619+
Node newHead = new Node(null);
620+
Node newNext = next.get();
621+
newHead.set(newNext);
622+
setFirst(newHead);
623+
if (newNext == null) {
624+
tail = newHead;
625+
}
620626
}
621627
/* test */ final void removeSome(int n) {
622628
Node head = get();
@@ -751,6 +757,26 @@ void truncateFinal() {
751757
}
752758
}
753759
}
760+
/* test */ final void collectObjectsInMemory(Collection<? super T> output) {
761+
Node next = getHead();
762+
for (;;) {
763+
if (next != null) {
764+
Object o = next.value;
765+
Object v = leaveTransform(o);
766+
if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) {
767+
break;
768+
}
769+
T value = NotificationLite.<T>getValue(v);
770+
if (value != null) {
771+
output.add(value);
772+
}
773+
next = next;
774+
} else {
775+
break;
776+
}
777+
next = next.get();
778+
}
779+
}
754780
/* test */ boolean hasError() {
755781
return tail.value != null && NotificationLite.isError(leaveTransform(tail.value));
756782
}

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,43 @@ void truncate() {
748748
buf.collect(values);
749749

750750
Assert.assertEquals(Arrays.asList(5, 6), values);
751+
}
752+
753+
@Test
754+
public void testBoundedReplayBufferForMemoryLeaks() {
755+
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>() {
756+
private static final long serialVersionUID = -5182053207244406872L;
757+
758+
@Override
759+
void truncate() {
760+
}
761+
};
762+
buf.addLast(new Node(1));
763+
buf.addLast(new Node(2));
764+
buf.addLast(new Node(3));
765+
buf.addLast(new Node(4));
766+
buf.addLast(new Node(5));
767+
768+
List<Integer> valuesInMemory = new ArrayList<Integer>();
769+
buf.collectObjectsInMemory(valuesInMemory);
770+
771+
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), valuesInMemory);
772+
773+
buf.removeFirst();
774+
buf.removeFirst();
775+
buf.removeFirst();
776+
buf.removeFirst();
777+
buf.removeFirst();
778+
779+
valuesInMemory.clear();
780+
buf.collectObjectsInMemory(valuesInMemory);
781+
Assert.assertTrue(valuesInMemory.isEmpty());
782+
783+
buf.addLast(new Node(5));
784+
buf.addLast(new Node(6));
785+
buf.collectObjectsInMemory(valuesInMemory);
751786

787+
Assert.assertEquals(Arrays.asList(5, 6), valuesInMemory);
752788
}
753789

754790
@Test

0 commit comments

Comments
 (0)