Skip to content

Commit 214522c

Browse files
committed
Don't use a ProgressivePromise when we write a request without a body , close AsyncHttpClient#1144
1 parent 79ddf22 commit 214522c

File tree

8 files changed

+142
-71
lines changed

8 files changed

+142
-71
lines changed

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelProgressivePromise;
25+
import io.netty.channel.ChannelPromise;
2526
import io.netty.handler.codec.http.DefaultHttpHeaders;
2627
import io.netty.handler.codec.http.HttpHeaders;
2728
import io.netty.handler.codec.http.HttpMethod;
@@ -277,7 +278,8 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
277278
@Override
278279
protected void onSuccess(List<InetSocketAddress> addresses) {
279280
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey);
280-
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap, connectListener);
281+
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap,
282+
connectListener);
281283
}
282284

283285
@Override
@@ -327,12 +329,21 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
327329
boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue() && httpRequest.getMethod() != HttpMethod.CONNECT && nettyRequest.getBody() != null;
328330

329331
if (!future.isHeadersAlreadyWrittenOnContinue()) {
330-
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions)
331-
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onRequestSend(nettyRequest);
332-
333-
ChannelProgressivePromise promise = channel.newProgressivePromise();
334-
ChannelFuture f = writeBody ? channel.write(httpRequest, promise) : channel.writeAndFlush(httpRequest, promise);
335-
f.addListener(new ProgressListener(future.getAsyncHandler(), future, true, 0L));
332+
if (handler instanceof AsyncHandlerExtensions) {
333+
AsyncHandlerExtensions.class.cast(handler).onRequestSend(nettyRequest);
334+
}
335+
336+
// if the request has a body, we want to track progress
337+
if (writeBody) {
338+
ChannelProgressivePromise promise = channel.newProgressivePromise();
339+
ChannelFuture f = channel.write(httpRequest, promise);
340+
f.addListener(new WriteProgressListener(future, true, 0L));
341+
} else {
342+
// we can just track write completion
343+
ChannelPromise promise = channel.newPromise();
344+
ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
345+
f.addListener(new WriteCompleteListener(future));
346+
}
336347
}
337348

