Skip to content

Commit 25f2364

Browse files
author
jmhofer
committed
Switched away from using take and skip again due to concurrency issues.
1 parent 5ef3600 commit 25f2364

File tree

1 file changed

+58
-47
lines changed

1 file changed

+58
-47
lines changed

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@
2525
import rx.Observable;
2626
import rx.Observer;
2727
import rx.Subscription;
28-
import rx.util.AtomicObservableSubscription;
29-
import rx.util.functions.Action0;
30-
import rx.util.functions.Action1;
3128
import rx.util.functions.Func1;
3229
import rx.util.functions.Func2;
3330

@@ -67,41 +64,46 @@ public static <T> Func1<Observer<T>, Subscription> scan(Observable<T> sequence,
6764
private static class AccuWithoutInitialValue<T> implements Func1<Observer<T>, Subscription> {
6865
private final Observable<T> sequence;
6966
private final Func2<T, T, T> accumulatorFunction;
70-
private T initialValue;
71-
67+
68+
private AccumulatingObserver<T, T> accumulatingObserver;
69+
7270
private AccuWithoutInitialValue(Observable<T> sequence, Func2<T, T, T> accumulator) {
7371
this.sequence = sequence;
7472
this.accumulatorFunction = accumulator;
7573
}
7674

7775
@Override
7876
public Subscription call(final Observer<T> observer) {
79-
sequence.take(1).subscribe(new Action1<T>() {
77+
return sequence.subscribe(new Observer<T>() {
78+
79+
// has to be synchronized so that the initial value is always sent only once.
8080
@Override
81-
public void call(T value) {
82-
initialValue = value;
81+
public synchronized void onNext(T value) {
82+
if (accumulatingObserver == null) {
83+
observer.onNext(value);
84+
accumulatingObserver = new AccumulatingObserver<T, T>(observer, value, accumulatorFunction);
85+
} else {
86+
accumulatingObserver.onNext(value);
87+
}
8388
}
84-
}, new Action1<Exception>() {
89+
8590
@Override
86-
public void call(Exception e) {
91+
public void onError(Exception e) {
8792
observer.onError(e);
8893
}
89-
}, new Action0() {
94+
9095
@Override
91-
public void call() {
96+
public void onCompleted() {
9297
observer.onCompleted();
9398
}
9499
});
95-
Accumulator<T, T> scan = new Accumulator<T, T>(sequence.skip(1), initialValue, accumulatorFunction);
96-
return scan.call(observer);
97100
}
98101
}
99102

100103
private static class Accumulator<T, R> implements Func1<Observer<R>, Subscription> {
101104
private final Observable<T> sequence;
102105
private final R initialValue;
103106
private final Func2<R, T, R> accumulatorFunction;
104-
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
105107

106108
private Accumulator(Observable<T> sequence, R initialValue, Func2<R, T, R> accumulator) {
107109
this.sequence = sequence;
@@ -112,42 +114,51 @@ private Accumulator(Observable<T> sequence, R initialValue, Func2<R, T, R> accum
112114
@Override
113115
public Subscription call(final Observer<R> observer) {
114116
observer.onNext(initialValue);
115-
116-
return subscription.wrap(sequence.subscribe(new Observer<T>() {
117-
private R acc = initialValue;
118-
119-
/**
120-
* We must synchronize this because we can't allow
121-
* multiple threads to execute the 'accumulatorFunction' at the same time because
122-
* the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap
123-
*
124-
* Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded
125-
*/
126-
@Override
127-
public synchronized void onNext(T value) {
128-
try {
129-
acc = accumulatorFunction.call(acc, value);
130-
observer.onNext(acc);
131-
} catch (Exception ex) {
132-
observer.onError(ex);
133-
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
134-
subscription.unsubscribe();
135-
}
136-
}
117+
return sequence.subscribe(new AccumulatingObserver<T, R>(observer, initialValue, accumulatorFunction));
118+
}
119+
}
137120

138-
@Override
139-
public void onError(Exception ex) {
140-
observer.onError(ex);
141-
}
121+
private static class AccumulatingObserver<T, R> implements Observer<T> {
122+
private final Observer<R> observer;
123+
private final Func2<R, T, R> accumulatorFunction;
142124

143-
@Override
144-
public void onCompleted() {
145-
observer.onCompleted();
146-
}
147-
}));
125+
private R acc;
126+
127+
private AccumulatingObserver(Observer<R> observer, R initialValue, Func2<R, T, R> accumulator) {
128+
this.observer = observer;
129+
this.accumulatorFunction = accumulator;
130+
131+
this.acc = initialValue;
148132
}
149-
}
150133

134+
/**
135+
* We must synchronize this because we can't allow
136+
* multiple threads to execute the 'accumulatorFunction' at the same time because
137+
* the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap
138+
*
139+
* Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded
140+
*/
141+
@Override
142+
public synchronized void onNext(T value) {
143+
try {
144+
acc = accumulatorFunction.call(acc, value);
145+
observer.onNext(acc);
146+
} catch (Exception ex) {
147+
observer.onError(ex);
148+
}
149+
}
150+
151+
@Override
152+
public void onError(Exception e) {
153+
observer.onError(e);
154+
}
155+
156+
@Override
157+
public void onCompleted() {
158+
observer.onCompleted();
159+
}
160+
}
161+
151162
public static class UnitTest {
152163

153164
@Before

0 commit comments

Comments
 (0)