Skip to content

Commit 629acad

Browse files
committed
Add operators to create Observables from BroadcastReceiver
it allows to listen global and local (with support LocalBroadcastManager) broadcasts
1 parent ac6d04b commit 629acad

File tree

4 files changed

+244
-5
lines changed

4 files changed

+244
-5
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,22 @@
1515
*/
1616
package rx.android.observables;
1717

18-
import static rx.android.schedulers.AndroidSchedulers.mainThread;
18+
import android.app.Activity;
19+
import android.app.Fragment;
20+
import android.content.Context;
21+
import android.content.Intent;
22+
import android.content.IntentFilter;
23+
import android.os.Build;
24+
import android.os.Handler;
1925

2026
import rx.Observable;
2127
import rx.functions.Func1;
22-
import rx.operators.OperatorObserveFromAndroidComponent;
28+
import rx.operators.OperatorBroadcastRegister;
2329
import rx.operators.OperatorConditionalBinding;
30+
import rx.operators.OperatorLocalBroadcastRegister;
31+
import rx.operators.OperatorObserveFromAndroidComponent;
2432

25-
import android.app.Activity;
26-
import android.app.Fragment;
27-
import android.os.Build;
33+
import static rx.android.schedulers.AndroidSchedulers.mainThread;
2834

2935

