Skip to content

Commit 400a611

Browse files
Merge pull request ReactiveX#1523 from zsxwing/issue-1522
Fix issue ReactiveX#1522
2 parents efb7f79 + c1ec1f4 commit 400a611

File tree

4 files changed

+203
-18
lines changed

4 files changed

+203
-18
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public void onNext(T value) {
6060
} else {
6161
this.value = value;
6262
isNonEmpty = true;
63+
// Issue: https://github.com/Netflix/RxJava/pull/1527
64+
// Because we cache a value and don't emit now, we need to request another one.
65+
request(1);
6366
}
6467
}
6568

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,12 @@ void startEmitting() {
108108

109109
@Override
110110
public void request(long n) {
111-
long _c = 0;
111+
if (requested == Long.MAX_VALUE) {
112+
return;
113+
}
114+
long _c;
112115
if (n == Long.MAX_VALUE) {
113-
requested = Long.MAX_VALUE;
116+
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
114117
} else {
115118
_c = REQUESTED_UPDATER.getAndAdd(this, n);
116119
}
@@ -122,16 +125,20 @@ public void request(long n) {
122125
}
123126

124127
void emit(long previousRequested) {
125-
if (requested < 0) {
128+
if (requested == Long.MAX_VALUE) {
126129
// fast-path without backpressure
127-
try {
128-
for (Object value : deque) {
129-
notification.accept(subscriber, value);
130+
if (previousRequested == 0) {
131+
try {
132+
for (Object value : deque) {
133+
notification.accept(subscriber, value);
134+
}
135+
} catch (Throwable e) {
136+
subscriber.onError(e);
137+
} finally {
138+
deque.clear();
130139
}
131-
} catch (Throwable e) {
132-
subscriber.onError(e);
133-
} finally {
134-
deque.clear();
140+
} else {
141+
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
135142
}
136143
} else {
137144
// backpressure is requested
@@ -155,12 +162,22 @@ void emit(long previousRequested) {
155162
emitted++;
156163
}
157164
}
158-
159-
if (REQUESTED_UPDATER.addAndGet(this, -emitted) == 0) {
160-
// we're done emitting the number requested so return
161-
return;
165+
for (;;) {
166+
long oldRequested = requested;
167+
long newRequested = oldRequested - emitted;
168+
if (oldRequested == Long.MAX_VALUE) {
169+
// became unbounded during the loop
170+
// continue the outer loop to emit the rest events.
171+
break;
172+
}
173+
if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) {
174+
if (newRequested == 0) {
175+
// we're done emitting the number requested so return
176+
return;
177+
}
178+
break;
179+
}
162180
}
163-
164181
}
165182
}
166183
}

rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.isA;
19-
import static org.mockito.Mockito.inOrder;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.times;
20+
import static org.mockito.Mockito.*;
2221

2322
import java.util.NoSuchElementException;
2423

@@ -27,7 +26,9 @@
2726

2827
import rx.Observable;
2928
import rx.Observer;
29+
import rx.Subscriber;
3030
import rx.functions.Func1;
31+
import rx.functions.Func2;
3132

3233
public class OperatorSingleTest {
3334

@@ -241,4 +242,52 @@ public Boolean call(Integer t1) {
241242
inOrder.verify(observer, times(1)).onCompleted();
242243
inOrder.verifyNoMoreInteractions();
243244
}
245+
246+
@Test
247+
public void testSingleWithBackpressure() {
248+
Observable<Integer> observable = Observable.from(1, 2).single();
249+
250+
Subscriber<Integer> subscriber = spy(new Subscriber<Integer>() {
251+
252+
@Override
253+
public void onStart() {
254+
request(1);
255+
}
256+
257+
@Override
258+
public void onCompleted() {
259+
260+
}
261+
262+
@Override
263+
public void onError(Throwable e) {
264+
265+
}
266+
267+
@Override
268+
public void onNext(Integer integer) {
269+
request(1);
270+
}
271+
});
272+
observable.subscribe(subscriber);
273+
274+
InOrder inOrder = inOrder(subscriber);
275+
inOrder.verify(subscriber, times(1)).onError(isA(IllegalArgumentException.class));
276+
inOrder.verifyNoMoreInteractions();
277+
}
278+
279+
@Test(timeout = 30000)
280+
public void testIssue1527() throws InterruptedException {
281+
//https://github.com/Netflix/RxJava/pull/1527
282+
Observable<Integer> source = Observable.from(1, 2, 3, 4, 5, 6);
283+
Observable<Integer> reduced = source.reduce(new Func2<Integer, Integer, Integer>() {
284+
@Override
285+
public Integer call(Integer i1, Integer i2) {
286+
return i1 + i2;
287+
}
288+
});
289+
290+
Integer r = reduced.toBlocking().first();
291+
assertEquals(21, r.intValue());
292+
}
244293
}

rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030

3131
import rx.Observable;
3232
import rx.Observer;
33+
import rx.Subscriber;
3334
import rx.functions.Func1;
35+
import rx.functions.Functions;
3436
import rx.internal.util.RxRingBuffer;
3537
import rx.observers.TestSubscriber;
3638
import rx.schedulers.Schedulers;
@@ -148,4 +150,118 @@ public Integer call(Integer i) {
148150
};
149151
}
150152

153+
@Test
154+
public void testIssue1522() {
155+
// https://github.com/Netflix/RxJava/issues/1522
156+
assertEquals(0, Observable
157+
.empty()
158+
.count()
159+
.filter(Functions.alwaysFalse())
160+
.toList()
161+
.toBlocking().single().size());
162+
}
163+
164+
@Test
165+
public void testIgnoreRequest1() {
166+
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
167+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
168+
169+
@Override
170+
public void onStart() {
171+
request(Long.MAX_VALUE);
172+
}
173+
174+
@Override
175+
public void onCompleted() {
176+
177+
}
178+
179+
@Override
180+
public void onError(Throwable e) {
181+
}
182+
183+
@Override
184+
public void onNext(Integer integer) {
185+
request(Long.MAX_VALUE);
186+
}
187+
});
188+
}
189+
190+
@Test
191+
public void testIgnoreRequest2() {
192+
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
193+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
194+
195+
@Override
196+
public void onStart() {
197+
request(1);
198+
}
199+
200+
@Override
201+
public void onCompleted() {
202+
}
203+
204+
@Override
205+
public void onError(Throwable e) {
206+
}
207+
208+
@Override
209+
public void onNext(Integer integer) {
210+
request(1);
211+
}
212+
});
213+
}
214+
215+
@Test(timeout = 30000)
216+
public void testIgnoreRequest3() {
217+
// If `takeLast` does not ignore `request` properly, it will enter an infinite loop.
218+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
219+
220+
@Override
221+
public void onStart() {
222+
request(1);
223+
}
224+
225+
@Override
226+
public void onCompleted() {
227+
228+
}
229+
230+
@Override
231+
public void onError(Throwable e) {
232+
}
233+
234+
@Override
235+
public void onNext(Integer integer) {
236+
request(Long.MAX_VALUE);
237+
}
238+
});
239+
}
240+
241+
242+
@Test
243+
public void testIgnoreRequest4() {
244+
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
245+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
246+
247+
@Override
248+
public void onStart() {
249+
request(Long.MAX_VALUE);
250+
}
251+
252+
@Override
253+
public void onCompleted() {
254+
255+
}
256+
257+
@Override
258+
public void onError(Throwable e) {
259+
}
260+
261+
@Override
262+
public void onNext(Integer integer) {
263+
request(1);
264+
}
265+
});
266+
}
151267
}

0 commit comments

Comments
 (0)