Skip to content

Commit 28489d0

Browse files
Merge pull request ReactiveX#275 from benjchristensen/forEach-BlockingObservable
Move forEach to BlockingObservable
2 parents 448d778 + eb7e414 commit 28489d0

File tree

4 files changed

+157
-141
lines changed

4 files changed

+157
-141
lines changed

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def class ObservableTests {
228228

229229
@Test
230230
public void testForEach() {
231-
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
231+
Observable.create(new AsyncObservable()).toBlockingObservable().forEach({ result -> a.received(result)});
232232
verify(a, times(1)).received(1);
233233
verify(a, times(1)).received(2);
234234
verify(a, times(1)).received(3);
@@ -237,7 +237,7 @@ def class ObservableTests {
237237
@Test
238238
public void testForEachWithError() {
239239
try {
240-
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')});
240+
Observable.create(new AsyncObservable()).toBlockingObservable().forEach({ result -> throw new RuntimeException('err')});
241241
fail("we expect an exception to be thrown");
242242
}catch(Exception e) {
243243
// do nothing as we expect this

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -518,105 +518,6 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
518518
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
519519
}
520520

521-
/**
522-
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
523-
* <p>
524-
* NOTE: This will block even if the Observable is asynchronous.
525-
* <p>
526-
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
527-
*
528-
* @param onNext
529-
* {@link Action1}
530-
* @throws RuntimeException
531-
* if error occurs
532-
*/
533-
public void forEach(final Action1<T> onNext) {
534-
final CountDownLatch latch = new CountDownLatch(1);
535-
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();
536-
537-
/**
538-
* Wrapping since raw functions provided by the user are being invoked.
539-
*
540-
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
541-
*/
542-
protectivelyWrapAndSubscribe(new Observer<T>() {
543-
@Override
544-
public void onCompleted() {
545-
latch.countDown();
546-
}
547-
548-
@Override
549-
public void onError(Exception e) {
550-
/*
551-
* If we receive an onError event we set the reference on the outer thread
552-
* so we can git it and throw after the latch.await().
553-
*
554-
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
555-
*/
556-
exceptionFromOnError.set(e);
557-
latch.countDown();
558-
}
559-
560-
@Override
561-
public void onNext(T args) {
562-
onNext.call(args);
563-
}
564-
});
565-
// block until the subscription completes and then return
566-
try {
567-
latch.await();
568-
} catch (InterruptedException e) {
569-
// set the interrupted flag again so callers can still get it
570-
// for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780
571-
Thread.currentThread().interrupt();
572-
// using Runtime so it is not checked
573-
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
574-
}
575-
576-
if (exceptionFromOnError.get() != null) {
577-
if (exceptionFromOnError.get() instanceof RuntimeException) {
578-
throw (RuntimeException) exceptionFromOnError.get();
579-
} else {
580-
throw new RuntimeException(exceptionFromOnError.get());
581-
}
582-
}
583-
}
584-
585-
/**
586-
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
587-
* <p>
588-
* NOTE: This will block even if the Observable is asynchronous.
589-
* <p>
590-
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
591-
*
592-
* @param o
593-
* onNext {@link Action1 action}
594-
* @throws RuntimeException
595-
* if error occurs
596-
*/
597-
@SuppressWarnings({ "rawtypes", "unchecked" })
598-
public void forEach(final Object o) {
599-
if (o instanceof Action1) {
600-
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
601-
forEach((Action1) o);
602-
}
603-
604-
// lookup and memoize onNext
605-
if (o == null) {
606-
throw new RuntimeException("onNext must be implemented");
607-
}
608-
final FuncN onNext = Functions.from(o);
609-
610-
forEach(new Action1() {
611-
612-
@Override
613-
public void call(Object args) {
614-
onNext.call(args);
615-
}
616-
617-
});
618-
}
619-
620521
/**
621522
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
622523
*
@@ -3565,39 +3466,6 @@ public void onNext(String v) {
35653466
}
35663467
}
35673468

3568-
@Test
3569-
public void testForEachWithError() {
3570-
try {
3571-
Observable.create(new Func1<Observer<String>, Subscription>() {
3572-
3573-
@Override
3574-
public Subscription call(final Observer<String> observer) {
3575-
final BooleanSubscription subscription = new BooleanSubscription();
3576-
new Thread(new Runnable() {
3577-
3578-
@Override
3579-
public void run() {
3580-
observer.onNext("one");
3581-
observer.onNext("two");
3582-
observer.onNext("three");
3583-
observer.onCompleted();
3584-
}
3585-
}).start();
3586-
return subscription;
3587-
}
3588-
}).forEach(new Action1<String>() {
3589-
3590-
@Override
3591-
public void call(String t1) {
3592-
throw new RuntimeException("fail");
3593-
}
3594-
});
3595-
fail("we expect an exception to be thrown");
3596-
} catch (Exception e) {
3597-
// do nothing as we expect this
3598-
}
3599-
}
3600-
36013469
@Test
36023470
public void testPublish() throws InterruptedException {
36033471
final AtomicInteger counter = new AtomicInteger();

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import static org.junit.Assert.*;
44

55
import java.util.Iterator;
6+
import java.util.concurrent.CountDownLatch;
67
import java.util.concurrent.Future;
8+
import java.util.concurrent.atomic.AtomicReference;
79

810
import org.junit.Before;
911
import org.junit.Test;
@@ -13,11 +15,15 @@
1315
import rx.Observable;
1416
import rx.Observer;
1517
import rx.Subscription;
18+
import rx.operators.AtomicObservableSubscription;
19+
import rx.operators.AtomicObserver;
1620
import rx.operators.OperationMostRecent;
1721
import rx.operators.OperationNext;
1822
import rx.operators.OperationToFuture;
1923
import rx.operators.OperationToIterator;
24+
import rx.subscriptions.BooleanSubscription;
2025
import rx.subscriptions.Subscriptions;
26+
import rx.util.functions.Action1;
2127
import rx.util.functions.Func1;
2228
import rx.util.functions.FuncN;
2329
import rx.util.functions.Functions;
@@ -309,6 +315,115 @@ protected BlockingObservable(Func1<Observer<T>, Subscription> onSubscribe) {
309315
super(onSubscribe);
310316
}
311317

318+
/**
319+
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
320+
* <p>
321+
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
322+
*/
323+
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
324+
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
325+
return subscription.wrap(subscribe(new AtomicObserver<T>(subscription, o)));
326+
}
327+
328+
/**
329+
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
330+
* <p>
331+
* NOTE: This will block even if the Observable is asynchronous.
332+
* <p>
333+
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
334+
*
335+
* @param onNext
336+
* {@link Action1}
337+
* @throws RuntimeException
338+
* if error occurs
339+
*/
340+
public void forEach(final Action1<T> onNext) {
341+
final CountDownLatch latch = new CountDownLatch(1);
342+
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();
343+
344+
/**
345+
* Wrapping since raw functions provided by the user are being invoked.
346+
*
347+
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
348+
*/
349+
protectivelyWrapAndSubscribe(new Observer<T>() {
350+
@Override
351+
public void onCompleted() {
352+
latch.countDown();
353+
}
354+
355+
@Override
356+
public void onError(Exception e) {
357+
/*
358+
* If we receive an onError event we set the reference on the outer thread
359+
* so we can git it and throw after the latch.await().
360+
*
361+
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
362+
*/
363+
exceptionFromOnError.set(e);
364+
latch.countDown();
365+
}
366+
367+
@Override
368+
public void onNext(T args) {
369+
onNext.call(args);
370+
}
371+
});
372+
// block until the subscription completes and then return
373+
try {
374+
latch.await();
375+
} catch (InterruptedException e) {
376+
// set the interrupted flag again so callers can still get it
377+
// for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780
378+
Thread.currentThread().interrupt();
379+
// using Runtime so it is not checked
380+
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
381+
}
382+
383+
if (exceptionFromOnError.get() != null) {
384+
if (exceptionFromOnError.get() instanceof RuntimeException) {
385+
throw (RuntimeException) exceptionFromOnError.get();
386+
} else {
387+
throw new RuntimeException(exceptionFromOnError.get());
388+
}
389+
}
390+
}
391+
392+
/**
393+
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
394+
* <p>
395+
* NOTE: This will block even if the Observable is asynchronous.
396+
* <p>
397+
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
398+
*
399+
* @param o
400+
* onNext {@link Action1 action}
401+
* @throws RuntimeException
402+
* if error occurs
403+
*/
404+
@SuppressWarnings({ "rawtypes", "unchecked" })
405+
public void forEach(final Object o) {
406+
if (o instanceof Action1) {
407+
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
408+
forEach((Action1) o);
409+
}
410+
411+
// lookup and memoize onNext
412+
if (o == null) {
413+
throw new RuntimeException("onNext must be implemented");
414+
}
415+
final FuncN onNext = Functions.from(o);
416+
417+
forEach(new Action1() {
418+
419+
@Override
420+
public void call(Object args) {
421+
onNext.call(args);
422+
}
423+
424+
});
425+
}
426+
312427
/**
313428
* Returns an iterator that iterates all values of the observable.
314429
*
@@ -731,6 +846,39 @@ public Subscription call(Observer<String> observer) {
731846
it.next();
732847

733848
}
849+
850+
@Test
851+
public void testForEachWithError() {
852+
try {
853+
BlockingObservable.from(Observable.create(new Func1<Observer<String>, Subscription>() {
854+
855+
@Override
856+
public Subscription call(final Observer<String> observer) {
857+
final BooleanSubscription subscription = new BooleanSubscription();
858+
new Thread(new Runnable() {
859+
860+
@Override
861+
public void run() {
862+
observer.onNext("one");
863+
observer.onNext("two");
864+
observer.onNext("three");
865+
observer.onCompleted();
866+
}
867+
}).start();
868+
return subscription;
869+
}
870+
})).forEach(new Action1<String>() {
871+
872+
@Override
873+
public void call(String t1) {
874+
throw new RuntimeException("fail");
875+
}
876+
});
877+
fail("we expect an exception to be thrown");
878+
} catch (Exception e) {
879+
// do nothing as we expect this
880+
}
881+
}
734882

735883
private static class TestException extends RuntimeException {
736884
private static final long serialVersionUID = 1L;

0 commit comments

Comments
 (0)