Skip to content

Commit 9803086

Browse files
author
oleksiys
committed
[1.9.x] + Grizzly: check if the connection is keep-alive, because framework itself doesn't do it for the client side (yet)
1 parent 2caf1c4 commit 9803086

File tree

6 files changed

+156
-45
lines changed

6 files changed

+156
-45
lines changed

src/main/java/com/ning/http/client/providers/grizzly/AhcEventFilter.java

Lines changed: 103 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@
4343
import org.glassfish.grizzly.filterchain.NextAction;
4444
import org.glassfish.grizzly.http.HttpClientFilter;
4545
import org.glassfish.grizzly.http.HttpContent;
46+
import org.glassfish.grizzly.http.HttpContext;
4647
import org.glassfish.grizzly.http.HttpHeader;
47-
import org.glassfish.grizzly.http.HttpRequestPacket;
4848
import org.glassfish.grizzly.http.HttpResponsePacket;
49-
import org.glassfish.grizzly.http.Method;
49+
import org.glassfish.grizzly.http.Protocol;
50+
import org.glassfish.grizzly.http.util.DataChunk;
5051
import org.glassfish.grizzly.http.util.Header;
5152
import org.glassfish.grizzly.http.util.HttpStatus;
5253
import org.glassfish.grizzly.utils.Exceptions;
@@ -57,7 +58,6 @@
5758

5859
import static com.ning.http.client.providers.netty.util.HttpUtils.getNTLM;
5960
import static com.ning.http.util.MiscUtils.isNonEmpty;
60-
import org.glassfish.grizzly.http.HttpContext;
6161
/**
6262
* AHC {@link HttpClientFilter} implementation.
6363
*
@@ -73,6 +73,34 @@ final class AhcEventFilter extends HttpClientFilter {
7373
private static IOException maximumPooledConnectionExceededReason;
7474

7575
private final GrizzlyAsyncHttpProvider provider;
76+
77+
78+
/**
79+
* Close bytes.
80+
*/
81+
private static final byte[] CLOSE_BYTES = {
82+
(byte) 'c',
83+
(byte) 'l',
84+
(byte) 'o',
85+
(byte) 's',
86+
(byte) 'e'
87+
};
88+
/**
89+
* Keep-alive bytes.
90+
*/
91+
private static final byte[] KEEPALIVE_BYTES = {
92+
(byte) 'k',
93+
(byte) 'e',
94+
(byte) 'e',
95+
(byte) 'p',
96+
(byte) '-',
97+
(byte) 'a',
98+
(byte) 'l',
99+
(byte) 'i',
100+
(byte) 'v',
101+
(byte) 'e'
102+
};
103+
76104
// -------------------------------------------------------- Constructors
77105

