Skip to content

Commit f64233f

Browse files
Add Single.fromCallable()
1 parent 61e1c22 commit f64233f

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package rx;
1414

15+
import java.util.concurrent.Callable;
1516
import java.util.concurrent.Future;
1617
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.TimeoutException;
@@ -605,6 +606,43 @@ public final static <T> Single<T> from(Future<? extends T> future, Scheduler sch
605606
return new Single<T>(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
606607
}
607608

609+
/**
610+
* Returns a {@link Single} that invokes passed function and emits its result for each new Observer that subscribes.
611+
* <p>
612+
* Allows you to defer execution of passed function until Observer subscribes to the {@link Single}.
613+
* It makes passed function "lazy".
614+
* Result of the function invocation will be emitted by the {@link Single}.
615+
* <dl>
616+
* <dt><b>Scheduler:</b></dt>
617+
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
618+
* </dl>
619+
*
620+
* @param func
621+
* function which execution should be deferred, it will be invoked when Observer will subscribe to the {@link Single}.
622+
* @param <T>
623+
* the type of the item emitted by the {@link Single}.
624+
* @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given function.
625+
*/
626+
@Experimental
627+
public static <T> Single<T> fromCallable(final Callable<? extends T> func) {
628+
return create(new OnSubscribe<T>() {
629+
@Override
630+
public void call(SingleSubscriber<? super T> singleSubscriber) {
631+
final T value;
632+
633+
try {
634+
value = func.call();
635+
} catch (Throwable t) {
636+
Exceptions.throwIfFatal(t);
637+
singleSubscriber.onError(t);
638+
return;
639+
}
640+
641+
singleSubscriber.onSuccess(value);
642+
}
643+
});
644+
}
645+
608646
/**
609647
* Returns a {@code Single} that emits a specified item.
610648
* <p>

src/test/java/rx/SingleTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import static org.mockito.Mockito.mock;
2121
import static org.mockito.Mockito.verify;
2222
import static org.mockito.Mockito.verifyZeroInteractions;
23+
import static org.mockito.Mockito.when;
2324

2425
import java.util.Arrays;
26+
import java.util.concurrent.Callable;
2527
import java.util.concurrent.CountDownLatch;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.TimeoutException;
@@ -530,4 +532,42 @@ public void doOnErrorShouldThrowCompositeExceptionIfOnErrorActionThrows() {
530532

531533
verify(action).call(error);
532534
}
535+
536+
@Test
537+
public void shouldEmitValueFromCallable() throws Exception {
538+
Callable<String> callable = mock(Callable.class);
539+
540+
when(callable.call()).thenReturn("value");
541+
542+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
543+
544+
Single
545+
.fromCallable(callable)
546+
.subscribe(testSubscriber);
547+
548+
testSubscriber.assertValue("value");
549+
testSubscriber.assertNoErrors();
550+
551+
verify(callable).call();
552+
}
553+
554+
@Test
555+
public void shouldPassErrorFromCallable() throws Exception {
556+
Callable<String> callable = mock(Callable.class);
557+
558+
Throwable error = new IllegalStateException();
559+
560+
when(callable.call()).thenThrow(error);
561+
562+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
563+
564+
Single
565+
.fromCallable(callable)
566+
.subscribe(testSubscriber);
567+
568+
testSubscriber.assertNoValues();
569+
testSubscriber.assertError(error);
570+
571+
verify(callable).call();
572+
}
533573
}

0 commit comments

Comments
 (0)