Skip to content

Commit cb9d1eb

Browse files
committed
single: add toSingle method to Observable
* closes ReactiveX#3038 * this method allows an observable which is guaranteed to return exactly one item to be converted to a Single * NOTE: the semantics of this function are very similar to that of single * i.e. errors are passed through, more than one item results in an IllegalArgumentException, completion without emission results in a NoSuchElementException and exactly one item is passed through the onSuccess method of SingleSubscriber
1 parent de1f414 commit cb9d1eb

File tree

3 files changed

+184
-2
lines changed

3 files changed

+184
-2
lines changed

src/main/java/rx/Observable.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,28 @@ public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer
195195
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
196196
// cover for generics insanity
197197
}
198-
199-
198+
199+
/**
200+
* Returns a Single that emits the single item emitted by the source Observable, if that Observable
201+
* emits only a single item. If the source Observable emits more than one item or no items, notify of an
202+
* {@code IllegalArgumentException} or {@code NoSuchElementException} respectively.
203+
* <p>
204+
* <dl>
205+
* <dt><b>Scheduler:</b></dt>
206+
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
207+
* </dl>
208+
*
209+
* @return a Single that emits the single item emitted by the source Observable
210+
* @throws IllegalArgumentException
211+
* if the source observable emits more than one item
212+
* @throws NoSuchElementException
213+
* if the source observable emits no items
214+
*/
215+
@Experimental
216+
public Single<T> toSingle() {
217+
return new Single<T>(OnSubscribeSingle.create(this));
218+
}
219+
200220

201221
/* *********************************************************************************************************
202222
* Operators Below Here
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Single;
20+
import rx.SingleSubscriber;
21+
import rx.Subscriber;
22+
23+
import java.util.NoSuchElementException;
24+
25+
/**
26+
* Allows conversion of an Observable to a Single ensuring that exactly one item is emitted - no more and no less.
27+
* Also forwards errors as appropriate.
28+
*/
29+
public class OnSubscribeSingle<T> implements Single.OnSubscribe<T> {
30+
31+
private final Observable<T> observable;
32+
33+
public OnSubscribeSingle(Observable<T> observable) {
34+
this.observable = observable;
35+
}
36+
37+
@Override
38+
public void call(final SingleSubscriber<? super T> child) {
39+
Subscriber<T> parent = new Subscriber<T>() {
40+
private boolean emittedTooMany = false;
41+
private boolean itemEmitted = false;
42+
private T emission = null;
43+
44+
@Override
45+
public void onStart() {
46+
// We request 2 here since we need 1 for the single and 1 to check that the observable
47+
// doesn't emit more than one item
48+
request(2);
49+
}
50+
51+
@Override
52+
public void onCompleted() {
53+
if (emittedTooMany) {
54+
// Don't need to do anything here since we already sent an error downstream
55+
} else {
56+
if (itemEmitted) {
57+
child.onSuccess(emission);
58+
} else {
59+
child.onError(new NoSuchElementException("Observable emitted no items"));
60+
}
61+
}
62+
}
63+
64+
@Override
65+
public void onError(Throwable e) {
66+
child.onError(e);
67+
unsubscribe();
68+
}
69+
70+
@Override
71+
public void onNext(T t) {
72+
if (itemEmitted) {
73+
emittedTooMany = true;
74+
child.onError(new IllegalArgumentException("Observable emitted too many elements"));
75+
unsubscribe();
76+
} else {
77+
itemEmitted = true;
78+
emission = t;
79+
}
80+
}
81+
};
82+
child.add(parent);
83+
observable.subscribe(parent);
84+
}
85+
86+
public static <T> OnSubscribeSingle<T> create(Observable<T> observable) {
87+
return new OnSubscribeSingle<T>(observable);
88+
}
89+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import org.junit.Test;
19+
import rx.Observable;
20+
import rx.Single;
21+
import rx.observers.TestSubscriber;
22+
23+
import java.util.Collections;
24+
import java.util.NoSuchElementException;
25+
26+
public class OnSubscribeSingleTest {
27+
28+
@Test
29+
public void testJustSingleItemObservable() {
30+
TestSubscriber<String> subscriber = TestSubscriber.create();
31+
Single<String> single = Observable.just("Hello World!").toSingle();
32+
single.subscribe(subscriber);
33+
34+
subscriber.assertReceivedOnNext(Collections.singletonList("Hello World!"));
35+
}
36+
37+
@Test
38+
public void testErrorObservable() {
39+
TestSubscriber<String> subscriber = TestSubscriber.create();
40+
IllegalArgumentException error = new IllegalArgumentException("Error");
41+
Single<String> single = Observable.<String>error(error).toSingle();
42+
single.subscribe(subscriber);
43+
44+
subscriber.assertError(error);
45+
}
46+
47+
@Test
48+
public void testJustTwoEmissionsObservableThrowsError() {
49+
TestSubscriber<String> subscriber = TestSubscriber.create();
50+
Single<String> single = Observable.just("First", "Second").toSingle();
51+
single.subscribe(subscriber);
52+
53+
subscriber.assertError(IllegalArgumentException.class);
54+
}
55+
56+
@Test
57+
public void testEmptyObservable() {
58+
TestSubscriber<String> subscriber = TestSubscriber.create();
59+
Single<String> single = Observable.<String>empty().toSingle();
60+
single.subscribe(subscriber);
61+
62+
subscriber.assertError(NoSuchElementException.class);
63+
}
64+
65+
@Test
66+
public void testRepeatObservableThrowsError() {
67+
TestSubscriber<String> subscriber = TestSubscriber.create();
68+
Single<String> single = Observable.just("First", "Second").repeat().toSingle();
69+
single.subscribe(subscriber);
70+
71+
subscriber.assertError(IllegalArgumentException.class);
72+
}
73+
}

0 commit comments

Comments
 (0)