338349
if (writeBody)
@@ -388,7 +399,7 @@ public void handleUnexpectedClosedChannel(Channel channel, NettyResponseFuture<?
388399
} else if (retry(future)) {
389400
future.pendingException = null;
390401
} else {
391-
abort(channel, future, future.pendingException != null? future.pendingException : RemotelyClosedException.INSTANCE);
402+
abort(channel, future, future.pendingException != null ? future.pendingException : RemotelyClosedException.INSTANCE);
392403
}
393404
}
394405

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.request;
15+
16+
import org.asynchttpclient.netty.NettyResponseFuture;
17+
18+
import io.netty.channel.ChannelFuture;
19+
import io.netty.util.concurrent.GenericFutureListener;
20+
21+
public class WriteCompleteListener extends WriteListener implements GenericFutureListener<ChannelFuture> {
22+
23+
public WriteCompleteListener(NettyResponseFuture<?> future) {
24+
super(future, true);
25+
}
26+
27+
@Override
28+
public void operationComplete(ChannelFuture future) throws Exception {
29+
operationComplete(future.channel(), future.cause());
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
2+
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -14,12 +14,9 @@
1414
package org.asynchttpclient.netty.request;
1515

1616
import io.netty.channel.Channel;
17-
import io.netty.channel.ChannelProgressiveFuture;
18-
import io.netty.channel.ChannelProgressiveFutureListener;
1917

2018
import java.nio.channels.ClosedChannelException;
2119

22-
import org.asynchttpclient.AsyncHandler;
2320
import org.asynchttpclient.handler.ProgressAsyncHandler;
2421
import org.asynchttpclient.netty.NettyResponseFuture;
2522
import org.asynchttpclient.netty.channel.ChannelState;
@@ -28,33 +25,25 @@
2825
import org.slf4j.Logger;
2926
import org.slf4j.LoggerFactory;
3027

31-
public class ProgressListener implements ChannelProgressiveFutureListener {
28+
public abstract class WriteListener {
3229

33-
private static final Logger LOGGER = LoggerFactory.getLogger(ProgressListener.class);
30+
private static final Logger LOGGER = LoggerFactory.getLogger(WriteListener.class);
31+
protected final NettyResponseFuture<?> future;
32+
protected final ProgressAsyncHandler<?> progressAsyncHandler;
33+
protected final boolean notifyHeaders;
3434

35-
private final AsyncHandler<?> asyncHandler;
36-
private final NettyResponseFuture<?> future;
37-
private final boolean notifyHeaders;
38-
private final long expectedTotal;
39-
private long lastProgress = 0L;
40-
41-
public ProgressListener(AsyncHandler<?> asyncHandler,//
42-
NettyResponseFuture<?> future,//
43-
boolean notifyHeaders,//
44-
long expectedTotal) {
45-
this.asyncHandler = asyncHandler;
35+
public WriteListener(NettyResponseFuture<?> future, boolean notifyHeaders) {
4636
this.future = future;
37+
this.progressAsyncHandler = future.getAsyncHandler() instanceof ProgressAsyncHandler ? (ProgressAsyncHandler<?>) future.getAsyncHandler() : null;
4738
this.notifyHeaders = notifyHeaders;
48-
this.expectedTotal = expectedTotal;
4939
}
5040

51-
private boolean abortOnThrowable(Throwable cause, Channel channel) {
52-
41+
private boolean abortOnThrowable(Channel channel, Throwable cause) {
5342
if (cause != null && future.getChannelState() != ChannelState.NEW) {
5443
if (cause instanceof IllegalStateException || cause instanceof ClosedChannelException || StackTraceInspector.recoverOnReadOrWriteException(cause)) {
5544
LOGGER.debug(cause.getMessage(), cause);
5645
Channels.silentlyCloseChannel(channel);
57-
46+
5847
} else {
5948
future.abort(cause);
6049
}
@@ -64,24 +53,23 @@ private boolean abortOnThrowable(Throwable cause, Channel channel) {
6453
return false;
6554
}
6655

67-
@Override
68-
public void operationComplete(ChannelProgressiveFuture cf) {
56+
protected void operationComplete(Channel channel, Throwable cause) {
57+
future.touch();
58+
6959
// The write operation failed. If the channel was cached, it means it got asynchronously closed.
7060
// Let's retry a second time.
71-
if (!abortOnThrowable(cf.cause(), cf.channel())) {
72-
73-
future.touch();
61+
if (abortOnThrowable(channel, cause)) {
62+
return;
63+
}
7464

65+
if (progressAsyncHandler != null) {
7566
/**
76-
* We need to make sure we aren't in the middle of an authorization
77-
* process before publishing events as we will re-publish again the
78-
* same event after the authorization, causing unpredictable
79-
* behavior.
67+
* We need to make sure we aren't in the middle of an authorization process before publishing events as we will re-publish again the same event after the authorization,
68+
* causing unpredictable behavior.
8069
*/
8170
boolean startPublishing = !future.getInAuth().get() && !future.getInProxyAuth().get();
82-
83-
if (startPublishing && asyncHandler instanceof ProgressAsyncHandler) {
84-
ProgressAsyncHandler<?> progressAsyncHandler = (ProgressAsyncHandler<?>) asyncHandler;
71+
if (startPublishing) {
72+
8573
if (notifyHeaders) {
8674
progressAsyncHandler.onHeadersWritten();
8775
} else {
@@ -90,16 +78,4 @@ public void operationComplete(ChannelProgressiveFuture cf) {
9078
}
9179
}
9280
}
93-
94-
@Override
95-
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
96-
future.touch();
97-
if (!notifyHeaders && asyncHandler instanceof ProgressAsyncHandler) {
98-
long lastLastProgress = lastProgress;
99-
lastProgress = progress;
100-
if (total < 0)
101-
total = expectedTotal;
102-
ProgressAsyncHandler.class.cast(asyncHandler).onContentWriteProgress(progress - lastLastProgress, progress, total);
103-
}
104-
}
10581
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.request;
15+
16+
import io.netty.channel.ChannelProgressiveFuture;
17+
import io.netty.channel.ChannelProgressiveFutureListener;
18+
19+
import org.asynchttpclient.netty.NettyResponseFuture;
20+
21+
public class WriteProgressListener extends WriteListener implements ChannelProgressiveFutureListener {
22+
23+
private final long expectedTotal;
24+
private long lastProgress = 0L;
25+
26+
public WriteProgressListener(NettyResponseFuture<?> future,//
27+
boolean notifyHeaders,//
28+
long expectedTotal) {
29+
super(future, notifyHeaders);
30+
this.expectedTotal = expectedTotal;
31+
}
32+
33+
@Override
34+
public void operationComplete(ChannelProgressiveFuture cf) {
35+
operationComplete(cf.channel(), cf.cause());
36+
}
37+
38+
@Override
39+
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
40+
future.touch();
41+
42+
if (progressAsyncHandler != null && !notifyHeaders) {
43+
long lastLastProgress = lastProgress;
44+
lastProgress = progress;
45+
if (total < 0) {
46+
total = expectedTotal;
47+
}
48+
progressAsyncHandler.onContentWriteProgress(progress - lastLastProgress, progress, total);
49+
}
50+
}
51+
}

client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.asynchttpclient.AsyncHttpClientConfig;
2626
import org.asynchttpclient.netty.NettyResponseFuture;
2727
import org.asynchttpclient.netty.channel.ChannelManager;
28-
import org.asynchttpclient.netty.request.ProgressListener;
28+
import org.asynchttpclient.netty.request.WriteProgressListener;
2929
import org.asynchttpclient.request.body.Body;
3030
import org.asynchttpclient.request.body.RandomAccessBody;
3131
import org.asynchttpclient.request.body.generator.BodyGenerator;
@@ -75,19 +75,21 @@ public void write(final Channel channel, NettyResponseFuture<?> future) throws I
7575
public void onContentAdded() {
7676
chunkedWriteHandler.resumeTransfer();
7777
}
78+
7879
@Override
79-
public void onError(Throwable t) {}
80+
public void onError(Throwable t) {
81+
}
8082
});
8183
}
8284
}
83-
ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise());
8485

85-
writeFuture.addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) {
86+
ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise());
87+
writeFuture.addListener(new WriteProgressListener(future, false, getContentLength()) {
8688
public void operationComplete(ChannelProgressiveFuture cf) {
8789
closeSilently(body);
8890
super.operationComplete(cf);
8991
}
9092
});
91-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
93+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise());
9294
}
9395
}

client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.asynchttpclient.AsyncHttpClientConfig;
2727
import org.asynchttpclient.netty.NettyResponseFuture;
2828
import org.asynchttpclient.netty.channel.ChannelManager;
29-
import org.asynchttpclient.netty.request.ProgressListener;
29+
import org.asynchttpclient.netty.request.WriteProgressListener;
3030

3131
public class NettyFileBody implements NettyBody {
3232

@@ -78,7 +78,7 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
7878
: new DefaultFileRegion(fileChannel, offset, length);
7979

8080
channel.write(message, channel.newProgressivePromise())//
81-
.addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()));
82-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
81+
.addListener(new WriteProgressListener(future, false, getContentLength()));
82+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise());
8383
}
8484
}

