Skip to content

Commit 2b60da4

Browse files
author
Brian Attwell
committed
Prevent some additional memory leaks
I took a look at merging the `collect()` and `collectValuesInMemory()` methods into a single method at the prompting of akarnokd. This made me realize the following: 1) There are more memory leaks. I updated the setNext() method to fix some of these. 2) We can't avoid all these memory leaks without introducing schduling because of the TimedBuffer. I don't believe engineers should have any expectation that the TimedBuffer values will be eagerly cleaned up after they timeout. So I left the seperate `collect()` and `collectValuesInMemory()` methods. This allows us to test memory for memory leaks only when we think it is appropriate.
1 parent a5ca8c7 commit 2b60da4

File tree

2 files changed

+43
-79
lines changed

2 files changed

+43
-79
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -616,31 +616,22 @@ final void removeFirst() {
616616
size--;
617617
// can't just move the head because it would retain the very first value
618618
// can't null out the head's value because of late replayers would see null
619+
setFirst(next);
620+
621+
}
622+
/**
623+
* Arranges the given node is the new head from now on.
624+
* @param next the Node instance to set as first
625+
*/
626+
final void setFirst(Node next) {
619627
Node newHead = new Node(null);
620628
Node newNext = next.get();
621629
newHead.set(newNext);
622-
setFirst(newHead);
630+
set(newHead);
623631
if (newNext == null) {
624632
tail = newHead;
625633
}
626634
}
627-
/* test */ final void removeSome(int n) {
628-
Node head = get();
629-
while (n > 0) {
630-
head = head.get();
631-
n--;
632-
size--;
633-
}
634-
635-
setFirst(head);
636-
}
637-
/**
638-
* Arranges the given node is the new head from now on.
639-
* @param n the Node instance to set as first
640-
*/
641-
final void setFirst(Node n) {
642-
set(n);
643-
}
644635

645636
@Override
646637
public final void next(T value) {
@@ -740,41 +731,45 @@ Object leaveTransform(Object value) {
740731
void truncateFinal() {
741732

742733
}
743-
/* test */ final void collect(Collection<? super T> output) {
744-
Node n = getHead();
734+
/**
735+
* Collect all values in memory even if they will be ignored by subsequent calls to replay(). This is useful
736+
* for tracking memory leaks.
737+
* @param output all values.
738+
*/
739+
/* test */ final void collectValuesInMemory(Collection<? super T> output) {
740+
Node next = getHead();
745741
for (;;) {
746-
Node next = n.get();
747742
if (next != null) {
748743
Object o = next.value;
749744
Object v = leaveTransform(o);
750745
if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) {
751746
break;
752747
}
753-
output.add(NotificationLite.<T>getValue(v));
754-
n = next;
748+
T value = NotificationLite.<T>getValue(v);
749+
if (value != null) {
750+
output.add(value);
751+
}
755752
} else {
756753
break;
757754
}
755+
next = next.get();
758756
}
759757
}
760-
/* test */ final void collectObjectsInMemory(Collection<? super T> output) {
761-
Node next = getHead();
758+
/* test */ final void collect(Collection<? super T> output) {
759+
Node n = getHead();
762760
for (;;) {
761+
Node next = n.get();
763762
if (next != null) {
764763
Object o = next.value;
765764
Object v = leaveTransform(o);
766765
if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) {
767766
break;
768767
}
769-
T value = NotificationLite.<T>getValue(v);
770-
if (value != null) {
771-
output.add(value);
772-
}
773-
next = next;
768+
output.add(NotificationLite.<T>getValue(v));
769+
n = next;
774770
} else {
775771
break;
776772
}
777-
next = next.get();
778773
}
779774
}
780775
/* test */ boolean hasError() {

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java

Lines changed: 17 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -731,60 +731,24 @@ void truncate() {
731731
buf.addLast(new Node(5));
732732

733733
List<Integer> values = new ArrayList<Integer>();
734-
buf.collect(values);
734+
collectAndAssertNoLeaks(buf, values);
735735

736736
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values);
737737

738-
buf.removeSome(2);
739-
buf.removeFirst();
740-
buf.removeSome(2);
741-
742-
values.clear();
743-
buf.collect(values);
744-
Assert.assertTrue(values.isEmpty());
745-
746-
buf.addLast(new Node(5));
747-
buf.addLast(new Node(6));
748-
buf.collect(values);
749-
750-
Assert.assertEquals(Arrays.asList(5, 6), values);
751-
}
752-
753-
@Test
754-
public void testBoundedReplayBufferForMemoryLeaks() {
755-
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>() {
756-
private static final long serialVersionUID = -5182053207244406872L;
757-
758-
@Override
759-
void truncate() {
760-
}
761-
};
762-
buf.addLast(new Node(1));
763-
buf.addLast(new Node(2));
764-
buf.addLast(new Node(3));
765-
buf.addLast(new Node(4));
766-
buf.addLast(new Node(5));
767-
768-
List<Integer> valuesInMemory = new ArrayList<Integer>();
769-
buf.collectObjectsInMemory(valuesInMemory);
770-
771-
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), valuesInMemory);
772-
773738
buf.removeFirst();
774739
buf.removeFirst();
775740
buf.removeFirst();
776741
buf.removeFirst();
777742
buf.removeFirst();
778743

779-
valuesInMemory.clear();
780-
buf.collectObjectsInMemory(valuesInMemory);
781-
Assert.assertTrue(valuesInMemory.isEmpty());
744+
collectAndAssertNoLeaks(buf, values);
745+
Assert.assertTrue(values.isEmpty());
782746

783747
buf.addLast(new Node(5));
784748
buf.addLast(new Node(6));
785-
buf.collectObjectsInMemory(valuesInMemory);
749+
collectAndAssertNoLeaks(buf, values);
786750

787-
Assert.assertEquals(Arrays.asList(5, 6), valuesInMemory);
751+
Assert.assertEquals(Arrays.asList(5, 6), values);
788752
}
789753

790754
@Test
@@ -877,26 +841,23 @@ public void testSizedTruncation() {
877841

878842
buf.next(1);
879843
buf.next(2);
880-
buf.collect(values);
844+
collectAndAssertNoLeaks(buf, values);
881845
Assert.assertEquals(Arrays.asList(1, 2), values);
882846

883847
buf.next(3);
884848
buf.next(4);
885-
values.clear();
886-
buf.collect(values);
849+
collectAndAssertNoLeaks(buf, values);
887850
Assert.assertEquals(Arrays.asList(3, 4), values);
888851

889852
buf.next(5);
890853

891-
values.clear();
892-
buf.collect(values);
854+
collectAndAssertNoLeaks(buf, values);
893855
Assert.assertEquals(Arrays.asList(4, 5), values);
894856
Assert.assertFalse(buf.hasCompleted());
895857

896858
buf.complete();
897859

898-
values.clear();
899-
buf.collect(values);
860+
collectAndAssertNoLeaks(buf, values);
900861
Assert.assertEquals(Arrays.asList(4, 5), values);
901862

902863
Assert.assertEquals(3, buf.size);
@@ -1564,4 +1525,12 @@ public void timedNoOutdatedData() {
15641525

15651526
source.test().assertResult();
15661527
}
1528+
1529+
private <T> void collectAndAssertNoLeaks(BoundedReplayBuffer<T> buffer, Collection<T> collection) {
1530+
Collection<T> allObjects = new ArrayList<T>();
1531+
collection.clear();
1532+
buffer.collectValuesInMemory(allObjects);
1533+
buffer.collect(collection);
1534+
assertEquals(allObjects, collection);
1535+
}
15671536
}

0 commit comments

Comments
 (0)