Skip to content

Commit 8525f6f

Browse files
UnsafeSubscribe
Migrate from reflection to unsafeSubscribe as per discussion at ReactiveX#676
1 parent 14dda8b commit 8525f6f

File tree

77 files changed

+510
-560
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+510
-560
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 209 additions & 340 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/joins/JoinObserver1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void addActivePlan(ActivePlan0 activePlan) {
6161
public void subscribe(Object gate) {
6262
if (subscribed.compareAndSet(false, true)) {
6363
this.gate = gate;
64-
source.materialize().subscribe(this);
64+
source.materialize().unsafeSubscribe(this);
6565
} else {
6666
throw new IllegalStateException("Can only be subscribed to once.");
6767
}

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
7272
* "Guideline 6.4: Protect calls to user code from within an operator"
7373
*/
7474
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> observer) {
75-
return o.subscribe(new SafeSubscriber<T>(observer));
75+
return o.unsafeSubscribe(new SafeSubscriber<T>(observer));
7676
}
7777

7878
/**

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o)
3737

3838
@Override
3939
public void call(Subscriber<? super T> s) {
40-
o.subscribe(s);
40+
o.unsafeSubscribe(s);
4141
}
4242
});
4343
}

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void onNext(T t) {
9191
* It will then immediately swap itself out for the actual (after a single notification), but since this is now
9292
* being done on the same producer thread no further buffering will occur.
9393
*/
94-
private static class PassThruObserver<T> implements Observer<T> {
94+
private static class PassThruObserver<T> extends Subscriber<T> {
9595

9696
private final Observer<? super T> actual;
9797
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
@@ -133,7 +133,7 @@ private void drainIfNeededAndSwitchToActual() {
133133

134134
}
135135

136-
private static class BufferedObserver<T> implements Observer<T> {
136+
private static class BufferedObserver<T> extends Subscriber<T> {
137137
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
138138

139139
@Override

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import rx.Observer;
2929
import rx.Scheduler;
3030
import rx.Scheduler.Inner;
31+
import rx.Subscriber;
3132
import rx.Subscription;
3233
import rx.functions.Action1;
3334
import rx.functions.Func0;
3435
import rx.functions.Func1;
36+
import rx.subscriptions.CompositeSubscription;
3537

3638
/**
3739
* The base class for operations that break observables into "chunks". Currently buffers and windows.
@@ -408,7 +410,7 @@ public void pushValue(T value) {
408410
* The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record.
409411
* <C> The type of object being tracked by the {@link Chunk}
410412
*/
411-
protected static class ChunkObserver<T, C> implements Observer<T> {
413+
protected static class ChunkObserver<T, C> extends Subscriber<T> {
412414

413415
private final Chunks<T, C> chunks;
414416
private final Observer<? super C> observer;
@@ -492,12 +494,24 @@ public ObservableBasedSingleChunkCreator(NonOverlappingChunks<T, C> chunks, Func
492494

493495
private void listenForChunkEnd() {
494496
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call();
495-
closingObservable.subscribe(new Action1<TClosing>() {
497+
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {
498+
499+
@Override
500+
public void onCompleted() {
501+
502+
}
503+
504+
@Override
505+
public void onError(Throwable e) {
506+
507+
}
508+
496509
@Override
497-
public void call(TClosing closing) {
510+
public void onNext(TClosing t) {
498511
chunks.emitAndReplaceChunk();
499-
listenForChunkEnd();
512+
listenForChunkEnd();
500513
}
514+
501515
});
502516
}
503517

@@ -524,23 +538,47 @@ public void stop() {
524538
*/
525539
protected static class ObservableBasedMultiChunkCreator<T, C, TOpening, TClosing> implements ChunkCreator {
526540

527-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
541+
private final CompositeSubscription subscription = new CompositeSubscription();
528542

529543
public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends TOpening> openings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> chunkClosingSelector) {
530-
subscription.wrap(openings.subscribe(new Action1<TOpening>() {
544+
openings.unsafeSubscribe(new Subscriber<TOpening>(subscription) {
545+
546+
@Override
547+
public void onCompleted() {
548+
549+
}
550+
531551
@Override
532-
public void call(TOpening opening) {
552+
public void onError(Throwable e) {
553+
554+
}
555+
556+
@Override
557+
public void onNext(TOpening opening) {
533558
final Chunk<T, C> chunk = chunks.createChunk();
534559
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call(opening);
535560

536-
closingObservable.subscribe(new Action1<TClosing>() {
561+
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {
562+
537563
@Override
538-
public void call(TClosing closing) {
539-
chunks.emitChunk(chunk);
564+
public void onCompleted() {
565+
540566
}
541-
});
567+
568+
@Override
569+
public void onError(Throwable e) {
570+
571+
}
572+
573+
@Override
574+
public void onNext(TClosing t) {
575+
chunks.emitChunk(chunk);
576+
}
577+
578+
});
542579
}
543-
}));
580+
581+
});
544582
}
545583

546584
@Override

rxjava-core/src/main/java/rx/operators/NotificationLite.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package rx.operators;
22

3-
import java.io.ObjectStreamException;
43
import java.io.Serializable;
54

65
import rx.Notification;

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.Observable;
2121
import rx.Observable.OnSubscribeFunc;
2222
import rx.Observer;
23+
import rx.Subscriber;
2324
import rx.Subscription;
2425
import rx.functions.Func1;
2526

@@ -39,20 +40,18 @@ private static class AllObservable<T> implements OnSubscribeFunc<Boolean> {
3940
private final Observable<? extends T> sequence;
4041
private final Func1<? super T, Boolean> predicate;
4142

42-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
43-
4443
private AllObservable(Observable<? extends T> sequence, Func1<? super T, Boolean> predicate) {
4544
this.sequence = sequence;
4645
this.predicate = predicate;
4746
}
4847

4948
@Override
5049
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
51-
return subscription.wrap(sequence.subscribe(new AllObserver(observer)));
50+
return sequence.unsafeSubscribe(new AllObserver(observer));
5251

5352
}
5453

55-
private class AllObserver implements Observer<T> {
54+
private class AllObserver extends Subscriber<T> {
5655
private final Observer<? super Boolean> underlying;
5756

5857
private final AtomicBoolean status = new AtomicBoolean(true);
@@ -82,7 +81,7 @@ public void onNext(T args) {
8281
if (changed && !result) {
8382
underlying.onNext(false);
8483
underlying.onCompleted();
85-
subscription.unsubscribe();
84+
unsubscribe();
8685
}
8786
}
8887
}

rxjava-core/src/main/java/rx/operators/OperationAny.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
*/
1616
package rx.operators;
1717

18-
import static rx.functions.Functions.*;
18+
import static rx.functions.Functions.alwaysTrue;
1919

2020
import java.util.concurrent.atomic.AtomicBoolean;
2121

2222
import rx.Observable;
2323
import rx.Observable.OnSubscribeFunc;
2424
import rx.Observer;
25+
import rx.Subscriber;
2526
import rx.Subscription;
2627
import rx.functions.Func1;
2728

@@ -81,7 +82,7 @@ private Any(Observable<? extends T> source, Func1<? super T, Boolean> predicate,
8182
@Override
8283
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
8384
final SafeObservableSubscription subscription = new SafeObservableSubscription();
84-
return subscription.wrap(source.subscribe(new Observer<T>() {
85+
return subscription.wrap(source.unsafeSubscribe(new Subscriber<T>() {
8586

8687
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);
8788

rxjava-core/src/main/java/rx/operators/OperationAsObservable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observable.OnSubscribeFunc;
2020
import rx.Observer;
2121
import rx.Subscription;
22+
import rx.observers.Subscribers;
2223

2324
/**
2425
* Hides the identity of another observable.
@@ -34,7 +35,7 @@ public OperationAsObservable(Observable<? extends T> source) {
3435
}
3536

3637
@Override
37-
public Subscription onSubscribe(Observer<? super T> t1) {
38-
return source.subscribe(t1);
38+
public Subscription onSubscribe(final Observer<? super T> t1) {
39+
return source.unsafeSubscribe(Subscribers.from(t1));
3940
}
4041
}

rxjava-core/src/main/java/rx/operators/OperationAverage.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.OnSubscribeFunc;
2020
import rx.Observer;
21+
import rx.Subscriber;
2122
import rx.Subscription;
2223
import rx.functions.Func1;
2324
import rx.functions.Func2;
@@ -124,11 +125,11 @@ public AverageIntegerExtractor(Observable<? extends T> source, Func1<? super T,
124125

125126
@Override
126127
public Subscription onSubscribe(Observer<? super Integer> t1) {
127-
return source.subscribe(new AverageObserver(t1));
128+
return source.unsafeSubscribe(new AverageObserver(t1));
128129
}
129130

130131
/** Computes the average. */
131-
private final class AverageObserver implements Observer<T> {
132+
private final class AverageObserver extends Subscriber<T> {
132133
final Observer<? super Integer> observer;
133134
int sum;
134135
int count;
@@ -184,11 +185,11 @@ public AverageLongExtractor(Observable<? extends T> source, Func1<? super T, Lon
184185

185186
@Override
186187
public Subscription onSubscribe(Observer<? super Long> t1) {
187-
return source.subscribe(new AverageObserver(t1));
188+
return source.unsafeSubscribe(new AverageObserver(t1));
188189
}
189190

190191
/** Computes the average. */
191-
private final class AverageObserver implements Observer<T> {
192+
private final class AverageObserver extends Subscriber<T> {
192193
final Observer<? super Long> observer;
193194
long sum;
194195
int count;
@@ -244,11 +245,11 @@ public AverageFloatExtractor(Observable<? extends T> source, Func1<? super T, Fl
244245

245246
@Override
246247
public Subscription onSubscribe(Observer<? super Float> t1) {
247-
return source.subscribe(new AverageObserver(t1));
248+
return source.unsafeSubscribe(new AverageObserver(t1));
248249
}
249250

250251
/** Computes the average. */
251-
private final class AverageObserver implements Observer<T> {
252+
private final class AverageObserver extends Subscriber<T> {
252253
final Observer<? super Float> observer;
253254
float sum;
254255
int count;
@@ -304,11 +305,11 @@ public AverageDoubleExtractor(Observable<? extends T> source, Func1<? super T, D
304305

305306
@Override
306307
public Subscription onSubscribe(Observer<? super Double> t1) {
307-
return source.subscribe(new AverageObserver(t1));
308+
return source.unsafeSubscribe(new AverageObserver(t1));
308309
}
309310

310311
/** Computes the average. */
311-
private final class AverageObserver implements Observer<T> {
312+
private final class AverageObserver extends Subscriber<T> {
312313
final Observer<? super Double> observer;
313314
double sum;
314315
int count;

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Observable.OnSubscribeFunc;
2525
import rx.Observer;
2626
import rx.Scheduler;
27+
import rx.Subscriber;
2728
import rx.Subscription;
2829
import rx.functions.Func0;
2930
import rx.functions.Func1;
@@ -71,7 +72,7 @@ public Subscription onSubscribe(Observer<? super List<T>> observer) {
7172
ChunkCreator creator = new ObservableBasedSingleChunkCreator<T, List<T>, TClosing>(buffers, bufferClosingSelector);
7273
return new CompositeSubscription(
7374
new ChunkToSubscription(creator),
74-
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
75+
source.unsafeSubscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
7576
}
7677
};
7778
}
@@ -111,7 +112,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
111112
ChunkCreator creator = new ObservableBasedMultiChunkCreator<T, List<T>, TOpening, TClosing>(buffers, bufferOpenings, bufferClosingSelector);
112113
return new CompositeSubscription(
113114
new ChunkToSubscription(creator),
114-
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
115+
source.unsafeSubscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
115116
}
116117
};
117118
}
@@ -170,7 +171,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
170171
ChunkCreator creator = new SkippingChunkCreator<T, List<T>>(chunks, skip);
171172
return new CompositeSubscription(
172173
new ChunkToSubscription(creator),
173-
source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator)));
174+
source.unsafeSubscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator)));
174175
}
175176
};
176177
}
@@ -229,7 +230,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
229230
ChunkCreator creator = new TimeBasedChunkCreator<T, List<T>>(buffers, timespan, unit, scheduler);
230231
return new CompositeSubscription(
231232
new ChunkToSubscription(creator),
232-
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
233+
source.unsafeSubscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
233234
}
234235
};
235236
}
@@ -295,7 +296,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
295296
return new CompositeSubscription(
296297
chunks,
297298
new ChunkToSubscription(creator),
298-
source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator)));
299+
source.unsafeSubscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator)));
299300
}
300301
};
301302
}
@@ -361,7 +362,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
361362
return new CompositeSubscription(
362363
buffers,
363364
new ChunkToSubscription(creator),
364-
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
365+
source.unsafeSubscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator)));
365366
}
366367
};
367368
}
@@ -460,16 +461,16 @@ public Subscription onSubscribe(Observer<? super List<T>> t1) {
460461
CompositeSubscription csub = new CompositeSubscription();
461462

462463
SourceObserver<T> so = new SourceObserver<T>(t1, initialCapacity, csub);
463-
csub.add(source.subscribe(so));
464-
csub.add(boundary.subscribe(new BoundaryObserver<B>(so)));
464+
csub.add(source.unsafeSubscribe(so));
465+
csub.add(boundary.unsafeSubscribe(new BoundaryObserver<B>(so)));
465466

466467
return csub;
467468
}
468469

469470
/**
470471
* Observes the source.
471472
*/
472-
private static final class SourceObserver<T> implements Observer<T> {
473+
private static final class SourceObserver<T> extends Subscriber<T> {
473474
final Observer<? super List<T>> observer;
474475
/** The buffer, if null, that indicates a terminal state. */
475476
List<T> buffer;
@@ -539,7 +540,7 @@ void emitAndComplete() {
539540
/**
540541
* Observes the boundary.
541542
*/
542-
private static final class BoundaryObserver<T> implements Observer<T> {
543+
private static final class BoundaryObserver<T> extends Subscriber<T> {
543544
final SourceObserver so;
544545

545546
public BoundaryObserver(SourceObserver so) {

0 commit comments

Comments
 (0)