client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import static org.asynchttpclient.util.MiscUtils.closeSilently;
1717

1818
import org.asynchttpclient.netty.NettyResponseFuture;
19-
import org.asynchttpclient.netty.request.ProgressListener;
19+
import org.asynchttpclient.netty.request.WriteProgressListener;
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

@@ -68,12 +68,12 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
6868
}
6969

7070
channel.write(new ChunkedStream(is), channel.newProgressivePromise()).addListener(
71-
new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) {
71+
new WriteProgressListener(future, false, getContentLength()) {
7272
public void operationComplete(ChannelProgressiveFuture cf) {
7373
closeSilently(is);
7474
super.operationComplete(cf);
7575
}
7676
});
77-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
77+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise());
7878
}
7979
}

client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,43 +87,43 @@ public SocketAddress getLocalAddress() {
8787

8888
@Override
8989
public WebSocket sendMessage(byte[] message) {
90-
channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message)));
90+
channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message)), channel.voidPromise());
9191
return this;
9292
}
9393

9494
@Override
9595
public WebSocket stream(byte[] fragment, boolean last) {
96-
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment)));
96+
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment)), channel.voidPromise());
9797
return this;
9898
}
9999

100100
@Override
101101
public WebSocket stream(byte[] fragment, int offset, int len, boolean last) {
102-
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len)));
102+
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len)), channel.voidPromise());
103103
return this;
104104
}
105105

106106
@Override
107107
public WebSocket sendMessage(String message) {
108-
channel.writeAndFlush(new TextWebSocketFrame(message));
108+
channel.writeAndFlush(new TextWebSocketFrame(message), channel.voidPromise());
109109
return this;
110110
}
111111

112112
@Override
113113
public WebSocket stream(String fragment, boolean last) {
114-
channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment));
114+
channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment), channel.voidPromise());
115115
return this;
116116
}
117117

118118
@Override
119119
public WebSocket sendPing(byte[] payload) {
120-
channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload)));
120+
channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise());
121121
return this;
122122
}
123123

124124
@Override
125125
public WebSocket sendPong(byte[] payload) {
126-
channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload)));
126+
channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise());
127127
return this;
128128
}
129129

0 commit comments

Comments
 (0)