Skip to content

Commit 212af0a

Browse files
committed
Fix issue ReactiveX#1522
1 parent 4f1b09c commit 212af0a

File tree

2 files changed

+141
-12
lines changed

2 files changed

+141
-12
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,21 @@ void startEmitting() {
108108

109109
@Override
110110
public void request(long n) {
111-
long _c = 0;
111+
long _c;
112112
if (n == Long.MAX_VALUE) {
113-
requested = Long.MAX_VALUE;
113+
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
114114
} else {
115-
_c = REQUESTED_UPDATER.getAndAdd(this, n);
115+
for (;;) {
116+
_c = requested;
117+
if (_c == Long.MAX_VALUE) {
118+
// If `requested` is Long.MAX_VALUE, `c+n` will be overflow.
119+
// Therefore, always check before setting to `c+n`
120+
return;
121+
}
122+
if (REQUESTED_UPDATER.compareAndSet(this, _c, _c + n)) {
123+
break;
124+
}
125+
}
116126
}
117127
if (!emittingStarted) {
118128
// we haven't started yet, so record what was requested and return
@@ -122,16 +132,20 @@ public void request(long n) {
122132
}
123133

124134
void emit(long previousRequested) {
125-
if (requested < 0) {
135+
if (requested == Long.MAX_VALUE) {
126136
// fast-path without backpressure
127-
try {
128-
for (Object value : deque) {
129-
notification.accept(subscriber, value);
137+
if (previousRequested == 0) {
138+
try {
139+
for (Object value : deque) {
140+
notification.accept(subscriber, value);
141+
}
142+
} catch (Throwable e) {
143+
subscriber.onError(e);
144+
} finally {
145+
deque.clear();
130146
}
131-
} catch (Throwable e) {
132-
subscriber.onError(e);
133-
} finally {
134-
deque.clear();
147+
} else {
148+
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
135149
}
136150
} else {
137151
// backpressure is requested
@@ -160,7 +174,6 @@ void emit(long previousRequested) {
160174
// we're done emitting the number requested so return
161175
return;
162176
}
163-
164177
}
165178
}
166179
}

rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030

3131
import rx.Observable;
3232
import rx.Observer;
33+
import rx.Subscriber;
3334
import rx.functions.Func1;
35+
import rx.functions.Functions;
3436
import rx.internal.util.RxRingBuffer;
3537
import rx.observers.TestSubscriber;
3638
import rx.schedulers.Schedulers;
@@ -148,4 +150,118 @@ public Integer call(Integer i) {
148150
};
149151
}
150152

153+
@Test
154+
public void testIssue1522() {
155+
// https://github.com/Netflix/RxJava/issues/1522
156+
assertEquals(0, Observable
157+
.empty()
158+
.count()
159+
.filter(Functions.alwaysFalse())
160+
.toList()
161+
.toBlocking().single().size());
162+
}
163+
164+
@Test
165+
public void testIgnoreRequest1() {
166+
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
167+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
168+
169+
@Override
170+
public void onStart() {
171+
request(Long.MAX_VALUE);
172+
}
173+
174+
@Override
175+
public void onCompleted() {
176+
177+
}
178+
179+
@Override
180+
public void onError(Throwable e) {
181+
}
182+
183+
@Override
184+
public void onNext(Integer integer) {
185+
request(Long.MAX_VALUE);
186+
}
187+
});
188+
}
189+
190+
@Test
191+
public void testIgnoreRequest2() {
192+
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
193+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
194+
195+
@Override
196+
public void onStart() {
197+
request(1);
198+
}
199+
200+
@Override
201+
public void onCompleted() {
202+
}
203+
204+
@Override
205+
public void onError(Throwable e) {
206+
}
207+
208+
@Override
209+
public void onNext(Integer integer) {
210+
request(1);
211+
}
212+
});
213+
}
214+
215+
@Test(timeout = 30000)
216+
public void testIgnoreRequest3() {
217+
// If `takeLast` does not ignore `request` properly, it will enter an infinite loop.
218+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
219+
220+
@Override
221+
public void onStart() {
222+
request(1);
223+
}
224+
225+
@Override
226+
public void onCompleted() {
227+
228+
}
229+
230+
@Override
231+
public void onError(Throwable e) {
232+
}
233+
234+
@Override
235+
public void onNext(Integer integer) {
236+
request(Long.MAX_VALUE);
237+
}
238+
});
239+
}
240+
241+
242+
@Test
243+
public void testIgnoreRequest4() {
244+
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
245+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
246+
247+
@Override
248+
public void onStart() {
249+
request(Long.MAX_VALUE);
250+
}
251+
252+
@Override
253+
public void onCompleted() {
254+
255+
}
256+
257+
@Override
258+
public void onError(Throwable e) {
259+
}
260+
261+
@Override
262+
public void onNext(Integer integer) {
263+
request(1);
264+
}
265+
});
266+
}
151267
}

0 commit comments

Comments
 (0)