File tree Expand file tree Collapse file tree 1 file changed +9
-2
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +9
-2
lines changed Original file line number Diff line number Diff line change 21
21
22
22
import rx .*;
23
23
import rx .Completable .OnSubscribe ;
24
- import rx .internal .util .unsafe .MpscLinkedQueue ;
24
+ import rx .internal .util .atomic .MpscLinkedAtomicQueue ;
25
+ import rx .internal .util .unsafe .*;
25
26
import rx .subscriptions .CompositeSubscription ;
26
27
27
28
public final class CompletableOnSubscribeMergeDelayErrorIterable implements OnSubscribe {
@@ -53,7 +54,13 @@ public void call(final CompletableSubscriber s) {
53
54
54
55
final AtomicInteger wip = new AtomicInteger (1 );
55
56
56
- final Queue <Throwable > queue = new MpscLinkedQueue <Throwable >();
57
+ final Queue <Throwable > queue ;
58
+
59
+ if (UnsafeAccess .isUnsafeAvailable ()) {
60
+ queue = new MpscLinkedQueue <Throwable >();
61
+ } else {
62
+ queue = new MpscLinkedAtomicQueue <Throwable >();
63
+ }
57
64
58
65
for (;;) {
59
66
if (set .isUnsubscribed ()) {
You can’t perform that action at this time.
0 commit comments