78106
AhcEventFilter(final GrizzlyAsyncHttpProvider provider,
@@ -124,7 +152,8 @@ protected void onHttpContentParsed(final HttpContent content,
124152
final AsyncHandler handler = context.getAsyncHandler();
125153
if (handler != null && context.currentState != AsyncHandler.STATE.ABORT) {
126154
try {
127-
context.currentState = handler.onBodyPartReceived(new GrizzlyResponseBodyPart(content, ctx.getConnection()));
155+
context.currentState = handler.onBodyPartReceived(
156+
new GrizzlyResponseBodyPart(content, ctx.getConnection()));
128157
} catch (Exception e) {
129158
handler.onThrowable(e);
130159
}
@@ -263,20 +292,32 @@ protected void onHttpContentError(final HttpHeader httpHeader,
263292

264293
@SuppressWarnings(value = {"unchecked"})
265294
@Override
266-
protected void onHttpHeadersParsed(final HttpHeader httpHeader,
267-
final FilterChainContext ctx) {
268-
super.onHttpHeadersParsed(httpHeader, ctx);
295+
protected boolean onHttpHeaderParsed(final HttpHeader httpHeader,
296+
final Buffer buffer, final FilterChainContext ctx) {
297+
super.onHttpHeaderParsed(httpHeader, buffer, ctx);
269298
LOGGER.debug("RESPONSE: {}", httpHeader);
299+
300+
final HttpResponsePacket responsePacket =
301+
(HttpResponsePacket) httpHeader;
302+
303+
// @TODO review this after Grizzly 2.3.20 is integrated
304+
final boolean isKeepAlive = checkKeepAlive(responsePacket);
305+
responsePacket.getProcessingState().setKeepAlive(isKeepAlive);
306+
307+
if (httpHeader.isSkipRemainder()) {
308+
return false;
309+
}
310+
270311
final HttpTransactionContext context =
271312
HttpTransactionContext.currentTransaction(httpHeader);
272-
if (httpHeader.containsHeader(Header.Connection)) {
273-
if ("close".equals(httpHeader.getHeader(Header.Connection))) {
274-
ConnectionManager.markConnectionAsDoNotCache(ctx.getConnection());
275-
}
276-
}
277-
if (httpHeader.isSkipRemainder() || context.establishingTunnel) {
278-
return;
313+
if (context.establishingTunnel) {
314+
// finish request/response processing, because Grizzly itself
315+
// treats CONNECT traffic as part of request-response processing
316+
// and we don't want it be treated like that
317+
httpHeader.setExpectContent(false);
318+
return false;
279319
}
320+
280321
final AsyncHandler handler = context.getAsyncHandler();
281322
final List<ResponseFilter> filters =
282323
provider.getClientConfig().getResponseFilters();
@@ -319,7 +360,7 @@ protected void onHttpHeadersParsed(final HttpHeader httpHeader,
319360
} catch (Exception e) {
320361
context.abort(e);
321362
}
322-
return;
363+
return false;
323364
}
324365
}
325366
if (context.statusHandler != null &&
@@ -329,7 +370,7 @@ protected void onHttpHeadersParsed(final HttpHeader httpHeader,
329370
(HttpResponsePacket) httpHeader, context, ctx);
330371
if (!result) {
331372
httpHeader.setSkipRemainder(true);
332-
return;
373+
return false;
333374
}
334375
}
335376
if (context.isWSRequest) {
@@ -372,19 +413,7 @@ protected void onHttpHeadersParsed(final HttpHeader httpHeader,
372413
}
373414
}
374415
}
375-
}
376-
377-
@Override
378-
protected boolean onHttpHeaderParsed(final HttpHeader httpHeader,
379-
final Buffer buffer, final FilterChainContext ctx) {
380-
super.onHttpHeaderParsed(httpHeader, buffer, ctx);
381-
final HttpRequestPacket request = ((HttpResponsePacket) httpHeader).getRequest();
382-
if (Method.CONNECT.equals(request.getMethod())) {
383-
// finish request/response processing, because Grizzly itself
384-
// treats CONNECT traffic as part of request-response processing
385-
// and we don't want it be treated like that
386-
httpHeader.setExpectContent(false);
387-
}
416+
388417
return false;
389418
}
390419

@@ -454,7 +483,8 @@ private static void cleanup(final HttpContext httpContext) {
454483
if (!context.isReuseConnection()) {
455484
final Connection c = (Connection) httpContext.getCloseable();
456485
final ConnectionManager cm = context.provider.getConnectionManager();
457-
if (!cm.canReturnConnection(c) || !cm.returnConnection(context.getAhcRequest(), c)) {
486+
if (!httpContext.getRequest().getProcessingState().isStayAlive() ||
487+
!cm.canReturnConnection(c) || !cm.returnConnection(context.getAhcRequest(), c)) {
458488
// context.abort());
459489
if (maximumPooledConnectionExceededReason == null) {
460490
maximumPooledConnectionExceededReason
@@ -477,6 +507,49 @@ private static boolean isRedirect(final int status) {
477507
|| HttpStatus.PERMANENT_REDIRECT_308.statusMatches(status);
478508
}
479509

510+
private static boolean checkKeepAlive(final HttpResponsePacket response) {
511+
final int statusCode = response.getStatus();
512+
final boolean isExpectContent = response.isExpectContent();
513+
514+
boolean keepAlive = !statusDropsConnection(statusCode) ||
515+
(!isExpectContent || !response.isChunked() || response.getContentLength() == -1); // double-check the transfer encoding here
516+
517+
if (keepAlive) {
518+
// Check the Connection header
519+
final DataChunk cVal =
520+
response.getHeaders().getValue(Header.Connection);
521+
522+
if (response.getProtocol().compareTo(Protocol.HTTP_1_1) < 0) {
523+
// HTTP 1.0 response
524+
// "Connection: keep-alive" should be specified explicitly
525+
keepAlive = cVal != null && cVal.equalsIgnoreCase(KEEPALIVE_BYTES);
526+
} else {
527+
// HTTP 1.1+
528+
// keep-alive by default, if there's no "Connection: close"
529+
keepAlive = cVal == null || !cVal.equalsIgnoreCase(CLOSE_BYTES);
530+
}
531+
}
532+
533+
return keepAlive;
534+
}
535+
536+
/**
537+
* Determine if we must drop the connection because of the HTTP status
538+
* code. Use the same list of codes as Apache/httpd.
539+
*/
540+
private static boolean statusDropsConnection(int status) {
541+
return status == 400 /* SC_BAD_REQUEST */ ||
542+
status == 408 /* SC_REQUEST_TIMEOUT */ ||
543+
status == 411 /* SC_LENGTH_REQUIRED */ ||
544+
status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
545+
status == 414 /* SC_REQUEST_URI_TOO_LARGE */ ||
546+
status == 417 /* FAILED EXPECTATION */ ||
547+
status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
548+
status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
549+
status == 501 /* SC_NOT_IMPLEMENTED */ ||
550+
status == 505 /* SC_VERSION_NOT_SUPPORTED */;
551+
}
552+
480553
// ------------------------------------------------------- Inner Classes
481554
private static final class AuthorizationHandler implements StatusHandler {
482555

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) 2015 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.grizzly;
14+
15+
import org.glassfish.grizzly.Closeable;
16+
import org.glassfish.grizzly.OutputSink;
17+
import org.glassfish.grizzly.attributes.AttributeStorage;
18+
import org.glassfish.grizzly.http.HttpContext;
19+
import org.glassfish.grizzly.http.HttpRequestPacket;
20+
21+
/**
22+
* AHC {@link HttpContext}.
23+
*
24+
* @author Grizzly Team
25+
*/
26+
class AhcHttpContext extends HttpContext {
27+
private final HttpTransactionContext txCtx;
28+
29+
AhcHttpContext(final AttributeStorage attributeStorage,
30+
final OutputSink outputSink, final Closeable closeable,
31+
final HttpRequestPacket request,
32+
final HttpTransactionContext txCtx) {
33+
super(attributeStorage, outputSink, closeable, request);
34+
35+
this.txCtx = txCtx;
36+
}
37+
38+
public HttpTransactionContext getHttpTransactionContext() {
39+
return txCtx;
40+
}
41+
}

src/main/java/com/ning/http/client/providers/grizzly/ConnectionManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ void getConnectionAsync(final Request request,
100100
}
101101
openConnectionAsync(request, proxy, requestFuture, connectHandler);
102102
} else {
103-
provider.touchConnection(c, request);
103+
// provider.touchConnection(c, request);
104104
connectHandler.completed(c);
105105
}
106106
}
@@ -165,9 +165,9 @@ private Connection openConnectionSync0(final Request request,
165165
}
166166

167167
boolean returnConnection(final Request request, final Connection c) {
168-
ProxyServer proxyServer = ProxyUtils.getProxyServer(config, request);
169168
final boolean result = DO_NOT_CACHE.get(c) == null &&
170-
pool.offer(getPartitionId(request, proxyServer), c);
169+
pool.offer(getPartitionId(request,
170+
ProxyUtils.getProxyServer(config, request)), c);
171171
if (result) {
172172
if (provider.resolver != null) {
173173
provider.resolver.setTimeoutMillis(c, IdleTimeoutFilter.FOREVER);
@@ -206,7 +206,7 @@ public void failed(final Throwable throwable) {
206206
}
207207

208208
public void completed(final Connection connection) {
209-
provider.touchConnection(connection, request);
209+
// provider.touchConnection(connection, request);
210210
if (wrappedHandler != null) {
211211
connection.addCloseListener(connectionMonitor);
212212
wrappedHandler.completed(connection);

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import com.ning.http.client.providers.grizzly.events.SSLSwitchingEvent;
1717

18-
import org.glassfish.grizzly.Buffer;
1918
import org.glassfish.grizzly.CompletionHandler;
2019
import org.glassfish.grizzly.Connection;
2120
import org.glassfish.grizzly.WriteResult;
@@ -480,12 +479,12 @@ boolean sendRequest(final HttpTransactionContext httpTxCtx,
480479
final AsyncHandler h = httpTxCtx.getAsyncHandler();
481480

482481
// create HttpContext and mutually bind it with HttpTransactionContext
483-
final HttpContext httpCtx = HttpContext.newInstance(
484-
connection, connection, connection, requestPacket);
485-
requestPacket.getProcessingState().setHttpContext(httpCtx);
482+
final HttpContext httpCtx = new AhcHttpContext(
483+
connection, connection, connection, requestPacket, httpTxCtx);
486484
HttpTransactionContext.bind(httpCtx, httpTxCtx);
487-
httpCtx.attach(ctx);
488485

486+
requestPacket.getProcessingState().setHttpContext(httpCtx);
487+
httpCtx.attach(ctx);
489488

490489
if (h instanceof TransferCompletionHandler) {
491490
final FluentCaseInsensitiveStringsMap map

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseBodyPart.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import java.nio.ByteBuffer;
2525
import java.util.concurrent.atomic.AtomicReference;
2626

27-
import static com.ning.http.client.providers.grizzly.ConnectionManager.isConnectionCacheable;
28-
import static com.ning.http.client.providers.grizzly.ConnectionManager.markConnectionAsDoNotCache;
2927

3028
/**
3129
* {@link HttpResponseBodyPart} implementation using the Grizzly 2.0 HTTP client
@@ -100,12 +98,12 @@ public boolean isLast() {
10098

10199
@Override
102100
public void markUnderlyingConnectionAsToBeClosed() {
103-
markConnectionAsDoNotCache(connection);
101+
content.getHttpHeader().getProcessingState().setKeepAlive(false);
104102
}
105103

106104
@Override
107105
public boolean isUnderlyingConnectionToBeClosed() {
108-
return !isConnectionCacheable(connection);
106+
return content.getHttpHeader().getProcessingState().isStayAlive();
109107
}
110108

111109
// ----------------------------------------------- Package Protected Methods

src/main/java/com/ning/http/client/providers/grizzly/HttpTransactionContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ static void bind(final HttpContext httpCtx,
111111
}
112112

113113
static HttpTransactionContext cleanupTransaction(final HttpContext httpCtx) {
114-
final HttpTransactionContext httpTxContext = REQUEST_STATE_ATTR.remove(httpCtx);
114+
final HttpTransactionContext httpTxContext = currentTransaction(httpCtx);
115115
if (httpTxContext != null) {
116116
httpCtx.getCloseable().removeCloseListener(httpTxContext.listener);
117117
}
@@ -129,7 +129,7 @@ static HttpTransactionContext currentTransaction(final AttributeStorage storage)
129129
}
130130

131131
static HttpTransactionContext currentTransaction(final HttpContext httpCtx) {
132-
return REQUEST_STATE_ATTR.get(httpCtx);
132+
return ((AhcHttpContext) httpCtx).getHttpTransactionContext();
133133
}
134134

135135
static HttpTransactionContext startTransaction(

0 commit comments

Comments
 (0)