3036
public final class AndroidObservable {
@@ -176,4 +182,37 @@ public static <T> Observable<T> bindFragment(Object fragment, Observable<T> sour
176182
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
177183
}
178184
}
185+
186+
/**
187+
* Create Observable that wraps BroadcastReceiver and emmit received intents.
188+
*
189+
* @param filter Selects the Intent broadcasts to be received.
190+
*/
191+
public static Observable<Intent> fromBroadcast(Context context, IntentFilter filter){
192+
return Observable.create(new OperatorBroadcastRegister(context, filter, null, null));
193+
}
194+
195+
/**
196+
* Create Observable that wraps BroadcastReceiver and emmit received intents.
197+
*
198+
* @param filter Selects the Intent broadcasts to be received.
199+
* @param broadcastPermission String naming a permissions that a
200+
* broadcaster must hold in order to send an Intent to you. If null,
201+
* no permission is required.
202+
* @param schedulerHandler Handler identifying the thread that will receive
203+
* the Intent. If null, the main thread of the process will be used.
204+
*/
205+
public static Observable<Intent> fromBroadcast(Context context, IntentFilter filter, String broadcastPermission, Handler schedulerHandler){
206+
return Observable.create(new OperatorBroadcastRegister(context, filter, broadcastPermission, schedulerHandler));
207+
}
208+
209+
/**
210+
* Create Observable that wraps BroadcastReceiver and connects to LocalBroadcastManager
211+
* to emmit received intents.
212+
*
213+
* @param filter Selects the Intent broadcasts to be received.
214+
*/
215+
public static Observable<Intent> fromLocalBroadcast(Context context, IntentFilter filter){
216+
return Observable.create(new OperatorLocalBroadcastRegister(context, filter));
217+
}
179218
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import android.content.BroadcastReceiver;
19+
import android.content.Context;
20+
import android.content.Intent;
21+
import android.content.IntentFilter;
22+
import android.os.Handler;
23+
24+
import rx.Observable;
25+
import rx.Subscriber;
26+
import rx.Subscription;
27+
import rx.android.subscriptions.AndroidSubscriptions;
28+
import rx.functions.Action0;
29+
30+
public class OperatorBroadcastRegister implements Observable.OnSubscribe<Intent> {
31+
32+
private final Context context;
33+
private final IntentFilter intentFilter;
34+
private final String broadcastPermission;
35+
private final Handler schedulerHandler;
36+
37+
public OperatorBroadcastRegister(Context context, IntentFilter intentFilter, String broadcastPermission, Handler schedulerHandler) {
38+
this.context = context;
39+
this.intentFilter = intentFilter;
40+
this.broadcastPermission = broadcastPermission;
41+
this.schedulerHandler = schedulerHandler;
42+
}
43+
44+
@Override
45+
public void call(final Subscriber<? super Intent> subscriber) {
46+
final BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
47+
@Override
48+
public void onReceive(Context context, Intent intent) {
49+
subscriber.onNext(intent);
50+
}
51+
};
52+
53+
final Subscription subscription = AndroidSubscriptions.unsubscribeInUiThread(new Action0() {
54+
@Override
55+
public void call() {
56+
context.unregisterReceiver(broadcastReceiver);
57+
}
58+
});
59+
60+
subscriber.add(subscription);
61+
Intent stickyIntent = context.registerReceiver(broadcastReceiver, intentFilter, broadcastPermission, schedulerHandler);
62+
if (stickyIntent != null) {
63+
subscriber.onNext(stickyIntent);
64+
}
65+
66+
}
67+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import android.content.BroadcastReceiver;
19+
import android.content.Context;
20+
import android.content.Intent;
21+
import android.content.IntentFilter;
22+
import android.support.v4.content.LocalBroadcastManager;
23+
24+
import rx.Observable;
25+
import rx.Subscriber;
26+
import rx.Subscription;
27+
import rx.android.subscriptions.AndroidSubscriptions;
28+
import rx.functions.Action0;
29+
30+
public class OperatorLocalBroadcastRegister implements Observable.OnSubscribe<Intent> {
31+
32+
private final Context context;
33+
private final IntentFilter intentFilter;
34+
35+
public OperatorLocalBroadcastRegister(Context context, IntentFilter intentFilter) {
36+
this.context = context;
37+
this.intentFilter = intentFilter;
38+
}
39+
40+
@Override
41+
public void call(final Subscriber<? super Intent> subscriber) {
42+
final LocalBroadcastManager localBroadcastManager = LocalBroadcastManager.getInstance(context);
43+
final BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
44+
@Override
45+
public void onReceive(Context context, Intent intent) {
46+
subscriber.onNext(intent);
47+
}
48+
};
49+
50+
final Subscription subscription = AndroidSubscriptions.unsubscribeInUiThread(new Action0() {
51+
@Override
52+
public void call() {
53+
localBroadcastManager.unregisterReceiver(broadcastReceiver);
54+
}
55+
});
56+
57+
subscriber.add(subscription);
58+
localBroadcastManager.registerReceiver(broadcastReceiver, intentFilter);
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.android.operators;
17+
18+
import android.app.Application;
19+
import android.content.Intent;
20+
import android.content.IntentFilter;
21+
import android.support.v4.content.LocalBroadcastManager;
22+
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.mockito.InOrder;
26+
import org.robolectric.Robolectric;
27+
import org.robolectric.RobolectricTestRunner;
28+
29+
import rx.Observable;
30+
import rx.Observer;
31+
import rx.Subscription;
32+
import rx.android.observables.AndroidObservable;
33+
import rx.observers.TestObserver;
34+
35+
import static org.mockito.Matchers.any;
36+
import static org.mockito.Mockito.inOrder;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.never;
39+
import static org.mockito.Mockito.times;
40+
41+
@RunWith(RobolectricTestRunner.class)
42+
public class OperatorLocalBroadcastRegisterTest {
43+
44+
@Test
45+
@SuppressWarnings("unchecked")
46+
public void testLocalBroadcast() {
47+
String action = "TEST_ACTION";
48+
IntentFilter intentFilter = new IntentFilter(action);
49+
Application application = Robolectric.application;
50+
Observable<Intent> observable = AndroidObservable.fromLocalBroadcast(application, intentFilter);
51+
final Observer<Intent> observer = mock(Observer.class);
52+
final Subscription subscription = observable.subscribe(new TestObserver<Intent>(observer));
53+
54+
final InOrder inOrder = inOrder(observer);
55+
56+
inOrder.verify(observer, never()).onNext(any(Intent.class));
57+
58+
Intent intent = new Intent(action);
59+
LocalBroadcastManager localBroadcastManager = LocalBroadcastManager.getInstance(application);
60+
localBroadcastManager.sendBroadcast(intent);
61+
inOrder.verify(observer, times(1)).onNext(intent);
62+
63+
localBroadcastManager.sendBroadcast(intent);
64+
inOrder.verify(observer, times(1)).onNext(intent);
65+
66+
subscription.unsubscribe();
67+
inOrder.verify(observer, never()).onNext(any(Intent.class));
68+
69+
inOrder.verify(observer, never()).onError(any(Throwable.class));
70+
inOrder.verify(observer, never()).onCompleted();
71+
}
72+
73+
}

0 commit comments

Comments
 (0)