Skip to content

Commit 17652fc

Browse files
Merge pull request ReactiveX#975 from benjchristensen/groupBy-fixes
GroupBy & Time Gap Fixes
2 parents 3741e7b + cbb6fda commit 17652fc

File tree

5 files changed

+211
-681
lines changed

5 files changed

+211
-681
lines changed

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 147 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -15,152 +15,175 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.LinkedList;
19-
import java.util.Queue;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicReference;
2020

21+
import rx.Observable;
22+
import rx.Observer;
2123
import rx.Subscriber;
22-
import rx.subscriptions.CompositeSubscription;
2324

2425
/**
25-
* Buffers the incoming events until notified, then replays the
26-
* buffered events and continues as a simple pass-through subscriber.
27-
* @param <T> the streamed value type
26+
* A solution to the "time gap" problem that occurs with `groupBy` and `pivot` => https://github.com/Netflix/RxJava/issues/844
27+
*
28+
* This currently has temporary unbounded buffers. It needs to become bounded and then do one of two things:
29+
*
30+
* 1) blow up and make the user do something about it
31+
* 2) work with the backpressure solution ... still to be implemented (such as co-routines)
32+
*
33+
* Generally the buffer should be very short lived (milliseconds) and then stops being involved.
34+
* It can become a memory leak though if a GroupedObservable backed by this class is emitted but never subscribed to (such as filtered out).
35+
* In that case, either a time-bomb to throw away the buffer, or just blowing up and making the user do something about it is needed.
36+
*
37+
* For example, to filter out GroupedObservables, perhaps they need a silent `subscribe()` on them to just blackhole the data.
38+
*
39+
* This is an initial start at solving this problem and solves the immediate problem of `groupBy` and `pivot` and trades off the possibility of memory leak for deterministic functionality.
40+
*
41+
* @param <T>
2842
*/
29-
public class BufferUntilSubscriber<T> extends Subscriber<T> {
30-
/** The actual subscriber. */
31-
private final Subscriber<? super T> actual;
32-
/** Indicate the pass-through mode. */
33-
private volatile boolean passthroughMode;
34-
/** Protect mode transition. */
35-
private final Object gate = new Object();
36-
/** The buffered items. */
37-
private final Queue<Object> queue = new LinkedList<Object>();
38-
/** The queue capacity. */
39-
private final int capacity;
40-
private final NotificationLite<T> on = NotificationLite.instance();
43+
public class BufferUntilSubscriber<T> extends Observable<T> implements Observer<T> {
4144

42-
/**
43-
* Constructor that wraps the actual subscriber and shares its subscription.
44-
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
45-
* @param actual
46-
*/
47-
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual) {
48-
super(actual);
49-
this.actual = actual;
50-
this.capacity = capacity;
51-
}
52-
/**
53-
* Constructor that wraps the actual subscriber and uses the given composite
54-
* subscription.
55-
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
56-
* @param actual
57-
* @param cs
58-
*/
59-
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual, CompositeSubscription cs) {
60-
super(cs);
61-
this.actual = actual;
62-
this.capacity = capacity;
45+
public static <T> BufferUntilSubscriber<T> create() {
46+
return new BufferUntilSubscriber<T>(new AtomicReference<Observer<? super T>>(new BufferedObserver<T>()));
6347
}
64-
65-
/**
66-
* Call this method to replay the buffered events and continue as a pass-through subscriber.
67-
* If already in pass-through mode, this method is a no-op.
68-
*/
69-
public void enterPassthroughMode() {
70-
if (!passthroughMode) {
71-
synchronized (gate) {
72-
if (!passthroughMode) {
73-
while (!queue.isEmpty()) {
74-
Object o = queue.poll();
75-
if (!actual.isUnsubscribed()) {
76-
on.accept(actual, o);
77-
}
78-
}
79-
passthroughMode = true;
80-
gate.notifyAll();
48+
49+
private final AtomicReference<Observer<? super T>> observerRef;
50+
51+
private BufferUntilSubscriber(final AtomicReference<Observer<? super T>> observerRef) {
52+
super(new OnSubscribe<T>() {
53+
54+
@Override
55+
public void call(Subscriber<? super T> s) {
56+
// drain queued notifications before subscription
57+
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
58+
BufferedObserver<T> buffered = (BufferedObserver<T>) observerRef.get();
59+
Object o = null;
60+
while ((o = buffered.buffer.poll()) != null) {
61+
emit(s, o);
8162
}
63+
// register real observer for pass-thru ... and drain any further events received on first notification
64+
observerRef.set(new PassThruObserver<T>(s, buffered.buffer, observerRef));
8265
}
83-
}
66+
67+
});
68+
this.observerRef = observerRef;
69+
}
70+
71+
@Override
72+
public void onCompleted() {
73+
observerRef.get().onCompleted();
8474
}
75+
76+
@Override
77+
public void onError(Throwable e) {
78+
observerRef.get().onError(e);
79+
}
80+
8581
@Override
8682
public void onNext(T t) {
87-
if (!passthroughMode) {
88-
synchronized (gate) {
89-
if (!passthroughMode) {
90-
if (capacity < 0 || queue.size() < capacity) {
91-
queue.offer(on.next(t));
92-
return;
93-
}
94-
try {
95-
while (!passthroughMode) {
96-
gate.wait();
97-
}
98-
if (actual.isUnsubscribed()) {
99-
return;
100-
}
101-
} catch (InterruptedException ex) {
102-
Thread.currentThread().interrupt();
103-
actual.onError(ex);
104-
return;
105-
}
106-
}
83+
observerRef.get().onNext(t);
84+
}
85+
86+
/**
87+
* This is a temporary observer between buffering and the actual that gets into the line of notifications
88+
* from the producer and will drain the queue of any items received during the race of the initial drain and
89+
* switching this.
90+
*
91+
* It will then immediately swap itself out for the actual (after a single notification), but since this is now
92+
* being done on the same producer thread no further buffering will occur.
93+
*/
94+
private static class PassThruObserver<T> implements Observer<T> {
95+
96+
private final Observer<? super T> actual;
97+
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
98+
private final ConcurrentLinkedQueue<Object> buffer;
99+
private final AtomicReference<Observer<? super T>> observerRef;
100+
101+
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
102+
this.actual = actual;
103+
this.buffer = buffer;
104+
this.observerRef = observerRef;
105+
}
106+
107+
@Override
108+
public void onCompleted() {
109+
drainIfNeededAndSwitchToActual();
110+
actual.onCompleted();
111+
}
112+
113+
@Override
114+
public void onError(Throwable e) {
115+
drainIfNeededAndSwitchToActual();
116+
actual.onError(e);
117+
}
118+
119+
@Override
120+
public void onNext(T t) {
121+
drainIfNeededAndSwitchToActual();
122+
actual.onNext(t);
123+
}
124+
125+
private void drainIfNeededAndSwitchToActual() {
126+
Object o = null;
127+
while ((o = buffer.poll()) != null) {
128+
emit(this, o);
107129
}
130+
// now we can safely change over to the actual and get rid of the pass-thru
131+
observerRef.set(actual);
108132
}
109-
actual.onNext(t);
133+
110134
}
111135

112-
@Override
113-
public void onError(Throwable e) {
114-
if (!passthroughMode) {
115-
synchronized (gate) {
116-
if (!passthroughMode) {
117-
if (capacity < 0 || queue.size() < capacity) {
118-
queue.offer(on.error(e));
119-
return;
120-
}
121-
try {
122-
while (!passthroughMode) {
123-
gate.wait();
124-
}
125-
if (actual.isUnsubscribed()) {
126-
return;
127-
}
128-
} catch (InterruptedException ex) {
129-
Thread.currentThread().interrupt();
130-
actual.onError(ex);
131-
return;
132-
}
133-
}
136+
private static class BufferedObserver<T> implements Observer<T> {
137+
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
138+
139+
@Override
140+
public void onCompleted() {
141+
buffer.add(COMPLETE_SENTINEL);
142+
}
143+
144+
@Override
145+
public void onError(Throwable e) {
146+
buffer.add(new ErrorSentinel(e));
147+
}
148+
149+
@Override
150+
public void onNext(T t) {
151+
if (t == null) {
152+
buffer.add(NULL_SENTINEL);
153+
} else {
154+
buffer.add(t);
134155
}
135156
}
136-
actual.onError(e);
157+
137158
}
138159

139-
@Override
140-
public void onCompleted() {
141-
if (!passthroughMode) {
142-
synchronized (gate) {
143-
if (!passthroughMode) {
144-
if (capacity < 0 || queue.size() < capacity) {
145-
queue.offer(on.completed());
146-
return;
147-
}
148-
try {
149-
while (!passthroughMode) {
150-
gate.wait();
151-
}
152-
if (actual.isUnsubscribed()) {
153-
return;
154-
}
155-
} catch (InterruptedException ex) {
156-
Thread.currentThread().interrupt();
157-
actual.onError(ex);
158-
return;
159-
}
160-
}
160+
private final static <T> void emit(Observer<T> s, Object v) {
161+
if (v instanceof Sentinel) {
162+
if (v == NULL_SENTINEL) {
163+
s.onNext(null);
164+
} else if (v == COMPLETE_SENTINEL) {
165+
s.onCompleted();
166+
} else if (v instanceof ErrorSentinel) {
167+
s.onError(((ErrorSentinel) v).e);
161168
}
169+
} else {
170+
s.onNext((T) v);
171+
}
172+
}
173+
174+
private static class Sentinel {
175+
176+
}
177+
178+
private static Sentinel NULL_SENTINEL = new Sentinel();
179+
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
180+
181+
private static class ErrorSentinel extends Sentinel {
182+
final Throwable e;
183+
184+
ErrorSentinel(Throwable e) {
185+
this.e = e;
162186
}
163-
actual.onCompleted();
164187
}
165188

166189
}

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import rx.functions.Action0;
2828
import rx.functions.Func1;
2929
import rx.observables.GroupedObservable;
30-
import rx.subjects.PublishSubject;
31-
import rx.subjects.Subject;
3230
import rx.subscriptions.CompositeSubscription;
3331
import rx.subscriptions.Subscriptions;
3432

@@ -51,43 +49,50 @@ public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K,
5149
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
5250
// and will unsubscribe on this parent if they are all unsubscribed
5351
return new Subscriber<T>(new CompositeSubscription()) {
54-
private final Map<K, Subject<T, T>> groups = new HashMap<K, Subject<T, T>>();
52+
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
5553
private final AtomicInteger completionCounter = new AtomicInteger(0);
56-
private final AtomicBoolean completed = new AtomicBoolean(false);
54+
private final AtomicBoolean completionEmitted = new AtomicBoolean(false);
55+
private final AtomicBoolean terminated = new AtomicBoolean(false);
5756

5857
@Override
5958
public void onCompleted() {
60-
completed.set(true);
61-
// if we receive onCompleted from our parent we onComplete children
62-
for (Subject<T, T> ps : groups.values()) {
63-
ps.onCompleted();
64-
}
59+
if (terminated.compareAndSet(false, true)) {
60+
// if we receive onCompleted from our parent we onComplete children
61+
for (BufferUntilSubscriber<T> ps : groups.values()) {
62+
ps.onCompleted();
63+
}
6564

66-
// special case for empty (no groups emitted)
67-
if (completionCounter.get() == 0) {
68-
childObserver.onCompleted();
65+
// special case for empty (no groups emitted)
66+
if (completionCounter.get() == 0) {
67+
// we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
68+
if (completionEmitted.compareAndSet(false, true)) {
69+
childObserver.onCompleted();
70+
}
71+
}
6972
}
7073
}
7174

7275
@Override
7376
public void onError(Throwable e) {
74-
// we immediately tear everything down if we receive an error
75-
childObserver.onError(e);
77+
if (terminated.compareAndSet(false, true)) {
78+
// we immediately tear everything down if we receive an error
79+
childObserver.onError(e);
80+
}
7681
}
7782

7883
@Override
7984
public void onNext(T t) {
8085
try {
8186
final K key = keySelector.call(t);
82-
Subject<T, T> gps = groups.get(key);
87+
BufferUntilSubscriber<T> gps = groups.get(key);
8388
if (gps == null) {
8489
// this group doesn't exist
8590
if (childObserver.isUnsubscribed()) {
8691
// we have been unsubscribed on the outer so won't send any more groups
8792
return;
8893
}
89-
gps = PublishSubject.create();
90-
final Subject<T, T> _gps = gps;
94+
gps = BufferUntilSubscriber.create();
95+
final BufferUntilSubscriber<T> _gps = gps;
9196

9297
GroupedObservable<K, T> go = new GroupedObservable<K, T>(key, new OnSubscribe<T>() {
9398

@@ -136,15 +141,16 @@ public void onNext(T t) {
136141
}
137142

138143
private void completeInner() {
139-
if (completionCounter.decrementAndGet() == 0 && (completed.get() || childObserver.isUnsubscribed())) {
140-
if (childObserver.isUnsubscribed()) {
141-
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
142-
unsubscribe();
143-
}
144-
for (Subject<T, T> ps : groups.values()) {
145-
ps.onCompleted();
144+
// count can be < 0 because unsubscribe also calls this
145+
if (completionCounter.decrementAndGet() <= 0 && (terminated.get() || childObserver.isUnsubscribed())) {
146+
// completionEmitted ensures we only emit onCompleted once
147+
if (completionEmitted.compareAndSet(false, true)) {
148+
if (childObserver.isUnsubscribed()) {
149+
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
150+
unsubscribe();
151+
}
152+
childObserver.onCompleted();
146153
}
147-
childObserver.onCompleted();
148154
}
149155
}
150156

0 commit comments

Comments
 (0)