From a5ca8c77af420a2926cdc4aa32013ea055085731 Mon Sep 17 00:00:00 2001 From: Brian Attwell Date: Wed, 12 Apr 2017 16:52:57 -0700 Subject: [PATCH 1/3] BoundedReplayBuffer should not leak memory. Problem: When `BoundedReplayBuffer` evicts an item to make room for a new item it still contains a strong reference to the evicted reference. This diff adds a new unit test to verify this behavior. Given this unit test needs to look under the hood of `BoundedReplayBuffer` in order to ensure no memory leaks exist, writing this test requires a new package private method. This diff fixes the behavior. --- .../observable/ObservableReplay.java | 28 ++++++++++++++- .../observable/ObservableReplayTest.java | 36 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index af25fa9bf2..498fcfeebe 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -616,7 +616,13 @@ final void removeFirst() { size--; // can't just move the head because it would retain the very first value // can't null out the head's value because of late replayers would see null - setFirst(next); + Node newHead = new Node(null); + Node newNext = next.get(); + newHead.set(newNext); + setFirst(newHead); + if (newNext == null) { + tail = newHead; + } } /* test */ final void removeSome(int n) { Node head = get(); @@ -751,6 +757,26 @@ void truncateFinal() { } } } + /* test */ final void collectObjectsInMemory(Collection output) { + Node next = getHead(); + for (;;) { + if (next != null) { + Object o = next.value; + Object v = leaveTransform(o); + if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { + break; + } + T value = NotificationLite.getValue(v); + if (value != null) { + output.add(value); + } + next = next; + } else { + break; + } + next = next.get(); + } + } /* test */ boolean hasError() { return tail.value != null && NotificationLite.isError(leaveTransform(tail.value)); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 2057e33692..d165d7e23a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -748,7 +748,43 @@ void truncate() { buf.collect(values); Assert.assertEquals(Arrays.asList(5, 6), values); + } + + @Test + public void testBoundedReplayBufferForMemoryLeaks() { + BoundedReplayBuffer buf = new BoundedReplayBuffer() { + private static final long serialVersionUID = -5182053207244406872L; + + @Override + void truncate() { + } + }; + buf.addLast(new Node(1)); + buf.addLast(new Node(2)); + buf.addLast(new Node(3)); + buf.addLast(new Node(4)); + buf.addLast(new Node(5)); + + List valuesInMemory = new ArrayList(); + buf.collectObjectsInMemory(valuesInMemory); + + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), valuesInMemory); + + buf.removeFirst(); + buf.removeFirst(); + buf.removeFirst(); + buf.removeFirst(); + buf.removeFirst(); + + valuesInMemory.clear(); + buf.collectObjectsInMemory(valuesInMemory); + Assert.assertTrue(valuesInMemory.isEmpty()); + + buf.addLast(new Node(5)); + buf.addLast(new Node(6)); + buf.collectObjectsInMemory(valuesInMemory); + Assert.assertEquals(Arrays.asList(5, 6), valuesInMemory); } @Test From 2b60da49dff8819fa6d3862948b30b7b00b754ec Mon Sep 17 00:00:00 2001 From: Brian Attwell Date: Wed, 12 Apr 2017 19:11:10 -0700 Subject: [PATCH 2/3] Prevent some additional memory leaks I took a look at merging the `collect()` and `collectValuesInMemory()` methods into a single method at the prompting of akarnokd. This made me realize the following: 1) There are more memory leaks. I updated the setNext() method to fix some of these. 2) We can't avoid all these memory leaks without introducing schduling because of the TimedBuffer. I don't believe engineers should have any expectation that the TimedBuffer values will be eagerly cleaned up after they timeout. So I left the seperate `collect()` and `collectValuesInMemory()` methods. This allows us to test memory for memory leaks only when we think it is appropriate. --- .../observable/ObservableReplay.java | 57 ++++++++-------- .../observable/ObservableReplayTest.java | 65 +++++-------------- 2 files changed, 43 insertions(+), 79 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 498fcfeebe..0a9672202a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -616,31 +616,22 @@ final void removeFirst() { size--; // can't just move the head because it would retain the very first value // can't null out the head's value because of late replayers would see null + setFirst(next); + + } + /** + * Arranges the given node is the new head from now on. + * @param next the Node instance to set as first + */ + final void setFirst(Node next) { Node newHead = new Node(null); Node newNext = next.get(); newHead.set(newNext); - setFirst(newHead); + set(newHead); if (newNext == null) { tail = newHead; } } - /* test */ final void removeSome(int n) { - Node head = get(); - while (n > 0) { - head = head.get(); - n--; - size--; - } - - setFirst(head); - } - /** - * Arranges the given node is the new head from now on. - * @param n the Node instance to set as first - */ - final void setFirst(Node n) { - set(n); - } @Override public final void next(T value) { @@ -740,41 +731,45 @@ Object leaveTransform(Object value) { void truncateFinal() { } - /* test */ final void collect(Collection output) { - Node n = getHead(); + /** + * Collect all values in memory even if they will be ignored by subsequent calls to replay(). This is useful + * for tracking memory leaks. + * @param output all values. + */ + /* test */ final void collectValuesInMemory(Collection output) { + Node next = getHead(); for (;;) { - Node next = n.get(); if (next != null) { Object o = next.value; Object v = leaveTransform(o); if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { break; } - output.add(NotificationLite.getValue(v)); - n = next; + T value = NotificationLite.getValue(v); + if (value != null) { + output.add(value); + } } else { break; } + next = next.get(); } } - /* test */ final void collectObjectsInMemory(Collection output) { - Node next = getHead(); + /* test */ final void collect(Collection output) { + Node n = getHead(); for (;;) { + Node next = n.get(); if (next != null) { Object o = next.value; Object v = leaveTransform(o); if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { break; } - T value = NotificationLite.getValue(v); - if (value != null) { - output.add(value); - } - next = next; + output.add(NotificationLite.getValue(v)); + n = next; } else { break; } - next = next.get(); } } /* test */ boolean hasError() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index d165d7e23a..55746e5a11 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -731,60 +731,24 @@ void truncate() { buf.addLast(new Node(5)); List values = new ArrayList(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values); - buf.removeSome(2); - buf.removeFirst(); - buf.removeSome(2); - - values.clear(); - buf.collect(values); - Assert.assertTrue(values.isEmpty()); - - buf.addLast(new Node(5)); - buf.addLast(new Node(6)); - buf.collect(values); - - Assert.assertEquals(Arrays.asList(5, 6), values); - } - - @Test - public void testBoundedReplayBufferForMemoryLeaks() { - BoundedReplayBuffer buf = new BoundedReplayBuffer() { - private static final long serialVersionUID = -5182053207244406872L; - - @Override - void truncate() { - } - }; - buf.addLast(new Node(1)); - buf.addLast(new Node(2)); - buf.addLast(new Node(3)); - buf.addLast(new Node(4)); - buf.addLast(new Node(5)); - - List valuesInMemory = new ArrayList(); - buf.collectObjectsInMemory(valuesInMemory); - - Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), valuesInMemory); - buf.removeFirst(); buf.removeFirst(); buf.removeFirst(); buf.removeFirst(); buf.removeFirst(); - valuesInMemory.clear(); - buf.collectObjectsInMemory(valuesInMemory); - Assert.assertTrue(valuesInMemory.isEmpty()); + collectAndAssertNoLeaks(buf, values); + Assert.assertTrue(values.isEmpty()); buf.addLast(new Node(5)); buf.addLast(new Node(6)); - buf.collectObjectsInMemory(valuesInMemory); + collectAndAssertNoLeaks(buf, values); - Assert.assertEquals(Arrays.asList(5, 6), valuesInMemory); + Assert.assertEquals(Arrays.asList(5, 6), values); } @Test @@ -877,26 +841,23 @@ public void testSizedTruncation() { buf.next(1); buf.next(2); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(1, 2), values); buf.next(3); buf.next(4); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(3, 4), values); buf.next(5); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(4, 5), values); Assert.assertFalse(buf.hasCompleted()); buf.complete(); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(4, 5), values); Assert.assertEquals(3, buf.size); @@ -1564,4 +1525,12 @@ public void timedNoOutdatedData() { source.test().assertResult(); } + + private void collectAndAssertNoLeaks(BoundedReplayBuffer buffer, Collection collection) { + Collection allObjects = new ArrayList(); + collection.clear(); + buffer.collectValuesInMemory(allObjects); + buffer.collect(collection); + assertEquals(allObjects, collection); + } } From a772a7e8d9a6fe7d1658d365aa24dca4b9e9d42f Mon Sep 17 00:00:00 2001 From: Brian Attwell Date: Wed, 12 Apr 2017 20:54:41 -0700 Subject: [PATCH 3/3] Apply leak test to the size portion of the TimeoutAndSize tests --- .../internal/operators/observable/ObservableReplay.java | 6 +++--- .../operators/observable/ObservableReplayTest.java | 7 ++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 0a9672202a..1e57343789 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -739,7 +739,9 @@ void truncateFinal() { /* test */ final void collectValuesInMemory(Collection output) { Node next = getHead(); for (;;) { - if (next != null) { + if (next == null) { + break; + } else if (next.value != null) { Object o = next.value; Object v = leaveTransform(o); if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { @@ -749,8 +751,6 @@ void truncateFinal() { if (value != null) { output.add(value); } - } else { - break; } next = next.get(); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 55746e5a11..de97bd47f9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -12,7 +12,6 @@ */ package io.reactivex.internal.operators.observable; - import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -766,8 +765,7 @@ public void testTimedAndSizedTruncation() { buf.next(3); buf.next(4); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(3, 4), values); test.advanceTimeBy(2, TimeUnit.SECONDS); @@ -809,8 +807,7 @@ public void testTimedAndSizedTruncationError() { buf.next(3); buf.next(4); - values.clear(); - buf.collect(values); + collectAndAssertNoLeaks(buf, values); Assert.assertEquals(Arrays.asList(3, 4), values); test.advanceTimeBy(2, TimeUnit.SECONDS);