15
15
*/
16
16
package rx .observers ;
17
17
18
- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
19
18
import rx .Observer ;
20
- import rx .operators .NotificationLite ;
21
19
22
20
/**
23
21
* Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
33
31
* @param <T>
34
32
* the type of items expected to be observed by the {@code Observer}
35
33
*/
36
- public final class SerializedObserver <T > implements Observer <T > {
37
- /** The actual observer that receives the values. */
38
- private final Observer <Object > actual ;
39
- /** The queue in case of concurrent access. */
40
- final MpscPaddedQueue <Object > queue ;
41
- /** Number of queued events. */
42
- final PaddedAtomicInteger wip ;
43
- /**
44
- * Adaptively enable and disable fast path. 0 means fast path enabled, 1 means fast path disabled,
45
- * 2 means the observer is terminated.
46
- */
47
- volatile int fastpath ;
48
- /**
49
- * Atomic updater for the fastpath variable.
50
- */
51
- @ SuppressWarnings ("rawtypes" )
52
- static final AtomicIntegerFieldUpdater <SerializedObserver > FASTPATH_UPDATER
53
- = AtomicIntegerFieldUpdater .newUpdater (SerializedObserver .class , "fastpath" );
54
- /** Lightweight notification transformation. */
55
- static final NotificationLite <Object > nl = NotificationLite .instance ();
56
- /**
57
- * Constructor, takes the actual observer and initializes the queue and
58
- * work counters.
59
- * @param s the actual observer to wrap
60
- */
61
- @ SuppressWarnings ("unchecked" )
34
+ public class SerializedObserver <T > implements Observer <T > {
35
+ private final Observer <? super T > actual ;
36
+
37
+ private boolean emitting = false ;
38
+ private boolean terminated = false ;
39
+ private FastList queue ;
40
+
41
+ private static final int MAX_DRAIN_ITERATION = Integer .MAX_VALUE ;
42
+ private static final Object NULL_SENTINEL = new Object ();
43
+ private static final Object COMPLETE_SENTINEL = new Object ();
44
+
45
+ static final class FastList {
46
+ Object [] array ;
47
+ int size ;
48
+
49
+ public void add (Object o ) {
50
+ int s = size ;
51
+ Object [] a = array ;
52
+ if (a == null ) {
53
+ a = new Object [16 ];
54
+ array = a ;
55
+ } else if (s == a .length ) {
56
+ Object [] array2 = new Object [s + (s >> 2 )];
57
+ System .arraycopy (a , 0 , array2 , 0 , s );
58
+ a = array2 ;
59
+ array = a ;
60
+ }
61
+ a [s ] = o ;
62
+ size = s + 1 ;
63
+ }
64
+ }
65
+
66
+ private static final class ErrorSentinel {
67
+ final Throwable e ;
68
+
69
+ ErrorSentinel (Throwable e ) {
70
+ this .e = e ;
71
+ }
72
+ }
73
+
62
74
public SerializedObserver (Observer <? super T > s ) {
63
- this .actual = (Observer <Object >)s ;
64
- this .queue = new MpscPaddedQueue <Object >();
65
- this .wip = new PaddedAtomicInteger ();
75
+ this .actual = s ;
66
76
}
67
77
68
78
@ Override
69
79
public void onCompleted () {
70
- int n = fastpath ;
71
- // let's try the fast path
72
- if (n == 0 && wip .compareAndSet (0 , 1 )) {
73
- FASTPATH_UPDATER .lazySet (this , 2 );
74
- actual .onCompleted ();
75
- // just return since nooen else will be able to
76
- // put anything else into the queue at this point
77
- return ;
78
- }
79
- if (n == 2 ) {
80
- return ;
81
- }
82
- queue .offer (nl .completed ());
83
- if (wip .getAndIncrement () == 0 ) {
84
- n = fastpath ; // re-read fastpath
85
- if (n == 2 ) {
86
- queue .clear ();
80
+ FastList list ;
81
+ synchronized (this ) {
82
+ if (terminated ) {
87
83
return ;
88
84
}
89
- FASTPATH_UPDATER .lazySet (this , 2 );
90
- do {
91
- if (nl .accept2 (actual , queue .poll ())) {
92
- queue .clear ();
93
- return ;
85
+ terminated = true ;
86
+ if (emitting ) {
87
+ if (queue == null ) {
88
+ queue = new FastList ();
94
89
}
95
- } while (wip .decrementAndGet () > 0 );
90
+ queue .add (COMPLETE_SENTINEL );
91
+ return ;
92
+ }
93
+ emitting = true ;
94
+ list = queue ;
95
+ queue = null ;
96
96
}
97
+ drainQueue (list );
98
+ actual .onCompleted ();
97
99
}
98
100
99
101
@ Override
100
102
public void onError (final Throwable e ) {
101
- int n = fastpath ;
102
- // let's try the fast path
103
- if (n == 0 && wip .compareAndSet (0 , 1 )) {
104
- FASTPATH_UPDATER .lazySet (this , 2 );
105
- actual .onError (e );
106
- // just return since nooen else will be able to
107
- // put anything else into the queue at this point
108
- return ;
109
- }
110
- // or are we terminated?
111
- if (n == 2 ) {
112
- return ;
113
- }
114
- queue .offer (nl .error (e ));
115
- if (wip .getAndIncrement () == 0 ) {
116
- n = fastpath ; // re-read fastpath
117
- if (n == 2 ) { // are we terminated now?
118
- queue .clear ();
103
+ FastList list ;
104
+ synchronized (this ) {
105
+ if (terminated ) {
119
106
return ;
120
107
}
121
- FASTPATH_UPDATER .lazySet (this , 2 );
122
- do {
123
- if (nl .accept2 (actual , queue .poll ())) {
124
- queue .clear ();
125
- return ;
108
+ terminated = true ;
109
+ if (emitting ) {
110
+ if (queue == null ) {
111
+ queue = new FastList ();
126
112
}
127
- } while (wip .decrementAndGet () > 0 );
113
+ queue .add (new ErrorSentinel (e ));
114
+ return ;
115
+ }
116
+ emitting = true ;
117
+ list = queue ;
118
+ queue = null ;
128
119
}
120
+ drainQueue (list );
121
+ actual .onError (e );
129
122
}
130
123
131
124
@ Override
132
- public void onNext (final T t ) {
133
- int n = fastpath ;
134
- if (n == 0 && wip .compareAndSet (0 , 1 )) {
135
- int w ;
136
- try {
137
- actual .onNext (t );
138
- } finally {
139
- w = wip .decrementAndGet ();
140
- }
141
- if (w == 0 ) {
142
- return ;
143
- }
144
- FASTPATH_UPDATER .lazySet (this , 0 );
145
- } else {
146
- if (n == 2 ) {
147
- return ;
148
- }
149
- queue .offer (nl .next (t ));
150
- if (wip .getAndIncrement () != 0 ) {
125
+ public void onNext (T t ) {
126
+ FastList list ;
127
+
128
+ synchronized (this ) {
129
+ if (terminated ) {
151
130
return ;
152
131
}
153
- n = fastpath ; // we won the emission race, are we done btw?
154
- if (n == 2 ) {
132
+ if (emitting ) {
133
+ if (queue == null ) {
134
+ queue = new FastList ();
135
+ }
136
+ queue .add (t != null ? t : NULL_SENTINEL );
137
+ // another thread is emitting so we add to the queue and return
155
138
return ;
156
139
}
140
+ // we can emit
141
+ emitting = true ;
142
+ // reference to the list to drain before emitting our value
143
+ list = queue ;
144
+ queue = null ;
157
145
}
158
- int c = 0 ;
159
- boolean endRegular = false ;
146
+
147
+ // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
148
+ boolean skipFinal = false ;
160
149
try {
150
+ int iter = MAX_DRAIN_ITERATION ;
161
151
do {
162
- if ( nl . accept2 ( actual , queue . poll ())) {
163
- FASTPATH_UPDATER . lazySet ( this , 2 );
164
- queue . clear ();
165
- return ;
152
+ drainQueue ( list );
153
+ if ( iter == MAX_DRAIN_ITERATION ) {
154
+ // after the first draining we emit our own value
155
+ actual . onNext ( t ) ;
166
156
}
167
- c ++;
168
- } while (wip .decrementAndGet () > 0 );
169
- endRegular = true ;
170
- if (c < 3 ) {
171
- FASTPATH_UPDATER .lazySet (this , 1 );
172
- }
157
+ --iter ;
158
+ if (iter > 0 ) {
159
+ synchronized (this ) {
160
+ list = queue ;
161
+ queue = null ;
162
+ if (list == null ) {
163
+ emitting = false ;
164
+ skipFinal = true ;
165
+ return ;
166
+ }
167
+ }
168
+ }
169
+ } while (iter > 0 );
173
170
} finally {
174
- if (!endRegular ) {
175
- wip .set (0 ); // allow onError to enter
171
+ if (!skipFinal ) {
172
+ synchronized (this ) {
173
+ if (terminated ) {
174
+ list = queue ;
175
+ queue = null ;
176
+ } else {
177
+ emitting = false ;
178
+ list = null ;
179
+ }
180
+ }
181
+ }
182
+ }
183
+
184
+ // this will only drain if terminated (done here outside of synchronized block)
185
+ drainQueue (list );
186
+ }
187
+
188
+ void drainQueue (FastList list ) {
189
+ if (list == null || list .size == 0 ) {
190
+ return ;
191
+ }
192
+ for (Object v : list .array ) {
193
+ if (v == null ) {
194
+ break ;
195
+ }
196
+ if (v == NULL_SENTINEL ) {
197
+ actual .onNext (null );
198
+ } else if (v == COMPLETE_SENTINEL ) {
199
+ actual .onCompleted ();
200
+ } else if (v .getClass () == ErrorSentinel .class ) {
201
+ actual .onError (((ErrorSentinel ) v ).e );
202
+ } else {
203
+ @ SuppressWarnings ("unchecked" )
204
+ T t = (T )v ;
205
+ actual .onNext (t );
176
206
}
177
207
}
178
208
}
179
- }
209
+ }
0 commit comments