Skip to content

Commit 3c045c7

Browse files
committed
BlockingObservable + subscribe methods.
1 parent 189928c commit 3c045c7

File tree

2 files changed

+386
-34
lines changed

2 files changed

+386
-34
lines changed

src/main/java/rx/observables/BlockingObservable.java

Lines changed: 242 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,19 @@
1515
*/
1616
package rx.observables;
1717

18-
import java.util.Iterator;
19-
import java.util.NoSuchElementException;
20-
import java.util.concurrent.CountDownLatch;
21-
import java.util.concurrent.Future;
18+
import java.util.*;
19+
import java.util.concurrent.*;
2220
import java.util.concurrent.atomic.AtomicReference;
2321

22+
import rx.*;
2423
import rx.Observable;
25-
import rx.Subscriber;
26-
import rx.Subscription;
27-
import rx.functions.Action1;
28-
import rx.functions.Func1;
29-
import rx.internal.operators.BlockingOperatorLatest;
30-
import rx.internal.operators.BlockingOperatorMostRecent;
31-
import rx.internal.operators.BlockingOperatorNext;
32-
import rx.internal.operators.BlockingOperatorToFuture;
33-
import rx.internal.operators.BlockingOperatorToIterator;
24+
import rx.Observer;
25+
import rx.annotations.Experimental;
26+
import rx.exceptions.OnErrorNotImplementedException;
27+
import rx.functions.*;
28+
import rx.internal.operators.*;
3429
import rx.internal.util.UtilityFunctions;
30+
import rx.subscriptions.Subscriptions;
3531

