27
27
import rx .Observer ;
28
28
import rx .Subscription ;
29
29
import rx .subscriptions .CompositeSubscription ;
30
- import rx .subscriptions .SingleAssignmentSubscription ;
31
30
import rx .util .functions .Func2 ;
32
31
import rx .util .functions .Func3 ;
33
32
import rx .util .functions .Func4 ;
@@ -61,37 +60,42 @@ public class OperationCombineLatest {
61
60
* The aggregation function used to combine the source observable values.
62
61
* @return A function from an observer to a subscription. This can be used to create an observable from.
63
62
*/
63
+ @ SuppressWarnings ("unchecked" )
64
64
public static <T0 , T1 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <T1 > w1 , Func2 <? super T0 , ? super T1 , ? extends R > combineLatestFunction ) {
65
65
return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 ), Functions .fromFunc (combineLatestFunction ));
66
66
}
67
67
68
68
/**
69
69
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
70
70
*/
71
+ @ SuppressWarnings ("unchecked" )
71
72
public static <T0 , T1 , T2 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 ,
72
73
Func3 <? super T0 , ? super T1 , ? super T2 , ? extends R > combineLatestFunction ) {
73
- return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 ), Functions .fromFunc (combineLatestFunction ));
74
+ return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 ), Functions .fromFunc (combineLatestFunction ));
74
75
}
75
76
76
77
/**
77
78
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
78
79
*/
80
+ @ SuppressWarnings ("unchecked" )
79
81
public static <T0 , T1 , T2 , T3 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 ,
80
82
Func4 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? extends R > combineLatestFunction ) {
81
- return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 ), Functions .fromFunc (combineLatestFunction ));
83
+ return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 ), Functions .fromFunc (combineLatestFunction ));
82
84
}
83
85
84
86
/**
85
87
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
86
88
*/
89
+ @ SuppressWarnings ("unchecked" )
87
90
public static <T0 , T1 , T2 , T3 , T4 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 ,
88
91
Func5 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? extends R > combineLatestFunction ) {
89
- return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 ), Functions .fromFunc (combineLatestFunction ));
92
+ return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 ), Functions .fromFunc (combineLatestFunction ));
90
93
}
91
94
92
95
/**
93
96
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
94
97
*/
98
+ @ SuppressWarnings ("unchecked" )
95
99
public static <T0 , T1 , T2 , T3 , T4 , T5 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 ,
96
100
Func6 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? extends R > combineLatestFunction ) {
97
101
return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 , w5 ), Functions .fromFunc (combineLatestFunction ));
@@ -100,6 +104,7 @@ public static <T0, T1, T2, T3, T4, T5, R> OnSubscribeFunc<R> combineLatest(Obser
100
104
/**
101
105
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
102
106
*/
107
+ @ SuppressWarnings ("unchecked" )
103
108
public static <T0 , T1 , T2 , T3 , T4 , T5 , T6 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 , Observable <? extends T6 > w6 ,
104
109
Func7 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? super T6 , ? extends R > combineLatestFunction ) {
105
110
return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 , w5 , w6 ), Functions .fromFunc (combineLatestFunction ));
@@ -108,6 +113,7 @@ public static <T0, T1, T2, T3, T4, T5, T6, R> OnSubscribeFunc<R> combineLatest(O
108
113
/**
109
114
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
110
115
*/
116
+ @ SuppressWarnings ("unchecked" )
111
117
public static <T0 , T1 , T2 , T3 , T4 , T5 , T6 , T7 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 , Observable <? extends T6 > w6 , Observable <? extends T7 > w7 ,
112
118
Func8 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? super T6 , ? super T7 , ? extends R > combineLatestFunction ) {
113
119
return new CombineLatest <Object , R >(Arrays .asList (w0 , w1 , w2 , w3 , w4 , w5 , w6 , w7 ), Functions .fromFunc (combineLatestFunction ));
@@ -116,6 +122,7 @@ public static <T0, T1, T2, T3, T4, T5, T6, T7, R> OnSubscribeFunc<R> combineLate
116
122
/**
117
123
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
118
124
*/
125
+ @ SuppressWarnings ("unchecked" )
119
126
public static <T0 , T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , R > OnSubscribeFunc <R > combineLatest (Observable <? extends T0 > w0 , Observable <? extends T1 > w1 , Observable <? extends T2 > w2 , Observable <? extends T3 > w3 , Observable <? extends T4 > w4 , Observable <? extends T5 > w5 , Observable <? extends T6 > w6 , Observable <? extends T7 > w7 ,
120
127
Observable <? extends T8 > w8 ,
121
128
Func9 <? super T0 , ? super T1 , ? super T2 , ? super T3 , ? super T4 , ? super T5 , ? super T6 , ? super T7 , ? super T8 , ? extends R > combineLatestFunction ) {
@@ -125,6 +132,7 @@ public static <T0, T1, T2, T3, T4, T5, T6, T7, T8, R> OnSubscribeFunc<R> combine
125
132
static final class CombineLatest <T , R > implements OnSubscribeFunc <R > {
126
133
final List <Observable <? extends T >> sources ;
127
134
final FuncN <? extends R > combiner ;
135
+
128
136
public CombineLatest (Iterable <? extends Observable <? extends T >> sources , FuncN <? extends R > combiner ) {
129
137
this .sources = new ArrayList <Observable <? extends T >>();
130
138
this .combiner = combiner ;
@@ -136,27 +144,28 @@ public CombineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<
136
144
@ Override
137
145
public Subscription onSubscribe (Observer <? super R > t1 ) {
138
146
CompositeSubscription csub = new CompositeSubscription ();
139
-
147
+
140
148
Collector collector = new Collector (t1 , csub , sources .size ());
141
-
149
+
142
150
int index = 0 ;
143
151
List <SourceObserver > observers = new ArrayList <SourceObserver >(sources .size () + 1 );
144
152
for (Observable <? extends T > source : sources ) {
145
- SingleAssignmentSubscription sas = new SingleAssignmentSubscription ();
153
+ SafeObservableSubscription sas = new SafeObservableSubscription ();
146
154
csub .add (sas );
147
155
observers .add (new SourceObserver (collector , sas , index , source ));
148
156
index ++;
149
157
}
150
-
158
+
151
159
for (SourceObserver so : observers ) {
152
160
// if we run to completion, don't bother any further
153
161
if (!csub .isUnsubscribed ()) {
154
162
so .connect ();
155
163
}
156
164
}
157
-
165
+
158
166
return csub ;
159
167
}
168
+
160
169
/**
161
170
* The collector that combines the latest values from many sources.
162
171
*/
@@ -173,6 +182,7 @@ final class Collector {
173
182
int hasCount ;
174
183
/** Number of completed source observers. */
175
184
int completedCount ;
185
+
176
186
public Collector (Observer <? super R > observer , Subscription cancel , int count ) {
177
187
this .observer = observer ;
178
188
this .cancel = cancel ;
@@ -181,6 +191,7 @@ public Collector(Observer<? super R> observer, Subscription cancel, int count) {
181
191
this .completed = new BitSet (count );
182
192
this .lock = new ReentrantLock ();
183
193
}
194
+
184
195
public void next (int index , T value ) {
185
196
Throwable err = null ;
186
197
lock .lock ();
@@ -210,6 +221,7 @@ public void next(int index, T value) {
210
221
cancel .unsubscribe ();
211
222
}
212
223
}
224
+
213
225
public void error (int index , Throwable e ) {
214
226
boolean unsub = false ;
215
227
lock .lock ();
@@ -226,13 +238,16 @@ public void error(int index, Throwable e) {
226
238
cancel .unsubscribe ();
227
239
}
228
240
}
241
+
229
242
boolean isTerminated () {
230
243
return completedCount == values .length + 1 ;
231
244
}
245
+
232
246
void terminate () {
233
247
completedCount = values .length + 1 ;
234
248
Arrays .fill (values , null );
235
249
}
250
+
236
251
public void completed (int index ) {
237
252
boolean unsub = false ;
238
253
lock .lock ();
@@ -256,22 +271,25 @@ public void completed(int index) {
256
271
}
257
272
}
258
273
}
274
+
259
275
/**
260
276
* Observes a specific source and communicates with the collector.
261
277
*/
262
- final class SourceObserver implements Observer <T > {
263
- final SingleAssignmentSubscription self ;
278
+ final class SourceObserver implements Observer <T > {
279
+ final SafeObservableSubscription self ;
264
280
final Collector collector ;
265
281
final int index ;
266
282
Observable <? extends T > source ;
267
- public SourceObserver (Collector collector ,
268
- SingleAssignmentSubscription self , int index ,
283
+
284
+ public SourceObserver (Collector collector ,
285
+ SafeObservableSubscription self , int index ,
269
286
Observable <? extends T > source ) {
270
287
this .self = self ;
271
288
this .collector = collector ;
272
289
this .index = index ;
273
290
this .source = source ;
274
291
}
292
+
275
293
@ Override
276
294
public void onNext (T args ) {
277
295
collector .next (index , args );
@@ -287,9 +305,10 @@ public void onCompleted() {
287
305
collector .completed (index );
288
306
self .unsubscribe ();
289
307
}
308
+
290
309
/** Connect to the source. */
291
310
void connect () {
292
- self .set (source .subscribe (this ));
311
+ self .wrap (source .subscribe (this ));
293
312
source = null ;
294
313
}
295
314
}
0 commit comments