Skip to content

Commit c9aaff8

Browse files
author
Stephane Landelle
committed
Compute request and body together and wrap them into NettyRequest
1 parent e4e6d7b commit c9aaff8

18 files changed

+387
-168
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/future/NettyResponseFuture.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package org.asynchttpclient.providers.netty.future;
1717

18-
import static org.asynchttpclient.util.DateUtil.millisTime;
18+
import static org.asynchttpclient.util.DateUtil.*;
1919
import io.netty.channel.Channel;
20-
import io.netty.handler.codec.http.HttpRequest;
20+
import io.netty.handler.codec.http.HttpHeaders;
2121
import io.netty.handler.codec.http.HttpResponse;
2222

2323
import java.net.URI;
@@ -40,6 +40,7 @@
4040
import org.asynchttpclient.listenable.AbstractListenableFuture;
4141
import org.asynchttpclient.providers.netty.DiscardEvent;
4242
import org.asynchttpclient.providers.netty.channel.Channels;
43+
import org.asynchttpclient.providers.netty.request.NettyRequest;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
4546

@@ -50,8 +51,8 @@
5051
*/
5152
public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
5253

53-
private final static Logger logger = LoggerFactory.getLogger(NettyResponseFuture.class);
54-
public final static String MAX_RETRY = "org.asynchttpclient.providers.netty.maxRetry";
54+
private static final Logger logger = LoggerFactory.getLogger(NettyResponseFuture.class);
55+
public static final String MAX_RETRY = "org.asynchttpclient.providers.netty.maxRetry";
5556

