Skip to content

Commit 1db7349

Browse files
committed
Adds beginnings of doOnEach operator
1 parent a57042c commit 1db7349

File tree

3 files changed

+191
-0
lines changed

3 files changed

+191
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import rx.operators.OperationDematerialize;
4444
import rx.operators.OperationDistinct;
4545
import rx.operators.OperationDistinctUntilChanged;
46+
import rx.operators.OperationDoOnEach;
4647
import rx.operators.OperationElementAt;
4748
import rx.operators.OperationFilter;
4849
import rx.operators.OperationFinally;
@@ -4777,6 +4778,22 @@ public static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>>
47774778
return create(OperationAmb.amb(sources));
47784779
}
47794780

4781+
4782+
/**
4783+
* Invokes an action for each element in the observable sequence.
4784+
*
4785+
* @param func
4786+
* The action to invoke for each element in the source sequence.
4787+
*
4788+
* @return
4789+
* The source sequence with the side-effecting behavior applied.
4790+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229115(v=vs.103).aspx">MSDN: Observable.Amb</a>
4791+
*/
4792+
public Observable<T> doOnEach(Observer<? super T> observer) {
4793+
return create(OperationDoOnEach.doOnEach(this, observer));
4794+
}
4795+
4796+
47804797
/**
47814798
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
47824799
* <p>
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Subscription;
22+
23+
/**
24+
* Converts the elements of an observable sequence to the specified type.
25+
*/
26+
public class OperationDoOnEach {
27+
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> source, Observer<? super T> observer) {
28+
return new DoOnEachObservable<T>(source, observer);
29+
}
30+
31+
private static class DoOnEachObservable<T> implements OnSubscribeFunc<T> {
32+
33+
private final Observable<? extends T> source;
34+
private final Observer<? super T> doOnEachObserver;
35+
36+
public DoOnEachObservable(Observable<? extends T> source, Observer<? super T> doOnEachObserver) {
37+
this.source = source;
38+
this.doOnEachObserver = doOnEachObserver;
39+
}
40+
41+
@Override
42+
public Subscription onSubscribe(final Observer<? super T> observer) {
43+
return source.subscribe(new Observer<T>() {
44+
@Override
45+
public void onCompleted() {
46+
doOnEachObserver.onCompleted();
47+
observer.onCompleted();
48+
}
49+
50+
@Override
51+
public void onError(Throwable e) {
52+
doOnEachObserver.onError(e);
53+
observer.onError(e);
54+
}
55+
56+
@Override
57+
public void onNext(T value) {
58+
doOnEachObserver.onNext(value);
59+
observer.onNext(value);
60+
}
61+
62+
});
63+
}
64+
65+
}
66+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
21+
import static rx.operators.OperationMap.*;
22+
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.InOrder;
31+
import org.mockito.Mock;
32+
import org.mockito.MockitoAnnotations;
33+
34+
import rx.Observable;
35+
import rx.Observer;
36+
import rx.concurrency.Schedulers;
37+
import rx.util.functions.Func1;
38+
import rx.util.functions.Func2;
39+
40+
public class OperationDoOnEachTest {
41+
42+
@Mock
43+
Observer<String> subscribedObserver;
44+
@Mock
45+
Observer<String> sideEffectObserver;
46+
47+
@Before
48+
public void before() {
49+
MockitoAnnotations.initMocks(this);
50+
}
51+
52+
@Test
53+
public void testDoOnEach() {
54+
Observable<String> base = Observable.from("a", "b", "c");
55+
Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
56+
57+
doOnEach.subscribe(subscribedObserver);
58+
59+
// ensure the leaf observer is still getting called
60+
verify(subscribedObserver, never()).onError(any(Throwable.class));
61+
verify(subscribedObserver, times(1)).onNext("a");
62+
verify(subscribedObserver, times(1)).onNext("b");
63+
verify(subscribedObserver, times(1)).onNext("c");
64+
verify(subscribedObserver, times(1)).onCompleted();
65+
66+
// ensure our injected observer is getting called
67+
verify(sideEffectObserver, never()).onError(any(Throwable.class));
68+
verify(sideEffectObserver, times(1)).onNext("a");
69+
verify(sideEffectObserver, times(1)).onNext("b");
70+
verify(sideEffectObserver, times(1)).onNext("c");
71+
verify(sideEffectObserver, times(1)).onCompleted();
72+
}
73+
74+
75+
76+
@Test
77+
public void testDoOnEachWithError() {
78+
Observable<String> base = Observable.from("one", "fail", "two", "three", "fail");
79+
Observable<String> errs = base.map(new Func1<String, String>() {
80+
@Override
81+
public String call(String s) {
82+
if ("fail".equals(s)) {
83+
throw new RuntimeException("Forced Failure");
84+
}
85+
return s;
86+
}
87+
});
88+
89+
Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
90+
91+
92+
doOnEach.subscribe(subscribedObserver);
93+
verify(subscribedObserver, times(1)).onNext("one");
94+
verify(subscribedObserver, never()).onNext("two");
95+
verify(subscribedObserver, never()).onNext("three");
96+
verify(subscribedObserver, never()).onCompleted();
97+
verify(subscribedObserver, times(1)).onError(any(Throwable.class));
98+
99+
100+
verify(sideEffectObserver, times(1)).onNext("one");
101+
verify(sideEffectObserver, never()).onNext("two");
102+
verify(sideEffectObserver, never()).onNext("three");
103+
verify(sideEffectObserver, never()).onCompleted();
104+
verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
105+
}
106+
107+
108+
}

0 commit comments

Comments
 (0)