Skip to content

Commit 855153e

Browse files
authored
2.x: Fix window(Observable|Callable) upstream handling (ReactiveX#5887)
1 parent a267d7e commit 855153e

File tree

3 files changed

+943
-276
lines changed

3 files changed

+943
-276
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundary.java

Lines changed: 144 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -18,167 +18,199 @@
1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.internal.disposables.DisposableHelper;
21-
import io.reactivex.internal.observers.QueueDrainObserver;
2221
import io.reactivex.internal.queue.MpscLinkedQueue;
23-
import io.reactivex.internal.util.NotificationLite;
24-
import io.reactivex.observers.*;
22+
import io.reactivex.internal.util.AtomicThrowable;
23+
import io.reactivex.observers.DisposableObserver;
2524
import io.reactivex.plugins.RxJavaPlugins;
2625
import io.reactivex.subjects.UnicastSubject;
2726

2827
public final class ObservableWindowBoundary<T, B> extends AbstractObservableWithUpstream<T, Observable<T>> {
2928
final ObservableSource<B> other;
30-
final int bufferSize;
29+
final int capacityHint;
3130

32-
public ObservableWindowBoundary(ObservableSource<T> source, ObservableSource<B> other, int bufferSize) {
31+
public ObservableWindowBoundary(ObservableSource<T> source, ObservableSource<B> other, int capacityHint) {
3332
super(source);
3433
this.other = other;
35-
this.bufferSize = bufferSize;
34+
this.capacityHint = capacityHint;
3635
}
3736

3837
@Override
39-
public void subscribeActual(Observer<? super Observable<T>> t) {
40-
source.subscribe(new WindowBoundaryMainObserver<T, B>(new SerializedObserver<Observable<T>>(t), other, bufferSize));
38+
public void subscribeActual(Observer<? super Observable<T>> observer) {
39+
WindowBoundaryMainObserver<T, B> parent = new WindowBoundaryMainObserver<T, B>(observer, capacityHint);
40+
41+
observer.onSubscribe(parent);
42+
other.subscribe(parent.boundaryObserver);
43+
44+
source.subscribe(parent);
4145
}
4246

4347
static final class WindowBoundaryMainObserver<T, B>
44-
extends QueueDrainObserver<T, Object, Observable<T>>
45-
implements Disposable {
48+
extends AtomicInteger
49+
implements Observer<T>, Disposable, Runnable {
4650

47-
final ObservableSource<B> other;
48-
final int bufferSize;
51+
private static final long serialVersionUID = 2233020065421370272L;
4952

50-
Disposable s;
53+
final Observer<? super Observable<T>> downstream;
5154

52-
final AtomicReference<Disposable> boundary = new AtomicReference<Disposable>();
55+
final int capacityHint;
5356

54-
UnicastSubject<T> window;
57+
final WindowBoundaryInnerObserver<T, B> boundaryObserver;
5558

56-
static final Object NEXT = new Object();
59+
final AtomicReference<Disposable> upstream;
5760

58-
final AtomicLong windows = new AtomicLong();
61+
final AtomicInteger windows;
5962

60-
WindowBoundaryMainObserver(Observer<? super Observable<T>> actual, ObservableSource<B> other,
61-
int bufferSize) {
62-
super(actual, new MpscLinkedQueue<Object>());
63-
this.other = other;
64-
this.bufferSize = bufferSize;
65-
windows.lazySet(1);
66-
}
63+
final MpscLinkedQueue<Object> queue;
6764

68-
@Override
69-
public void onSubscribe(Disposable s) {
70-
if (DisposableHelper.validate(this.s, s)) {
71-
this.s = s;
65+
final AtomicThrowable errors;
7266

73-
Observer<? super Observable<T>> a = actual;
74-
a.onSubscribe(this);
67+
final AtomicBoolean stopWindows;
7568

76-
if (cancelled) {
77-
return;
78-
}
69+
static final Object NEXT_WINDOW = new Object();
7970

80-
UnicastSubject<T> w = UnicastSubject.create(bufferSize);
71+
volatile boolean done;
8172

82-
window = w;
73+
UnicastSubject<T> window;
8374

84-
a.onNext(w);
75+
WindowBoundaryMainObserver(Observer<? super Observable<T>> downstream, int capacityHint) {
76+
this.downstream = downstream;
77+
this.capacityHint = capacityHint;
78+
this.boundaryObserver = new WindowBoundaryInnerObserver<T, B>(this);
79+
this.upstream = new AtomicReference<Disposable>();
80+
this.windows = new AtomicInteger(1);
81+
this.queue = new MpscLinkedQueue<Object>();
82+
this.errors = new AtomicThrowable();
83+
this.stopWindows = new AtomicBoolean();
84+
}
8585

86-
WindowBoundaryInnerObserver<T, B> inner = new WindowBoundaryInnerObserver<T, B>(this);
86+
@Override
87+
public void onSubscribe(Disposable d) {
88+
if (DisposableHelper.setOnce(upstream, d)) {
8789

88-
if (boundary.compareAndSet(null, inner)) {
89-
windows.getAndIncrement();
90-
other.subscribe(inner);
91-
}
90+
innerNext();
9291
}
9392
}
9493

9594
@Override
9695
public void onNext(T t) {
97-
if (fastEnter()) {
98-
UnicastSubject<T> w = window;
99-
100-
w.onNext(t);
96+
queue.offer(t);
97+
drain();
98+
}
10199

102-
if (leave(-1) == 0) {
103-
return;
104-
}
100+
@Override
101+
public void onError(Throwable e) {
102+
boundaryObserver.dispose();
103+
if (errors.addThrowable(e)) {
104+
done = true;
105+
drain();
105106
} else {
106-
queue.offer(NotificationLite.next(t));
107-
if (!enter()) {
108-
return;
109-
}
107+
RxJavaPlugins.onError(e);
110108
}
111-
drainLoop();
112109
}
113110

114111
@Override
115-
public void onError(Throwable t) {
116-
if (done) {
117-
RxJavaPlugins.onError(t);
118-
return;
119-
}
120-
error = t;
112+
public void onComplete() {
113+
boundaryObserver.dispose();
121114
done = true;
122-
if (enter()) {
123-
drainLoop();
124-
}
115+
drain();
116+
}
125117

126-
if (windows.decrementAndGet() == 0) {
127-
DisposableHelper.dispose(boundary);
118+
@Override
119+
public void dispose() {
120+
if (stopWindows.compareAndSet(false, true)) {
121+
boundaryObserver.dispose();
122+
if (windows.decrementAndGet() == 0) {
123+
DisposableHelper.dispose(upstream);
124+
}
128125
}
129-
130-
actual.onError(t);
131126
}
132127

133128
@Override
134-
public void onComplete() {
135-
if (done) {
136-
return;
137-
}
138-
done = true;
139-
if (enter()) {
140-
drainLoop();
141-
}
129+
public boolean isDisposed() {
130+
return stopWindows.get();
131+
}
142132

133+
@Override
134+
public void run() {
143135
if (windows.decrementAndGet() == 0) {
144-
DisposableHelper.dispose(boundary);
136+
DisposableHelper.dispose(upstream);
145137
}
138+
}
146139

147-
actual.onComplete();
148-
140+
void innerNext() {
141+
queue.offer(NEXT_WINDOW);
142+
drain();
149143
}
150144

151-
@Override
152-
public void dispose() {
153-
cancelled = true;
145+
void innerError(Throwable e) {
146+
DisposableHelper.dispose(upstream);
147+
if (errors.addThrowable(e)) {
148+
done = true;
149+
drain();
150+
} else {
151+
RxJavaPlugins.onError(e);
152+
}
154153
}
155154

156-
@Override
157-
public boolean isDisposed() {
158-
return cancelled;
155+
void innerComplete() {
156+
DisposableHelper.dispose(upstream);
157+
done = true;
158+
drain();
159159
}
160160

161-
void drainLoop() {
162-
final MpscLinkedQueue<Object> q = (MpscLinkedQueue<Object>)queue;
163-
final Observer<? super Observable<T>> a = actual;
161+
@SuppressWarnings("unchecked")
162+
void drain() {
163+
if (getAndIncrement() != 0) {
164+
return;
165+
}
166+
164167
int missed = 1;
165-
UnicastSubject<T> w = window;
168+
Observer<? super Observable<T>> downstream = this.downstream;
169+
MpscLinkedQueue<Object> queue = this.queue;
170+
AtomicThrowable errors = this.errors;
171+
166172
for (;;) {
167173

168174
for (;;) {
175+
if (windows.get() == 0) {
176+
queue.clear();
177+
window = null;
178+
return;
179+
}
180+
181+
UnicastSubject<T> w = window;
182+
169183
boolean d = done;
170184

171-
Object o = q.poll();
185+
if (d && errors.get() != null) {
186+
queue.clear();
187+
Throwable ex = errors.terminate();
188+
if (w != null) {
189+
window = null;
190+
w.onError(ex);
191+
}
192+
downstream.onError(ex);
193+
return;
194+
}
195+
196+
Object v = queue.poll();
172197

173-
boolean empty = o == null;
198+
boolean empty = v == null;
174199

175200
if (d && empty) {
176-
DisposableHelper.dispose(boundary);
177-
Throwable e = error;
178-
if (e != null) {
179-
w.onError(e);
201+
Throwable ex = errors.terminate();
202+
if (ex == null) {
203+
if (w != null) {
204+
window = null;
205+
w.onComplete();
206+
}
207+
downstream.onComplete();
180208
} else {
181-
w.onComplete();
209+
if (w != null) {
210+
window = null;
211+
w.onError(ex);
212+
}
213+
downstream.onError(ex);
182214
}
183215
return;
184216
}
@@ -187,48 +219,35 @@ void drainLoop() {
187219
break;
188220
}
189221

190-
if (o == NEXT) {
191-
w.onComplete();
192-
193-
if (windows.decrementAndGet() == 0) {
194-
DisposableHelper.dispose(boundary);
195-
return;
196-
}
197-
198-
if (cancelled) {
199-
continue;
200-
}
201-
202-
w = UnicastSubject.create(bufferSize);
222+
if (v != NEXT_WINDOW) {
223+
w.onNext((T)v);
224+
continue;
225+
}
203226

204-
windows.getAndIncrement();
227+
if (w != null) {
228+
window = null;
229+
w.onComplete();
230+
}
205231

232+
if (!stopWindows.get()) {
233+
w = UnicastSubject.create(capacityHint, this);
206234
window = w;
235+
windows.getAndIncrement();
207236

208-
a.onNext(w);
209-
210-
continue;
237+
downstream.onNext(w);
211238
}
212-
213-
w.onNext(NotificationLite.<T>getValue(o));
214239
}
215240

216-
missed = leave(-missed);
241+
missed = addAndGet(-missed);
217242
if (missed == 0) {
218-
return;
243+
break;
219244
}
220245
}
221246
}
222-
223-
void next() {
224-
queue.offer(NEXT);
225-
if (enter()) {
226-
drainLoop();
227-
}
228-
}
229247
}
230248

231249
static final class WindowBoundaryInnerObserver<T, B> extends DisposableObserver<B> {
250+
232251
final WindowBoundaryMainObserver<T, B> parent;
233252

234253
boolean done;
@@ -242,7 +261,7 @@ public void onNext(B t) {
242261
if (done) {
243262
return;
244263
}
245-
parent.next();
264+
parent.innerNext();
246265
}
247266

248267
@Override
@@ -252,7 +271,7 @@ public void onError(Throwable t) {
252271
return;
253272
}
254273
done = true;
255-
parent.onError(t);
274+
parent.innerError(t);
256275
}
257276

258277
@Override
@@ -261,7 +280,7 @@ public void onComplete() {
261280
return;
262281
}
263282
done = true;
264-
parent.onComplete();
283+
parent.innerComplete();
265284
}
266285
}
267286
}

0 commit comments

Comments
 (0)