7
7
import android .view .View ;
8
8
import android .view .ViewGroup ;
9
9
import android .widget .ListView ;
10
-
10
+ import butterknife .Bind ;
11
+ import butterknife .ButterKnife ;
12
+ import butterknife .OnClick ;
11
13
import com .morihacky .android .rxjava .R ;
12
14
import com .morihacky .android .rxjava .wiring .LogAdapter ;
13
-
15
+ import hu .akarnokd .rxjava .interop .RxJavaInterop ;
16
+ import io .reactivex .Flowable ;
17
+ import io .reactivex .disposables .CompositeDisposable ;
18
+ import io .reactivex .functions .Function ;
19
+ import io .reactivex .subscribers .DisposableSubscriber ;
14
20
import java .util .ArrayList ;
15
21
import java .util .List ;
16
22
import java .util .concurrent .TimeUnit ;
17
-
18
- import butterknife .Bind ;
19
- import butterknife .ButterKnife ;
20
- import butterknife .OnClick ;
21
- import rx .Observable ;
22
- import rx .Observer ;
23
- import rx .Subscriber ;
24
- import rx .functions .Func1 ;
23
+ import org .reactivestreams .Publisher ;
25
24
import rx .observables .MathObservable ;
26
- import rx .subscriptions .CompositeSubscription ;
27
25
import timber .log .Timber ;
28
26
27
+
29
28
import static android .os .Looper .getMainLooper ;
30
29
31
30
public class ExponentialBackoffFragment
32
31
extends BaseFragment {
33
32
34
33
@ Bind (R .id .list_threading_log ) ListView _logList ;
35
34
private LogAdapter _adapter ;
35
+ private CompositeDisposable _disposables = new CompositeDisposable ();
36
36
private List <String > _logs ;
37
37
38
- private CompositeSubscription _subscriptions = new CompositeSubscription ();
39
-
40
- @ Override
41
- public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
42
- super .onActivityCreated (savedInstanceState );
43
- _setupLogger ();
44
- }
45
-
46
38
@ Override
47
39
public View onCreateView (LayoutInflater inflater ,
48
40
@ Nullable ViewGroup container ,
@@ -52,11 +44,17 @@ public View onCreateView(LayoutInflater inflater,
52
44
return layout ;
53
45
}
54
46
47
+ @ Override
48
+ public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
49
+ super .onActivityCreated (savedInstanceState );
50
+ _setupLogger ();
51
+ }
52
+
55
53
@ Override
56
54
public void onPause () {
57
55
super .onPause ();
58
56
59
- _subscriptions .clear ();
57
+ _disposables .clear ();
60
58
}
61
59
62
60
@ Override
@@ -72,27 +70,30 @@ public void startRetryingWithExponentialBackoffStrategy() {
72
70
_logs = new ArrayList <>();
73
71
_adapter .clear ();
74
72
75
- _subscriptions .add (//
76
- Observable //
77
- .error (new RuntimeException ("testing" )) // always fails
78
- .retryWhen (new RetryWithDelay (5 , 1000 )) // notice this is called only onError (onNext values sent are ignored)
79
- .doOnSubscribe (() -> _log ("Attempting the impossible 5 times in intervals of 1s" ))//
80
- .subscribe (new Observer <Object >() {
81
- @ Override
82
- public void onCompleted () {
83
- Timber .d ("on Completed" );
84
- }
85
-
86
- @ Override
87
- public void onError (Throwable e ) {
88
- _log ("Error: I give up!" );
89
- }
90
-
91
- @ Override
92
- public void onNext (Object aVoid ) {
93
- Timber .d ("on Next" );
94
- }
95
- }));
73
+ DisposableSubscriber <Object > disposableSubscriber = new DisposableSubscriber <Object >() {
74
+ @ Override
75
+ public void onNext (Object aVoid ) {
76
+ Timber .d ("on Next" );
77
+ }
78
+
79
+ @ Override
80
+ public void onComplete () {
81
+ Timber .d ("on Completed" );
82
+ }
83
+
84
+ @ Override
85
+ public void onError (Throwable e ) {
86
+ _log ("Error: I give up!" );
87
+ }
88
+ };
89
+
90
+ Flowable .error (new RuntimeException ("testing" )) // always fails
91
+ .retryWhen (new RetryWithDelay (5 , 1000 )) // notice this is called only onError (onNext
92
+ // values sent are ignored)
93
+ .doOnSubscribe (subscription -> _log ("Attempting the impossible 5 times in intervals of 1s" ))
94
+ .subscribe (disposableSubscriber );
95
+
96
+ _disposables .add (disposableSubscriber );
96
97
}
97
98
98
99
@ OnClick (R .id .btn_eb_delay )
@@ -101,41 +102,41 @@ public void startExecutingWithExponentialBackoffDelay() {
101
102
_logs = new ArrayList <>();
102
103
_adapter .clear ();
103
104
104
- _subscriptions . add ( //
105
-
106
- Observable . range ( 1 , 4 ) //
107
- . delay ( integer -> {
108
- // Rx-y way of doing the Fibonnaci :P
109
- return MathObservable //
110
- . sumInteger ( Observable . range ( 1 , integer ))
111
- . flatMap ( targetSecondDelay -> Observable . just ( integer )
112
- . delay ( targetSecondDelay , TimeUnit . SECONDS ));
113
- }) //
114
- . doOnSubscribe (() ->
115
- _log ( String . format ( "Execute 4 tasks with delay - time now: [xx:%02d]" ,
116
- _getSecondHand ()))) //
117
- . subscribe ( new Subscriber < Integer >() {
118
- @ Override
119
- public void onCompleted () {
120
- Timber . d ( "onCompleted " );
121
- _log ( "Completed" );
122
- }
123
-
124
- @ Override
125
- public void onError ( Throwable e ) {
126
- Timber . d ( e , "arrrr. Error" );
127
- _log ( "Error" );
128
- }
129
-
130
- @ Override
131
- public void onNext ( Integer integer ) {
132
- Timber . d ( "executing Task %d [xx:%02d]" , integer , _getSecondHand ( ));
133
- _log ( String . format ( "executing Task %d [xx:%02d]" ,
134
- integer ,
135
- _getSecondHand ()));
136
-
137
- }
138
- }) );
105
+ DisposableSubscriber < Integer > disposableSubscriber = new DisposableSubscriber < Integer >() {
106
+ @ Override
107
+ public void onNext ( Integer integer ) {
108
+ Timber . d ( "executing Task %d [xx:%02d]" , integer , _getSecondHand ());
109
+ _log ( String . format ( "executing Task %d [xx:%02d]" , integer , _getSecondHand ()));
110
+ }
111
+
112
+ @ Override
113
+ public void onError ( Throwable e ) {
114
+ Timber . d ( e , "arrrr. Error" );
115
+ _log ( "Error" );
116
+ }
117
+
118
+ @ Override
119
+ public void onComplete () {
120
+ Timber . d ( "onCompleted" );
121
+ _log ( "Completed " );
122
+ }
123
+ };
124
+
125
+ Flowable
126
+ . range ( 1 , 4 )
127
+ . delay ( integer -> {
128
+ // Rx-y way of doing the Fibonnaci :P
129
+ return RxJavaInterop
130
+ . toV2Flowable ( MathObservable . sumInteger ( rx . Observable . range ( 1 , integer )))
131
+ . flatMap ( targetSecondDelay -> Flowable
132
+ . just ( integer )
133
+ . delay ( targetSecondDelay , TimeUnit . SECONDS ));
134
+ })
135
+ . doOnSubscribe ( s -> _log ( String . format ( "Execute 4 tasks with delay - time now: [xx:%02d]" ,
136
+ _getSecondHand ())))
137
+ . subscribe ( disposableSubscriber );
138
+
139
+ _disposables . add ( disposableSubscriber );
139
140
}
140
141
141
142
// -----------------------------------------------------------------------------------
@@ -176,7 +177,7 @@ private void _log(String logMsg) {
176
177
177
178
//public static class RetryWithDelay
178
179
public class RetryWithDelay
179
- implements Func1 < Observable <? extends Throwable >, Observable <?>> {
180
+ implements Function < Flowable <? extends Throwable >, Publisher <?>> {
180
181
181
182
private final int _maxRetries ;
182
183
private final int _retryDelayMillis ;
@@ -193,14 +194,14 @@ public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
193
194
// only onNext triggers a re-subscription (onError + onComplete kills it)
194
195
195
196
@ Override
196
- public Observable <?> call ( Observable <? extends Throwable > inputObservable ) {
197
+ public Publisher <?> apply ( Flowable <? extends Throwable > inputObservable ) {
197
198
198
199
// it is critical to use inputObservable in the chain for the result
199
200
// ignoring it and doing your own thing will break the sequence
200
201
201
- return inputObservable .flatMap (new Func1 <Throwable , Observable <?>>() {
202
+ return inputObservable .flatMap (new Function <Throwable , Publisher <?>>() {
202
203
@ Override
203
- public Observable <?> call (Throwable throwable ) {
204
+ public Publisher <?> apply (Throwable throwable ) {
204
205
if (++_retryCount < _maxRetries ) {
205
206
206
207
// When this Observable calls onNext, the original
@@ -209,15 +210,14 @@ public Observable<?> call(Throwable throwable) {
209
210
Timber .d ("Retrying in %d ms" , _retryCount * _retryDelayMillis );
210
211
_log (String .format ("Retrying in %d ms" , _retryCount * _retryDelayMillis ));
211
212
212
- return Observable .timer (_retryCount * _retryDelayMillis ,
213
- TimeUnit .MILLISECONDS );
213
+ return Flowable .timer (_retryCount * _retryDelayMillis , TimeUnit .MILLISECONDS );
214
214
}
215
215
216
216
Timber .d ("Argh! i give up" );
217
217
218
218
// Max retries hit. Pass an error so the chain is forcibly completed
219
219
// only onNext triggers a re-subscription (onError + onComplete kills it)
220
- return Observable .error (throwable );
220
+ return Flowable .error (throwable );
221
221
}
222
222
});
223
223
}
0 commit comments