Skip to content

Commit bcded86

Browse files
Merge pull request ReactiveX#977 from benjchristensen/dematerialize
Dematerialize - handle non-materialized terminal events
2 parents c10af94 + 75b0124 commit bcded86

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

rxjava-core/src/main/java/rx/operators/OperationDematerialize.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
5757
return sequence.subscribe(new Observer<Notification<? extends T>>() {
5858
@Override
5959
public void onCompleted() {
60+
observer.onCompleted();
6061
}
6162

6263
@Override
6364
public void onError(Throwable e) {
65+
observer.onError(e);
6466
}
6567

6668
@Override

rxjava-core/src/test/java/rx/operators/OperationDematerializeTest.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
19-
import static org.mockito.Mockito.*;
20-
import static rx.operators.OperationDematerialize.*;
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.never;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
import static rx.operators.OperationDematerialize.dematerialize;
2124

2225
import org.junit.Test;
2326

2427
import rx.Notification;
2528
import rx.Observable;
2629
import rx.Observer;
30+
import rx.observers.TestSubscriber;
2731

2832
public class OperationDematerializeTest {
2933

@@ -71,4 +75,35 @@ public void testDematerialize3() {
7175
verify(observer, times(0)).onCompleted();
7276
verify(observer, times(0)).onNext(any(Integer.class));
7377
}
78+
79+
@Test
80+
public void testErrorPassThru() {
81+
Exception exception = new Exception("test");
82+
Observable<Integer> observable = Observable.error(exception);
83+
Observable<Integer> dematerialize = observable.dematerialize();
84+
85+
Observer<Integer> observer = mock(Observer.class);
86+
dematerialize.subscribe(observer);
87+
88+
verify(observer, times(1)).onError(exception);
89+
verify(observer, times(0)).onCompleted();
90+
verify(observer, times(0)).onNext(any(Integer.class));
91+
}
92+
93+
@Test
94+
public void testCompletePassThru() {
95+
Observable<Integer> observable = Observable.empty();
96+
Observable<Integer> dematerialize = observable.dematerialize();
97+
98+
Observer<Integer> observer = mock(Observer.class);
99+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(observer);
100+
dematerialize.subscribe(ts);
101+
102+
System.out.println(ts.getOnErrorEvents());
103+
104+
verify(observer, never()).onError(any(Throwable.class));
105+
verify(observer, times(1)).onCompleted();
106+
verify(observer, times(0)).onNext(any(Integer.class));
107+
}
108+
74109
}

0 commit comments

Comments
 (0)