Skip to content

Commit d3ed269

Browse files
authored
2.x: Add Flowable.switchMap{Maybe,Single}{DelayError} operators (ReactiveX#5873)
1 parent 44fb7cd commit d3ed269

File tree

5 files changed

+1990
-0
lines changed

5 files changed

+1990
-0
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14648,6 +14648,146 @@ <R> Flowable<R> switchMap0(Function<? super T, ? extends Publisher<? extends R>>
1464814648
return RxJavaPlugins.onAssembly(new FlowableSwitchMap<T, R>(this, mapper, bufferSize, delayError));
1464914649
}
1465014650

14651+
/**
14652+
* Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones
14653+
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if
14654+
* available while failing immediately if this {@code Flowable} or any of the
14655+
* active inner {@code MaybeSource}s fail.
14656+
* <p>
14657+
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
14658+
* <dl>
14659+
* <dt><b>Backpressure:</b></dt>
14660+
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
14661+
* unbounded manner (i.e., without backpressure).</dd>
14662+
* <dt><b>Scheduler:</b></dt>
14663+
* <dd>{@code switchMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
14664+
* <dt><b>Error handling:</b></dt>
14665+
* <dd>This operator terminates with an {@code onError} if this {@code Flowable} or any of
14666+
* the inner {@code MaybeSource}s fail while they are active. When this happens concurrently, their
14667+
* individual {@code Throwable} errors may get combined and emitted as a single
14668+
* {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late
14669+
* (i.e., inactive or switched out) {@code onError} from this {@code Flowable} or from any of
14670+
* the inner {@code MaybeSource}s will be forwarded to the global error handler via
14671+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as
14672+
* {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}</dd>
14673+
* </dl>
14674+
* @param <R> the output value type
14675+
* @param mapper the function called with the current upstream event and should
14676+
* return a {@code MaybeSource} to replace the current active inner source
14677+
* and get subscribed to.
14678+
* @return the new Flowable instance
14679+
* @since 2.1.11 - experimental
14680+
* @see #switchMapMaybe(Function)
14681+
*/
14682+
@CheckReturnValue
14683+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14684+
@SchedulerSupport(SchedulerSupport.NONE)
14685+
@Experimental
14686+
public final <R> Flowable<R> switchMapMaybe(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
14687+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14688+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybe<T, R>(this, mapper, false));
14689+
}
14690+
14691+
/**
14692+
* Maps the upstream items into {@link MaybeSource}s and switches (subscribes) to the newer ones
14693+
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if
14694+
* available, delaying errors from this {@code Flowable} or the inner {@code MaybeSource}s until all terminate.
14695+
* <p>
14696+
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
14697+
* <dl>
14698+
* <dt><b>Backpressure:</b></dt>
14699+
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
14700+
* unbounded manner (i.e., without backpressure).</dd>
14701+
* <dt><b>Scheduler:</b></dt>
14702+
* <dd>{@code switchMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
14703+
* </dl>
14704+
* @param <R> the output value type
14705+
* @param mapper the function called with the current upstream event and should
14706+
* return a {@code MaybeSource} to replace the current active inner source
14707+
* and get subscribed to.
14708+
* @return the new Flowable instance
14709+
* @since 2.1.11 - experimental
14710+
* @see #switchMapMaybe(Function)
14711+
*/
14712+
@CheckReturnValue
14713+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14714+
@SchedulerSupport(SchedulerSupport.NONE)
14715+
@Experimental
14716+
public final <R> Flowable<R> switchMapMaybeDelayError(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
14717+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14718+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybe<T, R>(this, mapper, true));
14719+
}
14720+
14721+
/**
14722+
* Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones
14723+
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one
14724+
* while failing immediately if this {@code Flowable} or any of the
14725+
* active inner {@code SingleSource}s fail.
14726+
* <p>
14727+
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
14728+
* <dl>
14729+
* <dt><b>Backpressure:</b></dt>
14730+
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
14731+
* unbounded manner (i.e., without backpressure).</dd>
14732+
* <dt><b>Scheduler:</b></dt>
14733+
* <dd>{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
14734+
* <dt><b>Error handling:</b></dt>
14735+
* <dd>This operator terminates with an {@code onError} if this {@code Flowable} or any of
14736+
* the inner {@code SingleSource}s fail while they are active. When this happens concurrently, their
14737+
* individual {@code Throwable} errors may get combined and emitted as a single
14738+
* {@link io.reactivex.exceptions.CompositeException CompositeException}. Otherwise, a late
14739+
* (i.e., inactive or switched out) {@code onError} from this {@code Flowable} or from any of
14740+
* the inner {@code SingleSource}s will be forwarded to the global error handler via
14741+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} as
14742+
* {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}</dd>
14743+
* </dl>
14744+
* @param <R> the output value type
14745+
* @param mapper the function called with the current upstream event and should
14746+
* return a {@code SingleSource} to replace the current active inner source
14747+
* and get subscribed to.
14748+
* @return the new Flowable instance
14749+
* @since 2.1.11 - experimental
14750+
* @see #switchMapSingle(Function)
14751+
*/
14752+
@CheckReturnValue
14753+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14754+
@SchedulerSupport(SchedulerSupport.NONE)
14755+
@Experimental
14756+
public final <R> Flowable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
14757+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14758+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle<T, R>(this, mapper, false));
14759+
}
14760+
14761+
/**
14762+
* Maps the upstream items into {@link SingleSource}s and switches (subscribes) to the newer ones
14763+
* while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one,
14764+
* delaying errors from this {@code Flowable} or the inner {@code SingleSource}s until all terminate.
14765+
* <p>
14766+
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
14767+
* <dl>
14768+
* <dt><b>Backpressure:</b></dt>
14769+
* <dd>The operator honors backpressure from downstream. The main {@code Flowable} is consumed in an
14770+
* unbounded manner (i.e., without backpressure).</dd>
14771+
* <dt><b>Scheduler:</b></dt>
14772+
* <dd>{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
14773+
* </dl>
14774+
* @param <R> the output value type
14775+
* @param mapper the function called with the current upstream event and should
14776+
* return a {@code SingleSource} to replace the current active inner source
14777+
* and get subscribed to.
14778+
* @return the new Flowable instance
14779+
* @since 2.1.11 - experimental
14780+
* @see #switchMapSingle(Function)
14781+
*/
14782+
@CheckReturnValue
14783+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14784+
@SchedulerSupport(SchedulerSupport.NONE)
14785+
@Experimental
14786+
public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
14787+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14788+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle<T, R>(this, mapper, true));
14789+
}
14790+
1465114791
/**
1465214792
* Returns a Flowable that emits only the first {@code count} items emitted by the source Publisher. If the source emits fewer than
1465314793
* {@code count} items then all of its items are emitted.

0 commit comments

Comments
 (0)