17
17
import java .io .IOException ;
18
18
import java .nio .ByteBuffer ;
19
19
import java .util .concurrent .atomic .AtomicBoolean ;
20
- import java .util .concurrent .atomic .AtomicReference ;
21
20
22
21
import org .asynchttpclient .request .body .Body ;
23
22
import org .reactivestreams .Publisher ;
@@ -31,7 +30,7 @@ public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
31
30
32
31
private final Publisher <ByteBuffer > publisher ;
33
32
private final FeedableBodyGenerator feedableBodyGenerator ;
34
- private final AtomicReference < FeedListener > feedListener = new AtomicReference <>( null ) ;
33
+ private volatile FeedListener feedListener ;
35
34
36
35
public ReactiveStreamsBodyGenerator (Publisher <ByteBuffer > publisher ) {
37
36
this .publisher = publisher ;
@@ -49,7 +48,7 @@ public boolean feed(ByteBuffer buffer, boolean isLast) throws Exception {
49
48
50
49
@ Override
51
50
public void setListener (FeedListener listener ) {
52
- feedListener . set ( listener ) ;
51
+ feedListener = listener ;
53
52
feedableBodyGenerator .setListener (listener );
54
53
}
55
54
@@ -81,7 +80,7 @@ public long getContentLength() {
81
80
82
81
@ Override
83
82
public BodyState transferTo (ByteBuf target ) throws IOException {
84
- if (initialized .compareAndSet (false , true ))
83
+ if (initialized .compareAndSet (false , true ))
85
84
publisher .subscribe (subscriber );
86
85
87
86
return body .transferTo (target );
@@ -101,21 +100,22 @@ public SimpleSubscriber(FeedableBodyGenerator feeder) {
101
100
102
101
@ Override
103
102
public void onSubscribe (Subscription s ) {
104
- if (s == null ) throw null ;
103
+ if (s == null )
104
+ throw null ;
105
105
106
106
// If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
107
- if (this .subscription != null ) {
107
+ if (this .subscription != null ) {
108
108
s .cancel (); // Cancel the additional subscription
109
- }
110
- else {
111
- subscription = s ;
112
- subscription .request (Long .MAX_VALUE );
109
+ } else {
110
+ subscription = s ;
111
+ subscription .request (Long .MAX_VALUE );
113
112
}
114
113
}
115
114
116
115
@ Override
117
116
public void onNext (ByteBuffer t ) {
118
- if (t == null ) throw null ;
117
+ if (t == null )
118
+ throw null ;
119
119
try {
120
120
feeder .feed (t , false );
121
121
} catch (Exception e ) {
@@ -126,18 +126,19 @@ public void onNext(ByteBuffer t) {
126
126
127
127
@ Override
128
128
public void onError (Throwable t ) {
129
- if (t == null ) throw null ;
129
+ if (t == null )
130
+ throw null ;
130
131
LOGGER .debug ("Error occurred while consuming body stream." , t );
131
- FeedListener listener = feedListener .get ();
132
- if (listener != null ) listener .onError (t );
132
+ FeedListener listener = feedListener ;
133
+ if (listener != null )
134
+ listener .onError (t );
133
135
}
134
136
135
137
@ Override
136
138
public void onComplete () {
137
139
try {
138
- feeder .feed (EMPTY , true );
139
- }
140
- catch (Exception e ) {
140
+ feeder .feed (EMPTY , true );
141
+ } catch (Exception e ) {
141
142
LOGGER .info ("Ignoring exception occurred while completing stream processing." , e );
142
143
this .subscription .cancel ();
143
144
}
0 commit comments