Skip to content

Commit db8c908

Browse files
author
Stephane Landelle
committed
Minor clean up
1 parent b3a95b9 commit db8c908

File tree

6 files changed

+46
-51
lines changed

6 files changed

+46
-51
lines changed

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
public class Constants {
66

7-
// FIXME move into a state class along with isClose
7+
// FIXME move into a state class along with closed
88
public static final ThreadLocal<Boolean> IN_IO_THREAD = new ThreadLocalBoolean();
99

1010
// FIXME what to do with this???

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/FutureReaper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@
1616
*/
1717
public final class FutureReaper implements Runnable {
1818

19-
private final AtomicBoolean isClose;
19+
private final AtomicBoolean closed;
2020
private final Channels channels;
2121
private Future<?> scheduledFuture;
2222
private NettyResponseFuture<?> nettyResponseFuture;
2323
private AsyncHttpClientConfig config;
2424

25-
public FutureReaper(NettyResponseFuture<?> nettyResponseFuture, AsyncHttpClientConfig config, AtomicBoolean isClose, Channels channels) {
25+
public FutureReaper(NettyResponseFuture<?> nettyResponseFuture, AsyncHttpClientConfig config, AtomicBoolean closed, Channels channels) {
2626
this.nettyResponseFuture = nettyResponseFuture;
2727
this.channels = channels;
2828
this.config = config;
29-
this.isClose = isClose;
29+
this.closed = closed;
3030
}
3131

3232
public void setScheduledFuture(Future<?> scheduledFuture) {
@@ -79,7 +79,7 @@ private void expire(String message) {
7979
* @Override
8080
*/
8181
public synchronized void run() {
82-
if (isClose.get()) {
82+
if (closed.get()) {
8383
cancel(true);
8484
return;
8585
}

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/NettyAsyncHttpProvider.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class NettyAsyncHttpProvider implements AsyncHttpProvider {
3737

3838
private final AsyncHttpClientConfig config;
3939
private final NettyAsyncHttpProviderConfig asyncHttpProviderConfig;
40-
private final AtomicBoolean isClose = new AtomicBoolean(false);
40+
private final AtomicBoolean closed = new AtomicBoolean(false);
4141
private final Channels channels;
4242
private final NettyRequestSender requestSender;
4343
private final NettyChannelHandler channelHandler;
@@ -53,8 +53,8 @@ public NettyAsyncHttpProvider(AsyncHttpClientConfig config) {
5353
}
5454

5555
channels = new Channels(config, asyncHttpProviderConfig);
56-
requestSender = new NettyRequestSender(isClose, config, channels);
57-
channelHandler = new NettyChannelHandler(config, requestSender, channels, isClose);
56+
requestSender = new NettyRequestSender(closed, config, channels);
57+
channelHandler = new NettyChannelHandler(config, requestSender, channels, closed);
5858
channels.configure(channelHandler);
5959

6060
executeConnectAsync = asyncHttpProviderConfig.isAsyncConnect();
@@ -72,7 +72,7 @@ public String toString() {
7272

7373
@Override
7474
public void close() {
75-
isClose.set(true);
75+
closed.set(true);
7676
try {
7777
channels.close();
7878
// config.executorService().shutdown();

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/NettyChannelHandler.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,12 @@
11
package org.asynchttpclient.providers.netty4;
22

3-
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
4-
import static io.netty.handler.codec.http.HttpResponseStatus.FOUND;
5-
import static io.netty.handler.codec.http.HttpResponseStatus.MOVED_PERMANENTLY;
6-
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
7-
import static io.netty.handler.codec.http.HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED;
8-
import static io.netty.handler.codec.http.HttpResponseStatus.SEE_OTHER;
9-
import static io.netty.handler.codec.http.HttpResponseStatus.TEMPORARY_REDIRECT;
10-
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
11-
import static org.asynchttpclient.providers.netty4.util.HttpUtil.HTTP;
12-
import static org.asynchttpclient.providers.netty4.util.HttpUtil.WEBSOCKET;
13-
import static org.asynchttpclient.providers.netty4.util.HttpUtil.isNTLM;
3+
import static io.netty.handler.codec.http.HttpResponseStatus.*;
4+
import static org.asynchttpclient.providers.netty4.util.HttpUtil.*;
145
import io.netty.buffer.Unpooled;
156
import io.netty.channel.Channel;
7+
import io.netty.channel.ChannelHandler.Sharable;
168
import io.netty.channel.ChannelHandlerContext;
179
import io.netty.channel.ChannelInboundHandlerAdapter;
18-
import io.netty.channel.ChannelHandler.Sharable;
1910
import io.netty.handler.codec.PrematureChannelClosureException;
2011
import io.netty.handler.codec.http.DefaultHttpContent;
2112
import io.netty.handler.codec.http.HttpClientCodec;
@@ -73,15 +64,15 @@ public class NettyChannelHandler extends ChannelInboundHandlerAdapter {
7364
private final AsyncHttpClientConfig config;
7465
private final NettyRequestSender requestSender;
7566
private final Channels channels;
76-
private final AtomicBoolean isClose;
67+
private final AtomicBoolean closed;
7768
private final Protocol httpProtocol = new HttpProtocol();
7869
private final Protocol webSocketProtocol = new WebSocketProtocol();
7970

8071
public NettyChannelHandler(AsyncHttpClientConfig config, NettyRequestSender requestSender, Channels channels, AtomicBoolean isClose) {
8172
this.config = config;
8273
this.requestSender = requestSender;
8374
this.channels = channels;
84-
this.isClose = isClose;
75+
this.closed = isClose;
8576
}
8677

8778
@Override
@@ -117,7 +108,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object e) throws Except
117108

118109
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
119110

120-
if (isClose.get()) {
111+
if (closed.get()) {
121112
return;
122113
}
123114

@@ -137,7 +128,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
137128
callback.call();
138129

139130
} else if (attachment instanceof NettyResponseFuture<?>) {
140-
NettyResponseFuture future = (NettyResponseFuture) attachment;
131+
NettyResponseFuture<?> future = NettyResponseFuture.class.cast(attachment);
141132
future.touch();
142133

143134
if (!config.getIOExceptionFilters().isEmpty() && applyIoExceptionFiltersAndReplayRequest(ctx, future, new IOException("Channel Closed"))) {
@@ -219,8 +210,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Excep
219210
}
220211
}
221212

222-
Protocol p = (ctx.pipeline().get(HttpClientCodec.class) != null ? httpProtocol : webSocketProtocol);
223-
p.onError(ctx, e);
213+
Protocol protocol = ctx.pipeline().get(HttpClientCodec.class) != null ? httpProtocol : webSocketProtocol;
214+
protocol.onError(ctx, e);
224215

225216
channels.closeChannel(ctx);
226217
// FIXME not really sure
@@ -253,9 +244,11 @@ private boolean applyIoExceptionFiltersAndReplayRequest(ChannelHandlerContext ct
253244

254245
private boolean redirect(Request request, NettyResponseFuture<?> future, HttpResponse response, final ChannelHandlerContext ctx) throws Exception {
255246

256-
int statusCode = response.getStatus().code();
247+
io.netty.handler.codec.http.HttpResponseStatus status = response.getStatus();
248+
// int statusCode = response.getStatus().code();
257249
boolean redirectEnabled = request.isRedirectOverrideSet() ? request.isRedirectEnabled() : config.isRedirectEnabled();
258-
if (redirectEnabled && (statusCode == MOVED_PERMANENTLY.code() || statusCode == FOUND.code() || statusCode == SEE_OTHER.code() || statusCode == TEMPORARY_REDIRECT.code())) {
250+
boolean isRedirectStatus = status == MOVED_PERMANENTLY || status == FOUND || status == SEE_OTHER || status == TEMPORARY_REDIRECT;
251+
if (redirectEnabled && isRedirectStatus) {
259252

260253
if (future.incrementAndGetCurrentRedirectCount() < config.getMaxRedirects()) {
261254
// We must allow 401 handling again.
@@ -270,13 +263,12 @@ private boolean redirect(Request request, NettyResponseFuture<?> future, HttpRes
270263
nBuilder.setQueryParameters(null);
271264
}
272265

273-
// FIXME what about 307?
274-
if (!(statusCode < FOUND.code() || statusCode > SEE_OTHER.code()) && !(statusCode == FOUND.code() && config.isStrict302Handling())) {
266+
// FIXME why not do that for 301 and 307 too?
267+
if ((status == FOUND || status == SEE_OTHER) && !(status == FOUND && config.isStrict302Handling())) {
275268
nBuilder.setMethod(HttpMethod.GET.name());
276269
}
277270

278-
// in case of a redirect from HTTP to HTTPS, those values
279-
// might be different
271+
// in case of a redirect from HTTP to HTTPS, future attributes might change
280272
final boolean initialConnectionKeepAlive = future.isKeepAlive();
281273
final String initialPoolKey = channels.getPoolKey(future);
282274

@@ -285,8 +277,8 @@ private boolean redirect(Request request, NettyResponseFuture<?> future, HttpRes
285277
if (request.getUrl().startsWith(WEBSOCKET)) {
286278
newUrl = newUrl.replace(HTTP, WEBSOCKET);
287279
}
288-
289280
LOGGER.debug("Redirecting to {}", newUrl);
281+
290282
for (String cookieStr : future.getHttpResponse().headers().getAll(HttpHeaders.Names.SET_COOKIE)) {
291283
for (Cookie c : CookieDecoder.decode(cookieStr)) {
292284
nBuilder.addOrReplaceCookie(c);
@@ -310,8 +302,10 @@ public void call() throws Exception {
310302
if (HttpHeaders.isTransferEncodingChunked(response)) {
311303
// We must make sure there is no bytes left before
312304
// executing the next request.
305+
// FIXME investigate this
313306
Channels.setDefaultAttribute(ctx, callback);
314307
} else {
308+
// FIXME don't understand: this offers the connection to the pool, or even closes it, while the request has not been sent, right?
315309
callback.call();
316310
}
317311

@@ -590,6 +584,7 @@ public void call() throws Exception {
590584
} else if (statusCode == CONTINUE.code()) {
591585
future.getAndSetWriteHeaders(false);
592586
future.getAndSetWriteBody(true);
587+
// FIXME is this necessary
593588
future.setIgnoreNextContents(true);
594589
requestSender.writeRequest(ctx.channel(), config, future);
595590
return;
@@ -623,7 +618,7 @@ public void call() throws Exception {
623618
return;
624619
}
625620

626-
} else if (statusCode == OK.code() && nettyRequest.getMethod().equals(HttpMethod.CONNECT)) {
621+
} else if (statusCode == OK.code() && nettyRequest.getMethod() == HttpMethod.CONNECT) {
627622

628623
LOGGER.debug("Connected to {}:{}", proxyServer.getHost(), proxyServer.getPort());
629624

@@ -764,9 +759,7 @@ public void handle(ChannelHandlerContext ctx, NettyResponseFuture future, Object
764759
if (redirect(request, future, response, ctx))
765760
return;
766761

767-
io.netty.handler.codec.http.HttpResponseStatus status = io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
768-
769-
boolean validStatus = response.getStatus().equals(status);
762+
boolean validStatus = response.getStatus() == SWITCHING_PROTOCOLS;
770763
boolean validUpgrade = response.headers().get(HttpHeaders.Names.UPGRADE) != null;
771764
String c = response.headers().get(HttpHeaders.Names.CONNECTION);
772765
if (c == null) {

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/NettyConnectionsPool.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class NettyConnectionsPool implements ConnectionsPool<String, Channel> {
3838
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> connectionsPool = new ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>>();
3939
private final ConcurrentHashMap<Channel, IdleChannel> channel2IdleChannel = new ConcurrentHashMap<Channel, IdleChannel>();
4040
private final ConcurrentHashMap<Channel, Long> channel2CreationDate = new ConcurrentHashMap<Channel, Long>();
41-
private final AtomicBoolean isClosed = new AtomicBoolean(false);
41+
private final AtomicBoolean closed = new AtomicBoolean(false);
4242
private final Timer idleConnectionDetector;
4343
private final boolean sslConnectionPoolEnabled;
4444
private final int maxTotalConnections;
@@ -93,7 +93,7 @@ private class IdleChannelDetector extends TimerTask {
9393
@Override
9494
public void run() {
9595
try {
96-
if (isClosed.get()) return;
96+
if (closed.get()) return;
9797

9898
if (log.isDebugEnabled()) {
9999
Set<String> keys = connectionsPool.keySet();
@@ -155,7 +155,7 @@ public void run() {
155155
* {@inheritDoc}
156156
*/
157157
public boolean offer(String uri, Channel channel) {
158-
if (isClosed.get()) return false;
158+
if (closed.get()) return false;
159159

160160
if (!sslConnectionPoolEnabled && uri.startsWith("https")) {
161161
return false;
@@ -232,7 +232,7 @@ public Channel poll(String uri) {
232232
}
233233

234234
private boolean remove(IdleChannel pooledChannel) {
235-
if (pooledChannel == null || isClosed.get()) return false;
235+
if (pooledChannel == null || closed.get()) return false;
236236

237237
boolean isRemoved = false;
238238
ConcurrentLinkedQueue<IdleChannel> pooledConnectionForHost = connectionsPool.get(pooledChannel.uri);
@@ -248,14 +248,14 @@ private boolean remove(IdleChannel pooledChannel) {
248248
*/
249249
public boolean removeAll(Channel channel) {
250250
channel2CreationDate.remove(channel);
251-
return !isClosed.get() && remove(channel2IdleChannel.get(channel));
251+
return !closed.get() && remove(channel2IdleChannel.get(channel));
252252
}
253253

254254
/**
255255
* {@inheritDoc}
256256
*/
257257
public boolean canCacheConnection() {
258-
if (!isClosed.get() && maxTotalConnections != -1 && channel2IdleChannel.size() >= maxTotalConnections) {
258+
if (!closed.get() && maxTotalConnections != -1 && channel2IdleChannel.size() >= maxTotalConnections) {
259259
return false;
260260
} else {
261261
return true;
@@ -266,7 +266,7 @@ public boolean canCacheConnection() {
266266
* {@inheritDoc}
267267
*/
268268
public void destroy() {
269-
if (isClosed.getAndSet(true)) return;
269+
if (closed.getAndSet(true)) return;
270270

271271
// stop timer
272272
idleConnectionDetector.cancel();

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/NettyRequestSender.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ public class NettyRequestSender {
5454

5555
private static final Logger LOGGER = LoggerFactory.getLogger(NettyRequestSender.class);
5656

57-
private final AtomicBoolean isClose;
57+
private final AtomicBoolean closed;
5858
private final AsyncHttpClientConfig config;
5959
private final Channels channels;
6060

61-
public NettyRequestSender(AtomicBoolean isClose, AsyncHttpClientConfig config, Channels channels) {
62-
this.isClose = isClose;
61+
public NettyRequestSender(AtomicBoolean closed, AsyncHttpClientConfig config, Channels channels) {
62+
this.closed = closed;
6363
this.config = config;
6464
this.channels = channels;
6565
}
@@ -68,7 +68,7 @@ public boolean retry(Channel channel, NettyResponseFuture<?> future) {
6868

6969
boolean success = false;
7070

71-
if (!isClose.get()) {
71+
if (!closed.get()) {
7272
channels.removeAll(channel);
7373

7474
if (future == null) {
@@ -105,14 +105,15 @@ public <T> void execute(final Request request, final NettyResponseFuture<T> f) t
105105
doConnect(request, f.getAsyncHandler(), f, true, true, true);
106106
}
107107

108+
// FIXME is this useful? Can't we do that when building the request?
108109
private final boolean validateWebSocketRequest(Request request, AsyncHandler<?> asyncHandler) {
109110
return request.getMethod().equals(HttpMethod.GET.name()) && asyncHandler instanceof WebSocketUpgradeHandler;
110111
}
111112

112113
public <T> ListenableFuture<T> doConnect(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, boolean useCache, boolean asyncConnect,
113114
boolean reclaimCache) throws IOException {
114115

115-
if (isClose.get()) {
116+
if (closed.get()) {
116117
throw new IOException("Closed");
117118
}
118119

@@ -122,6 +123,7 @@ public <T> ListenableFuture<T> doConnect(final Request request, final AsyncHandl
122123

123124
ProxyServer proxyServer = ProxyUtils.getProxyServer(config, request);
124125
boolean useProxy = proxyServer != null;
126+
125127
URI uri;
126128
if (config.isUseRawUrl()) {
127129
uri = request.getRawURI();
@@ -403,7 +405,7 @@ private void scheduleReaper(NettyResponseFuture<?> future) {
403405
: requestTimeout) : config.getIdleConnectionTimeoutInMs();
404406

405407
if (schedulePeriod != -1 && !future.isDone() && !future.isCancelled()) {
406-
FutureReaper reaperFuture = new FutureReaper(future, config, isClose, channels);
408+
FutureReaper reaperFuture = new FutureReaper(future, config, closed, channels);
407409
Future<?> scheduledFuture = config.reaper().scheduleAtFixedRate(reaperFuture, 0, schedulePeriod, TimeUnit.MILLISECONDS);
408410
reaperFuture.setScheduledFuture(scheduledFuture);
409411
future.setReaperFuture(reaperFuture);

0 commit comments

Comments
 (0)