File tree Expand file tree Collapse file tree 1 file changed +14
-15
lines changed
rxjava-core/src/main/java/rx/observers Expand file tree Collapse file tree 1 file changed +14
-15
lines changed Original file line number Diff line number Diff line change @@ -63,25 +63,24 @@ public void onNext(T t) {
63
63
} while (!state .compareAndSet (current , newState ));
64
64
65
65
if (newState .shouldProcess ()) {
66
- if (newState == State .PROCESS_SELF ) {
67
- s .onNext (t );
68
-
69
- // finish processing to let this thread move on
70
- do {
71
- current = state .get ();
72
- newState = current .finishProcessing (1 );
73
- } while (!state .compareAndSet (current , newState ));
74
- } else {
75
- // drain queue
76
- Object [] items = newState .queue ;
77
- for (int i = 0 ; i < items .length ; i ++) {
78
- s .onNext ((T ) items [i ]);
66
+ int numItemsProcessed = 0 ;
67
+ try {
68
+ if (newState == State .PROCESS_SELF ) {
69
+ s .onNext (t );
70
+ numItemsProcessed ++;
71
+ } else {
72
+ // drain queue
73
+ Object [] items = newState .queue ;
74
+ for (int i = 0 ; i < items .length ; i ++) {
75
+ s .onNext ((T ) items [i ]);
76
+ numItemsProcessed ++;
77
+ }
79
78
}
80
-
79
+ } finally {
81
80
// finish processing to let this thread move on
82
81
do {
83
82
current = state .get ();
84
- newState = current .finishProcessing (items . length );
83
+ newState = current .finishProcessing (numItemsProcessed );
85
84
} while (!state .compareAndSet (current , newState ));
86
85
}
87
86
}
You can’t perform that action at this time.
0 commit comments