Skip to content

Commit 5b317ad

Browse files
Update SerializedObserver to Not Allow Notification Delay
Unit test showing delays. Fails when MAX_DRAIN_ITERATION set to 1, passes as currently configured. Added a thread starvation unit test and marked as ignored for now. Doesn't pass even with MAX_DRAIN_ITERATION set to 1. Probably needs backpressure solution.
1 parent ed392d7 commit 5b317ad

File tree

2 files changed

+143
-8
lines changed

2 files changed

+143
-8
lines changed

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class SerializedObserver<T> implements Observer<T> {
2121
private boolean terminated = false;
2222
private FastList queue;
2323

24-
private static final int MAX_DRAIN_ITERATION = 1;
24+
private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE;
2525
private static final Object NULL_SENTINEL = new Object();
2626
private static final Object COMPLETE_SENTINEL = new Object();
2727

rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java

Lines changed: 142 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@
1515
*/
1616
package rx.observers;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertTrue;
21+
import static org.junit.Assert.fail;
22+
import static org.mockito.Matchers.any;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.never;
25+
import static org.mockito.Mockito.times;
26+
import static org.mockito.Mockito.verify;
2127

2228
import java.util.concurrent.CountDownLatch;
2329
import java.util.concurrent.ExecutorService;
@@ -28,14 +34,17 @@
2834
import java.util.concurrent.atomic.AtomicInteger;
2935

3036
import org.junit.Before;
37+
import org.junit.Ignore;
3138
import org.junit.Test;
3239
import org.mockito.Mock;
3340
import org.mockito.MockitoAnnotations;
3441

3542
import rx.Observable;
43+
import rx.Observable.OnSubscribe;
3644
import rx.Observer;
3745
import rx.Subscriber;
3846
import rx.Subscription;
47+
import rx.schedulers.Schedulers;
3948

4049
public class SerializedObserverTest {
4150

@@ -265,6 +274,111 @@ public void runConcurrencyTest() {
265274
}
266275
}
267276

277+
@Test
278+
public void testNotificationDelay() {
279+
ExecutorService tp = Executors.newFixedThreadPool(2);
280+
281+
TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {
282+
283+
@Override
284+
public void onCompleted() {
285+
286+
}
287+
288+
@Override
289+
public void onError(Throwable e) {
290+
291+
}
292+
293+
@Override
294+
public void onNext(String t) {
295+
// force it to take time when delivering
296+
// so the second thread will asynchronously enqueue
297+
try {
298+
Thread.sleep(50);
299+
} catch (InterruptedException e) {
300+
e.printStackTrace();
301+
}
302+
}
303+
304+
});
305+
Observer<String> o = serializedObserver(to);
306+
307+
Future<?> f1 = tp.submit(new OnNextThread(o, 1));
308+
Future<?> f2 = tp.submit(new OnNextThread(o, 1));
309+
310+
waitOnThreads(f1, f2);
311+
// not completed yet
312+
313+
assertEquals(2, to.getOnNextEvents().size());
314+
System.out.println(to.getOnNextEvents());
315+
o.onCompleted();
316+
System.out.println(to.getOnNextEvents());
317+
}
318+
319+
/**
320+
* Demonstrates thread starvation problem.
321+
*
322+
* No solution on this for now. Trade-off in this direction as per https://github.com/Netflix/RxJava/issues/998#issuecomment-38959474
323+
* Probably need backpressure for this to work
324+
*
325+
* When using SynchronizedObserver we get this output:
326+
*
327+
* p1: 18 p2: 68 => should be close to each other unless we have thread starvation
328+
*
329+
* When using SerializedObserver we get:
330+
*
331+
* p1: 1 p2: 2445261 => should be close to each other unless we have thread starvation
332+
*
333+
* This demonstrates how SynchronizedObserver balances back and forth better, and blocks emission.
334+
* The real issue in this example is the async buffer-bloat, so we need backpressure.
335+
*
336+
*
337+
* @throws InterruptedException
338+
*/
339+
@Ignore
340+
@Test
341+
public void testThreadStarvation() throws InterruptedException {
342+
343+
TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {
344+
345+
@Override
346+
public void onCompleted() {
347+
348+
}
349+
350+
@Override
351+
public void onError(Throwable e) {
352+
353+
}
354+
355+
@Override
356+
public void onNext(String t) {
357+
// force it to take time when delivering
358+
try {
359+
Thread.sleep(1);
360+
} catch (InterruptedException e) {
361+
}
362+
}
363+
364+
});
365+
Observer<String> o = serializedObserver(to);
366+
367+
AtomicInteger p1 = new AtomicInteger();
368+
AtomicInteger p2 = new AtomicInteger();
369+
370+
Subscription s1 = infinite(p1).subscribe(o);
371+
Subscription s2 = infinite(p2).subscribe(o);
372+
373+
Thread.sleep(100);
374+
375+
System.out.println("p1: " + p1.get() + " p2: " + p2.get() + " => should be close to each other unless we have thread starvation");
376+
assertEquals(p1.get(), p2.get(), 10000); // fairly distributed within 10000 of each other
377+
378+
s1.unsubscribe();
379+
s2.unsubscribe();
380+
}
381+
268382
private static void waitOnThreads(Future<?>... futures) {
269383
for (Future<?> f : futures) {
270384
try {
@@ -276,23 +390,44 @@ private static void waitOnThreads(Future<?>... futures) {
276390
}
277391
}
278392

393+
private static Observable<String> infinite(final AtomicInteger produced) {
394+
return Observable.create(new OnSubscribe<String>() {
395+
396+
@Override
397+
public void call(Subscriber<? super String> s) {
398+
while (!s.isUnsubscribed()) {
399+
s.onNext("onNext");
400+
produced.incrementAndGet();
401+
}
402+
}
403+
404+
}).subscribeOn(Schedulers.newThread());
405+
}
406+
279407
/**
280408
* A thread that will pass data to onNext
281409
*/
282410
public static class OnNextThread implements Runnable {
283411

284-
private final Observer<String> Observer;
412+
private final Observer<String> observer;
285413
private final int numStringsToSend;
414+
final AtomicInteger produced;
286415

287-
OnNextThread(Observer<String> Observer, int numStringsToSend) {
288-
this.Observer = Observer;
416+
OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced) {
417+
this.observer = observer;
289418
this.numStringsToSend = numStringsToSend;
419+
this.produced = produced;
420+
}
421+
422+
OnNextThread(Observer<String> observer, int numStringsToSend) {
423+
this(observer, numStringsToSend, new AtomicInteger());
290424
}
291425

292426
@Override
293427
public void run() {
294428
for (int i = 0; i < numStringsToSend; i++) {
295-
Observer.onNext(Thread.currentThread().getId() + "-" + i);
429+
observer.onNext(Thread.currentThread().getId() + "-" + i);
430+
produced.incrementAndGet();
296431
}
297432
}
298433
}

0 commit comments

Comments
 (0)