Skip to content

Commit 03fe25c

Browse files
Merge pull request ReactiveX#1552 from abersnaze/most-recent-null
Fixing a bug and a potential for other concurrency issues.
2 parents 0fe6e01 + 1f252c0 commit 03fe25c

File tree

2 files changed

+48
-45
lines changed

2 files changed

+48
-45
lines changed

rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Iterator;
19+
import java.util.NoSuchElementException;
1920

2021
import rx.Observable;
2122
import rx.Subscriber;
@@ -41,54 +42,24 @@ public final class BlockingOperatorMostRecent {
4142
* {@code initialValue} if {@code source} has not yet emitted any items
4243
*/
4344
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {
44-
4545
return new Iterable<T>() {
4646
@Override
4747
public Iterator<T> iterator() {
4848
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
49-
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
5049

5150
/**
5251
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
5352
* since it is for BlockingObservable.
5453
*/
5554
source.subscribe(mostRecentObserver);
5655

57-
return nextIterator;
56+
return mostRecentObserver.getIterable();
5857
}
5958
};
60-
61-
}
62-
63-
private static class MostRecentIterator<T> implements Iterator<T> {
64-
65-
private final MostRecentObserver<T> observer;
66-
67-
private MostRecentIterator(MostRecentObserver<T> observer) {
68-
this.observer = observer;
69-
}
70-
71-
@Override
72-
public boolean hasNext() {
73-
return !observer.isCompleted();
74-
}
75-
76-
@Override
77-
public T next() {
78-
if (observer.getThrowable() != null) {
79-
throw Exceptions.propagate(observer.getThrowable());
80-
}
81-
return observer.getRecentValue();
82-
}
83-
84-
@Override
85-
public void remove() {
86-
throw new UnsupportedOperationException("Read only iterator");
87-
}
8859
}
8960

9061
private static class MostRecentObserver<T> extends Subscriber<T> {
91-
static final NotificationLite<Object> nl = NotificationLite.instance();
62+
final NotificationLite<T> nl = NotificationLite.instance();
9263
volatile Object value;
9364

9465
private MostRecentObserver(T value) {
@@ -110,19 +81,47 @@ public void onNext(T args) {
11081
value = nl.next(args);
11182
}
11283

113-
private boolean isCompleted() {
114-
return nl.isCompleted(value);
115-
}
116-
117-
private Throwable getThrowable() {
118-
Object v = value;
119-
return nl.isError(v) ? nl.getError(v) : null;
120-
}
121-
122-
@SuppressWarnings("unchecked")
123-
private T getRecentValue() {
124-
return (T)value;
84+
/**
85+
* The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one
86+
* thread expect {@link Iterator#next()} called from a different thread to work.
87+
* @return
88+
*/
89+
public Iterator<T> getIterable() {
90+
return new Iterator<T>() {
91+
/**
92+
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
93+
*/
94+
private Object buf = null;
95+
96+
@Override
97+
public boolean hasNext() {
98+
buf = value;
99+
return !nl.isCompleted(buf);
100+
}
101+
102+
@Override
103+
public T next() {
104+
try {
105+
// if hasNext wasn't called before calling next.
106+
if (buf == null)
107+
buf = value;
108+
if (nl.isCompleted(buf))
109+
throw new NoSuchElementException();
110+
if (nl.isError(buf)) {
111+
throw Exceptions.propagate(nl.getError(buf));
112+
}
113+
return nl.getValue(buf);
114+
}
115+
finally {
116+
buf = null;
117+
}
118+
}
119+
120+
@Override
121+
public void remove() {
122+
throw new UnsupportedOperationException("Read only iterator");
123+
}
124+
};
125125
}
126-
127126
}
128127
}

rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorMostRecentTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
import rx.subjects.Subject;
3535

3636
public class BlockingOperatorMostRecentTest {
37+
@Test
38+
public void testMostRecentNull() {
39+
assertEquals(null, Observable.<Void>never().toBlocking().mostRecent(null).iterator().next());
40+
}
3741

3842
@Test
3943
public void testMostRecent() {

0 commit comments

Comments
 (0)