@@ -108,21 +108,14 @@ void startEmitting() {
108
108
109
109
@ Override
110
110
public void request (long n ) {
111
+ if (requested == Long .MAX_VALUE ) {
112
+ return ;
113
+ }
111
114
long _c ;
112
115
if (n == Long .MAX_VALUE ) {
113
116
_c = REQUESTED_UPDATER .getAndSet (this , Long .MAX_VALUE );
114
117
} else {
115
- for (;;) {
116
- _c = requested ;
117
- if (_c == Long .MAX_VALUE ) {
118
- // If `requested` is Long.MAX_VALUE, `c+n` will be overflow.
119
- // Therefore, always check before setting to `c+n`
120
- return ;
121
- }
122
- if (REQUESTED_UPDATER .compareAndSet (this , _c , _c + n )) {
123
- break ;
124
- }
125
- }
118
+ _c = REQUESTED_UPDATER .getAndAdd (this , n );
126
119
}
127
120
if (!emittingStarted ) {
128
121
// we haven't started yet, so record what was requested and return
@@ -169,10 +162,21 @@ void emit(long previousRequested) {
169
162
emitted ++;
170
163
}
171
164
}
172
-
173
- if (REQUESTED_UPDATER .addAndGet (this , -emitted ) == 0 ) {
174
- // we're done emitting the number requested so return
175
- return ;
165
+ for (;;) {
166
+ long oldRequested = requested ;
167
+ long newRequested = oldRequested - emitted ;
168
+ if (oldRequested == Long .MAX_VALUE ) {
169
+ // became unbounded during the loop
170
+ // continue the outer loop to emit the rest events.
171
+ break ;
172
+ }
173
+ if (REQUESTED_UPDATER .compareAndSet (this , oldRequested , newRequested )) {
174
+ if (newRequested == 0 ) {
175
+ // we're done emitting the number requested so return
176
+ return ;
177
+ }
178
+ break ;
179
+ }
176
180
}
177
181
}
178
182
}
0 commit comments