3632
/**
3733
* {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be
@@ -83,12 +79,16 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
8379
* need the {@link Subscriber#onCompleted()} or {@link Subscriber#onError(Throwable)} methods. If the
8480
* underlying Observable terminates with an error, rather than calling {@code onError}, this method will
8581
* throw an exception.
86-
*
82+
*
83+
* <p>The difference between this method and {@link #subscribe(Action1)} is that the {@code onNext} action
84+
* is executed on the emission thread instead of the current thread.
85+
*
8786
* @param onNext
8887
* the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable}
8988
* @throws RuntimeException
9089
* if an error occurs
9190
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
91+
* @see #subscribe(Action1)
9292
*/
9393
public void forEach(final Action1<? super T> onNext) {
9494
final CountDownLatch latch = new CountDownLatch(1);
@@ -477,4 +477,232 @@ private void awaitForComplete(CountDownLatch latch, Subscription subscription) {
477477
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
478478
}
479479
}
480+
481+
/**
482+
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
483+
*/
484+
@Experimental
485+
public void run() {
486+
final CountDownLatch cdl = new CountDownLatch(1);
487+
final Throwable[] error = { null };
488+
Subscription s = o.subscribe(new Subscriber<T>() {
489+
@Override
490+
public void onNext(T t) {
491+
492+
}
493+
@Override
494+
public void onError(Throwable e) {
495+
error[0] = e;
496+
cdl.countDown();
497+
}
498+
499+
@Override
500+
public void onCompleted() {
501+
cdl.countDown();
502+
}
503+
});
504+
505+
awaitForComplete(cdl, s);
506+
Throwable e = error[0];
507+
if (e != null) {
508+
if (e instanceof RuntimeException) {
509+
throw (RuntimeException)e;
510+
} else {
511+
throw new RuntimeException(e);
512+
}
513+
}
514+
}
515+
516+
/**
517+
* Subscribes to the source and calls back the Observer methods on the current thread.
518+
* @param observer the observer to call event methods on
519+
*/
520+
@Experimental
521+
public void subscribe(Observer<? super T> observer) {
522+
final NotificationLite<T> nl = NotificationLite.instance();
523+
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
524+
525+
Subscription s = o.subscribe(new Subscriber<T>() {
526+
@Override
527+
public void onNext(T t) {
528+
queue.offer(nl.next(t));
529+
}
530+
@Override
531+
public void onError(Throwable e) {
532+
queue.offer(nl.error(e));
533+
}
534+
@Override
535+
public void onCompleted() {
536+
queue.offer(nl.completed());
537+
}
538+
});
539+
540+
try {
541+
for (;;) {
542+
Object o = queue.poll();
543+
if (o == null) {
544+
o = queue.take();
545+
}
546+
if (nl.accept(observer, o)) {
547+
return;
548+
}
549+
}
550+
} catch (InterruptedException e) {
551+
Thread.currentThread().interrupt();
552+
observer.onError(e);
553+
} finally {
554+
s.unsubscribe();
555+
}
556+
}
557+
558+
/** Constant to indicate the onStart method should be called. */
559+
private static final Object ON_START = new Object();
560+
561+
/** Constant indicating the setProducer method should be called. */
562+
private static final Object SET_PRODUCER = new Object();
563+
564+
/** Indicates an unsubscripton happened */
565+
private static final Object UNSUBSCRIBE = new Object();
566+
567+
/**
568+
* Subscribes to the source and calls the Subscriber methods on the current thread.
569+
* <p>
570+
* The unsubscription and backpressure is composed through.
571+
* @param subscriber the subscriber to forward events and calls to in the current thread
572+
*/
573+
@Experimental
574+
public void subscribe(Subscriber<? super T> subscriber) {
575+
final NotificationLite<T> nl = NotificationLite.instance();
576+
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
577+
final Producer[] theProducer = { null };
578+
579+
Subscriber<T> s = new Subscriber<T>() {
580+
@Override
581+
public void onNext(T t) {
582+
queue.offer(nl.next(t));
583+
}
584+
@Override
585+
public void onError(Throwable e) {
586+
queue.offer(nl.error(e));
587+
}
588+
@Override
589+
public void onCompleted() {
590+
queue.offer(nl.completed());
591+
}
592+
593+
@Override
594+
public void setProducer(Producer p) {
595+
theProducer[0] = p;
596+
queue.offer(SET_PRODUCER);
597+
}
598+
599+
@Override
600+
public void onStart() {
601+
queue.offer(ON_START);
602+
}
603+
};
604+
605+
subscriber.add(s);
606+
subscriber.add(Subscriptions.create(new Action0() {
607+
@Override
608+
public void call() {
609+
queue.offer(UNSUBSCRIBE);
610+
}
611+
}));
612+
613+
o.subscribe(s);
614+
615+
try {
616+
for (;;) {
617+
if (subscriber.isUnsubscribed()) {
618+
break;
619+
}
620+
Object o = queue.poll();
621+
if (o == null) {
622+
o = queue.take();
623+
}
624+
if (subscriber.isUnsubscribed() || o == UNSUBSCRIBE) {
625+
break;
626+
}
627+
if (o == ON_START) {
628+
subscriber.onStart();
629+
} else
630+
if (o == SET_PRODUCER) {
631+
subscriber.setProducer(theProducer[0]);
632+
} else
633+
if (nl.accept(subscriber, o)) {
634+
return;
635+
}
636+
}
637+
} catch (InterruptedException e) {
638+
Thread.currentThread().interrupt();
639+
subscriber.onError(e);
640+
} finally {
641+
s.unsubscribe();
642+
}
643+
}
644+
645+
/**
646+
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
647+
*/
648+
@Experimental
649+
public void subscribe() {
650+
run();
651+
}
652+
653+
/**
654+
* Subscribes to the source and calls the given action on the current thread and rethrows any exception wrapped
655+
* into OnErrorNotImplementedException.
656+
*
657+
* <p>The difference between this method and {@link #forEach(Action1)} is that the
658+
* action is always executed on the current thread.
659+
*
660+
* @param onNext the callback action for each source value
661+
* @see #forEach(Action1)
662+
*/
663+
@Experimental
664+
public void subscribe(final Action1<? super T> onNext) {
665+
subscribe(onNext, new Action1<Throwable>() {
666+
@Override
667+
public void call(Throwable t) {
668+
throw new OnErrorNotImplementedException(t);
669+
}
670+
}, Actions.empty());
671+
}
672+
673+
/**
674+
* Subscribes to the source and calls the given actions on the current thread.
675+
* @param onNext the callback action for each source value
676+
* @param onError the callback action for an error event
677+
*/
678+
@Experimental
679+
public void subscribe(final Action1<? super T> onNext, final Action1<? super Throwable> onError) {
680+
subscribe(onNext, onError, Actions.empty());
681+
}
682+
683+
/**
684+
* Subscribes to the source and calls the given actions on the current thread.
685+
* @param onNext the callback action for each source value
686+
* @param onError the callback action for an error event
687+
* @param onCompleted the callback action for the completion event.
688+
*/
689+
@Experimental
690+
public void subscribe(final Action1<? super T> onNext, final Action1<? super Throwable> onError, final Action0 onCompleted) {
691+
subscribe(new Observer<T>() {
692+
@Override
693+
public void onNext(T t) {
694+
onNext.call(t);
695+
}
696+
697+
@Override
698+
public void onError(Throwable e) {
699+
onError.call(e);
700+
}
701+
702+
@Override
703+
public void onCompleted() {
704+
onCompleted.call();
705+
}
706+
});
707+
}
480708
}

0 commit comments

Comments
 (0)