5657
public enum STATE {
5758
NEW, POOLED, RECONNECTED, CLOSED,
@@ -86,8 +87,8 @@ public enum STATE {
8687
private URI uri;
8788
private boolean keepAlive = true;
8889
private Request request;
89-
private HttpRequest nettyRequest;
90-
private HttpResponse httpResponse;
90+
private NettyRequest nettyRequest;
91+
private HttpHeaders httpHeaders;
9192
private AsyncHandler<V> asyncHandler;
9293
private HttpResponse pendingResponse;
9394
private boolean streamWasAlreadyConsumed;
@@ -99,7 +100,7 @@ public enum STATE {
99100
public NettyResponseFuture(URI uri,//
100101
Request request,//
101102
AsyncHandler<V> asyncHandler,//
102-
HttpRequest nettyRequest,//
103+
NettyRequest nettyRequest,//
103104
int requestTimeoutInMs,//
104105
AsyncHttpClientConfig config,//
105106
ConnectionPoolKeyStrategy connectionPoolKeyStrategy,//
@@ -333,11 +334,11 @@ public final Request getRequest() {
333334
return request;
334335
}
335336

336-
public final HttpRequest getNettyRequest() {
337+
public final NettyRequest getNettyRequest() {
337338
return nettyRequest;
338339
}
339340

340-
public final void setNettyRequest(HttpRequest nettyRequest) {
341+
public final void setNettyRequest(NettyRequest nettyRequest) {
341342
this.nettyRequest = nettyRequest;
342343
}
343344

@@ -353,12 +354,12 @@ public final void setKeepAlive(final boolean keepAlive) {
353354
this.keepAlive = keepAlive;
354355
}
355356

356-
public final HttpResponse getHttpResponse() {
357-
return httpResponse;
357+
public final HttpHeaders getHttpHeaders() {
358+
return httpHeaders;
358359
}
359360

360-
public final void setHttpResponse(final HttpResponse httpResponse) {
361-
this.httpResponse = httpResponse;
361+
public final void setHttpHeaders(HttpHeaders httpHeaders) {
362+
this.httpHeaders = httpHeaders;
362363
}
363364

364365
public int incrementAndGetCurrentRedirectCount() {
@@ -495,7 +496,7 @@ public String toString() {
495496
",\n\tcontent=" + content + //
496497
",\n\turi=" + uri + //
497498
",\n\tkeepAlive=" + keepAlive + //
498-
",\n\thttpResponse=" + httpResponse + //
499+
",\n\thttpHeaders=" + httpHeaders + //
499500
",\n\texEx=" + exEx + //
500501
",\n\tredirectCount=" + redirectCount + //
501502
",\n\treaperFuture=" + reaperFuture + //

providers/netty/src/main/java/org/asynchttpclient/providers/netty/future/NettyResponseFutures.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package org.asynchttpclient.providers.netty.future;
22

33
import io.netty.handler.codec.http.HttpHeaders;
4-
import io.netty.handler.codec.http.HttpRequest;
54

65
import java.net.URI;
76

87
import org.asynchttpclient.AsyncHandler;
98
import org.asynchttpclient.AsyncHttpClientConfig;
109
import org.asynchttpclient.ProxyServer;
1110
import org.asynchttpclient.Request;
11+
import org.asynchttpclient.providers.netty.request.NettyRequest;
1212
import org.asynchttpclient.util.AsyncHttpProviderUtils;
1313

1414
public class NettyResponseFutures {
1515

16-
public static <T> NettyResponseFuture<T> newNettyResponseFuture(URI uri, Request request, AsyncHandler<T> asyncHandler, HttpRequest nettyRequest, AsyncHttpClientConfig config, ProxyServer proxyServer) {
16+
public static <T> NettyResponseFuture<T> newNettyResponseFuture(URI uri, Request request, AsyncHandler<T> asyncHandler, NettyRequest nettyRequest, AsyncHttpClientConfig config, ProxyServer proxyServer) {
1717

1818
int requestTimeout = AsyncHttpProviderUtils.requestTimeout(config, request);
1919
NettyResponseFuture<T> f = new NettyResponseFuture<T>(uri,//

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/HttpProtocol.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,18 @@
5353
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
5454
import org.asynchttpclient.providers.netty.channel.Channels;
5555
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
56+
import org.asynchttpclient.providers.netty.request.NettyRequest;
5657
import org.asynchttpclient.providers.netty.request.NettyRequestSender;
5758
import org.asynchttpclient.providers.netty.response.ResponseHeaders;
5859
import org.asynchttpclient.providers.netty.response.ResponseStatus;
5960
import org.asynchttpclient.spnego.SpnegoEngine;
6061
import org.asynchttpclient.util.AsyncHttpProviderUtils;
62+
import org.slf4j.Logger;
63+
import org.slf4j.LoggerFactory;
6164

6265
final class HttpProtocol extends Protocol {
66+
67+
private static final Logger LOGGER = LoggerFactory.getLogger(HttpProtocol.class);
6368

6469
public HttpProtocol(Channels channels, AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig nettyConfig, NettyRequestSender requestSender) {
6570
super(channels, config, nettyConfig, requestSender);
@@ -199,7 +204,7 @@ private void markAsDone(NettyResponseFuture<?> future, final ChannelHandlerConte
199204
future.done();
200205
} catch (Throwable t) {
201206
// Never propagate exception once we know we are done.
202-
NettyChannelHandler.LOGGER.debug(t.getMessage(), t);
207+
LOGGER.debug(t.getMessage(), t);
203208
}
204209

205210
if (!future.isKeepAlive() || !ctx.channel().isActive()) {
@@ -249,7 +254,7 @@ private boolean handleResponseAndExit(final ChannelHandlerContext ctx, final Net
249254

250255
// store the original headers so we can re-send all them to
251256
// the handler in case of trailing headers
252-
future.setHttpResponse(response);
257+
future.setHttpHeaders(response.headers());
253258

254259
future.setKeepAlive(!HttpHeaders.Values.CLOSE.equalsIgnoreCase(response.headers().get(HttpHeaders.Names.CONNECTION)));
255260

@@ -280,7 +285,7 @@ private boolean handleResponseAndExit(final ChannelHandlerContext ctx, final Net
280285

281286
final Realm nr = new Realm.RealmBuilder().clone(newRealm).setUri(URI.create(request.getUrl()).getPath()).build();
282287

283-
NettyChannelHandler.LOGGER.debug("Sending authentication to {}", request.getUrl());
288+
LOGGER.debug("Sending authentication to {}", request.getUrl());
284289
Callback callback = new Callback(future) {
285290
public void call() throws Exception {
286291
channels.drainChannel(ctx, future);
@@ -309,7 +314,7 @@ public void call() throws Exception {
309314
} else if (statusCode == PROXY_AUTHENTICATION_REQUIRED.code()) {
310315
List<String> proxyAuth = getAuthorizationToken(response.headers(), HttpHeaders.Names.PROXY_AUTHENTICATE);
311316
if (realm != null && !proxyAuth.isEmpty() && !future.getAndSetAuth(true)) {
312-
NettyChannelHandler.LOGGER.debug("Sending proxy authentication to {}", request.getUrl());
317+
LOGGER.debug("Sending proxy authentication to {}", request.getUrl());
313318

314319
future.setState(NettyResponseFuture.STATE.NEW);
315320
Realm newRealm = null;
@@ -335,14 +340,14 @@ public void call() throws Exception {
335340

336341
} else if (statusCode == OK.code() && nettyRequest.getMethod() == HttpMethod.CONNECT) {
337342

338-
NettyChannelHandler.LOGGER.debug("Connected to {}:{}", proxyServer.getHost(), proxyServer.getPort());
343+
LOGGER.debug("Connected to {}:{}", proxyServer.getHost(), proxyServer.getPort());
339344

340345
if (future.isKeepAlive()) {
341346
future.attachChannel(ctx.channel(), true);
342347
}
343348

344349
try {
345-
NettyChannelHandler.LOGGER.debug("Connecting to proxy {} for scheme {}", proxyServer, request.getUrl());
350+
LOGGER.debug("Connecting to proxy {} for scheme {}", proxyServer, request.getUrl());
346351
channels.upgradeProtocol(ctx.channel().pipeline(), request.getURI().getScheme());
347352
} catch (Throwable ex) {
348353
channels.abort(future, ex);
@@ -376,23 +381,22 @@ public void handle(final ChannelHandlerContext ctx, final NettyResponseFuture fu
376381
return;
377382
}
378383

379-
HttpRequest nettyRequest = future.getNettyRequest();
384+
NettyRequest nettyRequest = future.getNettyRequest();
380385
AsyncHandler handler = future.getAsyncHandler();
381386
ProxyServer proxyServer = future.getProxyServer();
382387
try {
383388
if (e instanceof HttpResponse) {
384389
HttpResponse response = (HttpResponse) e;
385-
NettyChannelHandler.LOGGER.debug("\n\nRequest {}\n\nResponse {}\n", nettyRequest, response);
390+
LOGGER.debug("\n\nRequest {}\n\nResponse {}\n", nettyRequest.getHttpRequest(), response);
386391
future.setPendingResponse(response);
387392
return;
388393
}
389394

390395
if (e instanceof HttpContent) {
391-
392396
HttpResponse response = future.getPendingResponse();
393397
future.setPendingResponse(null);
394398
if (handler != null) {
395-
if (response != null && handleResponseAndExit(ctx, future, handler, nettyRequest, proxyServer, response)) {
399+
if (response != null && handleResponseAndExit(ctx, future, handler, nettyRequest.getHttpRequest(), proxyServer, response)) {
396400
return;
397401
}
398402

@@ -405,7 +409,7 @@ public void handle(final ChannelHandlerContext ctx, final NettyResponseFuture fu
405409
LastHttpContent lastChunk = (LastHttpContent) chunk;
406410
HttpHeaders trailingHeaders = lastChunk.trailingHeaders();
407411
if (!trailingHeaders.isEmpty()) {
408-
interrupt = handler.onHeadersReceived(new ResponseHeaders(future.getURI(), future.getHttpResponse().headers(), trailingHeaders)) != STATE.CONTINUE;
412+
interrupt = handler.onHeadersReceived(new ResponseHeaders(future.getURI(), future.getHttpHeaders(), trailingHeaders)) != STATE.CONTINUE;
409413
}
410414
}
411415

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/NettyChannelHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
@Sharable
4343
public class NettyChannelHandler extends ChannelInboundHandlerAdapter {
4444

45-
static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelHandler.class);
45+
private static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelHandler.class);
4646

4747
private final AsyncHttpClientConfig config;
4848
private final NettyRequestSender requestSender;

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/Protocol.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,13 @@ protected boolean redirect(Request request, NettyResponseFuture<?> future, HttpR
9595

9696
logger.debug("Redirecting to {}", newUrl);
9797

98-
for (String cookieStr : future.getHttpResponse().headers().getAll(HttpHeaders.Names.SET_COOKIE)) {
98+
for (String cookieStr : future.getHttpHeaders().getAll(HttpHeaders.Names.SET_COOKIE)) {
9999
for (Cookie c : CookieDecoder.decode(cookieStr)) {
100100
nBuilder.addOrReplaceCookie(c);
101101
}
102102
}
103103

104-
for (String cookieStr : future.getHttpResponse().headers().getAll(HttpHeaders.Names.SET_COOKIE2)) {
104+
for (String cookieStr : future.getHttpHeaders().getAll(HttpHeaders.Names.SET_COOKIE2)) {
105105
for (Cookie c : CookieDecoder.decode(cookieStr)) {
106106
nBuilder.addOrReplaceCookie(c);
107107
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/WebSocketProtocol.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,14 @@
4848
import org.asynchttpclient.providers.netty.ws.NettyWebSocket;
4949
import org.asynchttpclient.providers.netty.ws.WebSocketUtil;
5050
import org.asynchttpclient.websocket.WebSocketUpgradeHandler;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
5153

5254
final class WebSocketProtocol extends Protocol {
5355

56+
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketProtocol.class);
57+
58+
5459
private static final byte OPCODE_TEXT = 0x1;
5560
private static final byte OPCODE_BINARY = 0x2;
5661
private static final byte OPCODE_UNKNOWN = -1;
@@ -67,7 +72,7 @@ private void invokeOnSucces(ChannelHandlerContext ctx, WebSocketUpgradeHandler h
6772
try {
6873
h.onSuccess(new NettyWebSocket(ctx.channel()));
6974
} catch (Exception ex) {
70-
NettyChannelHandler.LOGGER.warn("onSuccess unexpected exception", ex);
75+
LOGGER.warn("onSuccess unexpected exception", ex);
7176
}
7277
}
7378
}
@@ -105,7 +110,7 @@ public void handle(ChannelHandlerContext ctx, NettyResponseFuture future, Object
105110
return;
106111
}
107112

108-
future.setHttpResponse(response);
113+
future.setHttpHeaders(response.headers());
109114
if (redirect(request, future, response, ctx))
110115
return;
111116

@@ -128,7 +133,7 @@ public void handle(ChannelHandlerContext ctx, NettyResponseFuture future, Object
128133
}
129134

130135
String accept = response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_ACCEPT);
131-
String key = WebSocketUtil.getAcceptKey(future.getNettyRequest().headers().get(HttpHeaders.Names.SEC_WEBSOCKET_KEY));
136+
String key = WebSocketUtil.getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(HttpHeaders.Names.SEC_WEBSOCKET_KEY));
132137
if (accept == null || !accept.equals(key)) {
133138
throw new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key));
134139
}
@@ -173,20 +178,20 @@ public void handle(ChannelHandlerContext ctx, NettyResponseFuture future, Object
173178
}
174179
}
175180
} else {
176-
NettyChannelHandler.LOGGER.debug("UpgradeHandler returned a null NettyWebSocket ");
181+
LOGGER.debug("UpgradeHandler returned a null NettyWebSocket ");
177182
}
178183
} else if (e instanceof LastHttpContent) {
179184
// FIXME what to do with this kind of messages?
180185
} else {
181-
NettyChannelHandler.LOGGER.error("Invalid message {}", e);
186+
LOGGER.error("Invalid message {}", e);
182187
}
183188
}
184189

185190
@Override
186191
public void onError(ChannelHandlerContext ctx, Throwable e) {
187192
try {
188193
Object attribute = Channels.getDefaultAttribute(ctx);
189-
NettyChannelHandler.LOGGER.warn("onError {}", e);
194+
LOGGER.warn("onError {}", e);
190195
if (!(attribute instanceof NettyResponseFuture)) {
191196
return;
192197
}
@@ -200,13 +205,13 @@ public void onError(ChannelHandlerContext ctx, Throwable e) {
200205
webSocket.close();
201206
}
202207
} catch (Throwable t) {
203-
NettyChannelHandler.LOGGER.error("onError", t);
208+
LOGGER.error("onError", t);
204209
}
205210
}
206211

207212
@Override
208213
public void onClose(ChannelHandlerContext ctx) {
209-
NettyChannelHandler.LOGGER.trace("onClose {}");
214+
LOGGER.trace("onClose {}");
210215
Object attribute = Channels.getDefaultAttribute(ctx);
211216
if (!(attribute instanceof NettyResponseFuture)) {
212217
return;
@@ -222,7 +227,7 @@ public void onClose(ChannelHandlerContext ctx) {
222227
if (attribute != DiscardEvent.INSTANCE)
223228
webSocket.close(1006, "Connection was closed abnormally (that is, with no close frame being sent).");
224229
} catch (Throwable t) {
225-
NettyChannelHandler.LOGGER.error("onError", t);
230+
LOGGER.error("onError", t);
226231
}
227232
}
228233
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyConnectListener.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.channel.Channel;
2020
import io.netty.channel.ChannelFuture;
2121
import io.netty.channel.ChannelFutureListener;
22-
import io.netty.handler.codec.http.HttpRequest;
2322
import io.netty.handler.ssl.SslHandler;
2423

2524
import java.io.IOException;
@@ -124,7 +123,7 @@ public Builder(AsyncHttpClientConfig config, NettyRequestSender requestSender, R
124123

125124
public NettyConnectListener<T> build(final URI uri) throws IOException {
126125
ProxyServer proxyServer = ProxyUtils.getProxyServer(config, request);
127-
HttpRequest nettyRequest = NettyRequests.newNettyRequest(config, request, uri, true, proxyServer);
126+
NettyRequest nettyRequest = NettyRequests.newNettyRequest(config, request, uri, true, proxyServer);
128127
if (future == null) {
129128
future = NettyResponseFutures.newNettyResponseFuture(uri, request, asyncHandler, nettyRequest, config, proxyServer);
130129
} else {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package org.asynchttpclient.providers.netty.request;
17+
18+
import org.asynchttpclient.providers.netty.request.body.NettyBody;
19+
20+
import io.netty.handler.codec.http.HttpRequest;
21+
22+
public class NettyRequest {
23+
24+
private final HttpRequest httpRequest;
25+
private final NettyBody body;
26+
27+
public NettyRequest(HttpRequest httpRequest, NettyBody body) {
28+
this.httpRequest = httpRequest;
29+
this.body = body;
30+
}
31+
32+
public HttpRequest getHttpRequest() {
33+
return httpRequest;
34+
}
35+
36+
public NettyBody getBody() {
37+
return body;
38+
}
39+
}

0 commit comments

Comments
 (0)