Skip to content

Commit 0357377

Browse files
Merge pull request ReactiveX#458 from soundcloud/android-ui-component-operator
[rxjava-android] OperationObserveFromAndroidComponent
2 parents e4efbc8 + 5417b59 commit 0357377

File tree

3 files changed

+327
-1
lines changed

3 files changed

+327
-1
lines changed

rxjava-contrib/rxjava-android/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ apply plugin: 'osgi'
22

33
dependencies {
44
compile project(':rxjava-core')
5+
provided 'com.google.android:android:4.0.1.2'
6+
provided 'com.google.android:support-v4:r7'
7+
8+
// testing
59
provided 'junit:junit-dep:4.10'
610
provided 'org.mockito:mockito-core:1.8.5'
711
provided 'org.robolectric:robolectric:2.1.1'
8-
provided 'com.google.android:android:4.0.1.2'
912
}
1013

1114
javadoc {
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package rx.android.observables;
2+
3+
import rx.Observable;
4+
import rx.operators.OperationObserveFromAndroidComponent;
5+
6+
import android.app.Activity;
7+
import android.app.Fragment;
8+
9+
public final class AndroidObservable {
10+
11+
private AndroidObservable() {}
12+
13+
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
14+
return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
15+
}
16+
17+
public static <T> Observable<T> fromFragment(Fragment fragment, Observable<T> sourceObservable) {
18+
return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, fragment);
19+
}
20+
21+
public static <T> Observable<T> fromFragment(android.support.v4.app.Fragment fragment, Observable<T> sourceObservable) {
22+
return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, fragment);
23+
}
24+
25+
}
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Matchers.any;
4+
import static org.mockito.Matchers.anyInt;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.never;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
9+
import static org.mockito.Mockito.verifyNoMoreInteractions;
10+
import static org.mockito.Mockito.when;
11+
12+
import org.junit.Before;
13+
import org.junit.Test;
14+
import org.junit.runner.RunWith;
15+
import org.mockito.Mock;
16+
import org.mockito.MockitoAnnotations;
17+
import org.robolectric.RobolectricTestRunner;
18+
import org.robolectric.annotation.Config;
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
import rx.android.concurrency.AndroidSchedulers;
23+
import rx.subjects.PublishSubject;
24+
25+
import android.app.Activity;
26+
import android.app.Fragment;
27+
import android.os.Looper;
28+
import android.util.Log;
29+
30+
import java.lang.reflect.Field;
31+
import java.util.concurrent.Callable;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.Future;
34+
import java.util.concurrent.TimeUnit;
35+
36+
public class OperationObserveFromAndroidComponent {
37+
38+
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
39+
return Observable.create(new OnSubscribeFragment<T>(source, fragment));
40+
}
41+
42+
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.support.v4.app.Fragment fragment) {
43+
return Observable.create(new OnSubscribeSupportFragment<T>(source, fragment));
44+
}
45+
46+
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, Activity activity) {
47+
return Observable.create(new OnSubscribeBase<T, Activity>(source, activity));
48+
}
49+
50+
private static class OnSubscribeBase<T, AndroidComponent> implements Observable.OnSubscribeFunc<T> {
51+
52+
private static final String LOG_TAG = "AndroidObserver";
53+
54+
private final Observable<T> source;
55+
private AndroidComponent componentRef;
56+
private Observer<? super T> observerRef;
57+
58+
private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
59+
this.source = source;
60+
this.componentRef = component;
61+
}
62+
63+
private void log(String message) {
64+
if (Log.isLoggable(LOG_TAG, Log.DEBUG)) {
65+
Log.d(LOG_TAG, "componentRef = " + componentRef);
66+
Log.d(LOG_TAG, "observerRef = " + observerRef);
67+
Log.d(LOG_TAG, message);
68+
}
69+
}
70+
71+
protected boolean isComponentValid(AndroidComponent component) {
72+
return true;
73+
}
74+
75+
@Override
76+
public Subscription onSubscribe(Observer<? super T> observer) {
77+
assertUiThread();
78+
observerRef = observer;
79+
final Subscription sourceSub = source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
80+
@Override
81+
public void onCompleted() {
82+
if (componentRef != null && isComponentValid(componentRef)) {
83+
observerRef.onCompleted();
84+
} else {
85+
log("onComplete: target component released or detached; dropping message");
86+
}
87+
}
88+
89+
@Override
90+
public void onError(Throwable e) {
91+
if (componentRef != null && isComponentValid(componentRef)) {
92+
observerRef.onError(e);
93+
} else {
94+
log("onError: target component released or detached; dropping message");
95+
}
96+
}
97+
98+
@Override
99+
public void onNext(T args) {
100+
if (componentRef != null && isComponentValid(componentRef)) {
101+
observerRef.onNext(args);
102+
} else {
103+
log("onNext: target component released or detached; dropping message");
104+
}
105+
}
106+
});
107+
return new Subscription() {
108+
@Override
109+
public void unsubscribe() {
110+
log("unsubscribing from source sequence");
111+
releaseReferences();
112+
sourceSub.unsubscribe();
113+
}
114+
};
115+
}
116+
117+
private void releaseReferences() {
118+
observerRef = null;
119+
componentRef = null;
120+
}
121+
122+
private void assertUiThread() {
123+
if (Looper.getMainLooper() != Looper.myLooper()) {
124+
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
125+
}
126+
}
127+
}
128+
129+
private static final class OnSubscribeFragment<T> extends OnSubscribeBase<T, android.app.Fragment> {
130+
131+
private OnSubscribeFragment(Observable<T> source, android.app.Fragment fragment) {
132+
super(source, fragment);
133+
}
134+
135+
@Override
136+
protected boolean isComponentValid(android.app.Fragment fragment) {
137+
return fragment.isAdded();
138+
}
139+
}
140+
141+
private static final class OnSubscribeSupportFragment<T> extends OnSubscribeBase<T, android.support.v4.app.Fragment> {
142+
143+
private OnSubscribeSupportFragment(Observable<T> source, android.support.v4.app.Fragment fragment) {
144+
super(source, fragment);
145+
}
146+
147+
@Override
148+
protected boolean isComponentValid(android.support.v4.app.Fragment fragment) {
149+
return fragment.isAdded();
150+
}
151+
}
152+
153+
@RunWith(RobolectricTestRunner.class)
154+
@Config(manifest = Config.NONE)
155+
public static final class UnitTest {
156+
157+
@Mock
158+
private Observer<Integer> mockObserver;
159+
160+
@Mock
161+
private Fragment mockFragment;
162+
163+
@Mock
164+
private Activity mockActivity;
165+
166+
@Mock
167+
private Observable<Integer> mockObservable;
168+
169+
@Before
170+
public void setupMocks() {
171+
MockitoAnnotations.initMocks(this);
172+
when(mockFragment.isAdded()).thenReturn(true);
173+
}
174+
175+
@Test
176+
public void itThrowsIfObserverSubscribesFromBackgroundThread() throws Exception {
177+
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
178+
@Override
179+
public Object call() throws Exception {
180+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(
181+
mockObservable, mockFragment).subscribe(mockObserver);
182+
return null;
183+
}
184+
});
185+
future.get(1, TimeUnit.SECONDS);
186+
verify(mockObserver).onError(any(IllegalStateException.class));
187+
verifyNoMoreInteractions(mockObserver);
188+
}
189+
190+
@Test
191+
public void itObservesTheSourceSequenceOnTheMainUIThread() {
192+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(mockObservable, mockFragment).subscribe(mockObserver);
193+
verify(mockObservable).observeOn(AndroidSchedulers.mainThread());
194+
}
195+
196+
@Test
197+
public void itForwardsOnNextOnCompletedSequenceToTargetObserver() {
198+
Observable<Integer> source = Observable.from(1, 2, 3);
199+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
200+
verify(mockObserver, times(3)).onNext(anyInt());
201+
verify(mockObserver).onCompleted();
202+
verify(mockObserver, never()).onError(any(Exception.class));
203+
}
204+
205+
@Test
206+
public void itForwardsOnErrorToTargetObserver() {
207+
final Exception exception = new Exception();
208+
Observable<Integer> source = Observable.error(exception);
209+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
210+
verify(mockObserver).onError(exception);
211+
verify(mockObserver, never()).onNext(anyInt());
212+
verify(mockObserver, never()).onCompleted();
213+
}
214+
215+
@Test
216+
public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Throwable {
217+
PublishSubject<Integer> source = PublishSubject.create();
218+
219+
final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, mockFragment);
220+
operator.onSubscribe(mockObserver);
221+
222+
source.onNext(1);
223+
releaseComponentRef(operator);
224+
225+
source.onNext(2);
226+
source.onNext(3);
227+
source.onCompleted();
228+
229+
verify(mockObserver).onNext(1);
230+
verifyNoMoreInteractions(mockObserver);
231+
}
232+
233+
@Test
234+
public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable {
235+
PublishSubject<Integer> source = PublishSubject.create();
236+
237+
final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, mockFragment);
238+
operator.onSubscribe(mockObserver);
239+
240+
source.onNext(1);
241+
releaseComponentRef(operator);
242+
243+
source.onError(new Exception());
244+
245+
verify(mockObserver).onNext(1);
246+
verifyNoMoreInteractions(mockObserver);
247+
}
248+
249+
private void releaseComponentRef(OnSubscribeFragment<Integer> operator) throws NoSuchFieldException, IllegalAccessException {
250+
final Field componentRef = operator.getClass().getSuperclass().getDeclaredField("componentRef");
251+
componentRef.setAccessible(true);
252+
componentRef.set(operator, null);
253+
}
254+
255+
@Test
256+
public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
257+
PublishSubject<Integer> source = PublishSubject.create();
258+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
259+
260+
source.onNext(1);
261+
262+
when(mockFragment.isAdded()).thenReturn(false);
263+
source.onNext(2);
264+
source.onNext(3);
265+
source.onCompleted();
266+
267+
verify(mockObserver).onNext(1);
268+
verify(mockObserver, never()).onCompleted();
269+
}
270+
271+
@Test
272+
public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
273+
PublishSubject<Integer> source = PublishSubject.create();
274+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
275+
276+
source.onNext(1);
277+
278+
when(mockFragment.isAdded()).thenReturn(false);
279+
source.onError(new Exception());
280+
281+
verify(mockObserver).onNext(1);
282+
verify(mockObserver, never()).onError(any(Exception.class));
283+
}
284+
285+
@Test
286+
public void itUnsubscribesFromTheSourceSequence() {
287+
Subscription underlying = mock(Subscription.class);
288+
when(mockObservable.observeOn(AndroidSchedulers.mainThread())).thenReturn(mockObservable);
289+
when(mockObservable.subscribe(any(Observer.class))).thenReturn(underlying);
290+
291+
Subscription sub = OperationObserveFromAndroidComponent.observeFromAndroidComponent(
292+
mockObservable, mockActivity).subscribe(mockObserver);
293+
sub.unsubscribe();
294+
295+
verify(underlying).unsubscribe();
296+
}
297+
}
298+
}

0 commit comments

Comments
 (0)