Skip to content

Commit 5f8ba4c

Browse files
committed
+ Minor cleanup to the get() calls.
+ Add connection pool logging. + Fix cleanup() method to remove the HttpTransactionContext before offering the connection to the pool. + Switch queue implementations in the pool. + Performance changes: - make methods/classes of nested classes package private to eliminate parent class access. - cache the result of Request.getUrl() within HttpTransactionContext. - make the population of the AHC headers map lazy instead of performing the initialization during construction.
1 parent 568d097 commit 5f8ba4c

File tree

4 files changed

+107
-61
lines changed

4 files changed

+107
-61
lines changed

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import java.io.UnsupportedEncodingException;
9797
import java.net.InetSocketAddress;
9898
import java.net.URI;
99+
import java.net.URL;
99100
import java.net.URLEncoder;
100101
import java.nio.ByteBuffer;
101102
import java.security.NoSuchAlgorithmException;
@@ -401,7 +402,7 @@ public void updated(WriteResult result) {
401402
}
402403

403404

404-
private void setHttpTransactionContext(final AttributeStorage storage,
405+
void setHttpTransactionContext(final AttributeStorage storage,
405406
final HttpTransactionContext httpTransactionState) {
406407

407408
if (httpTransactionState == null) {
@@ -412,7 +413,7 @@ private void setHttpTransactionContext(final AttributeStorage storage,
412413

413414
}
414415

415-
private HttpTransactionContext getHttpTransactionContext(final AttributeStorage storage) {
416+
HttpTransactionContext getHttpTransactionContext(final AttributeStorage storage) {
416417

417418
return REQUEST_STATE_ATTR.get(storage);
418419

@@ -497,26 +498,27 @@ boolean handleStatus(final HttpResponsePacket httpResponse,
497498
} // END StatusHandler
498499

499500

500-
private final class HttpTransactionContext {
501+
final class HttpTransactionContext {
501502

502-
private final AtomicInteger redirectCount = new AtomicInteger(0);
503+
final AtomicInteger redirectCount = new AtomicInteger(0);
503504

504-
private final int maxRedirectCount;
505-
private final boolean redirectsAllowed;
506-
private final GrizzlyAsyncHttpProvider provider =
505+
final int maxRedirectCount;
506+
final boolean redirectsAllowed;
507+
final GrizzlyAsyncHttpProvider provider =
507508
GrizzlyAsyncHttpProvider.this;
508509

509-
private Request request;
510-
private AsyncHandler handler;
511-
private BodyHandler bodyHandler;
512-
private StatusHandler statusHandler;
513-
private StatusHandler.InvocationStatus invocationStatus =
510+
Request request;
511+
String requestUrl;
512+
AsyncHandler handler;
513+
BodyHandler bodyHandler;
514+
StatusHandler statusHandler;
515+
StatusHandler.InvocationStatus invocationStatus =
514516
StatusHandler.InvocationStatus.CONTINUE;
515-
private GrizzlyResponseStatus responseStatus;
516-
private GrizzlyResponseFuture future;
517-
private String lastRedirectURI;
518-
private AtomicLong totalBodyWritten = new AtomicLong();
519-
private AsyncHandler.STATE currentState;
517+
GrizzlyResponseStatus responseStatus;
518+
GrizzlyResponseFuture future;
519+
String lastRedirectURI;
520+
AtomicLong totalBodyWritten = new AtomicLong();
521+
AsyncHandler.STATE currentState;
520522

521523

522524
// -------------------------------------------------------- Constructors
@@ -531,14 +533,15 @@ private final class HttpTransactionContext {
531533
this.handler = handler;
532534
redirectsAllowed = provider.clientConfig.isRedirectEnabled();
533535
maxRedirectCount = provider.clientConfig.getMaxRedirects();
536+
this.requestUrl = request.getUrl();
534537

535538
}
536539

537540

538541
// ----------------------------------------------------- Private Methods
539542

540543

541-
private HttpTransactionContext copy() {
544+
HttpTransactionContext copy() {
542545
final HttpTransactionContext newContext =
543546
new HttpTransactionContext(future,
544547
request,
@@ -554,20 +557,20 @@ private HttpTransactionContext copy() {
554557
}
555558

556559

557-
private void abort(final Throwable t) {
560+
void abort(final Throwable t) {
558561
if (future != null) {
559562
future.abort(t);
560563
}
561564
}
562565

563-
private void done(final Callable c) {
566+
void done(final Callable c) {
564567
if (future != null) {
565568
future.done(c);
566569
}
567570
}
568571

569572
@SuppressWarnings({"unchecked"})
570-
private void result(Object result) {
573+
void result(Object result) {
571574
if (future != null) {
572575
future.delegate.result(result);
573576
future.done(null);
@@ -699,7 +702,8 @@ private void sendAsGrizzlyRequest(final Request request,
699702
final FilterChainContext ctx)
700703
throws IOException {
701704

702-
final URI uri = AsyncHttpProviderUtils.createUri(request.getUrl());
705+
final HttpTransactionContext httpCtx = getHttpTransactionContext(ctx.getConnection());
706+
final URI uri = AsyncHttpProviderUtils.createUri(httpCtx.requestUrl);
703707
final HttpRequestPacket.Builder builder = HttpRequestPacket.builder();
704708

705709
builder.method(request.getMethod());
@@ -756,7 +760,7 @@ private void sendAsGrizzlyRequest(final Request request,
756760
}
757761
}
758762
}
759-
final AsyncHandler h = getHttpTransactionContext(ctx.getConnection()).handler;
763+
final AsyncHandler h = httpCtx.handler;
760764
if (TransferCompletionHandler.class.isAssignableFrom(h.getClass())) {
761765
final FluentCaseInsensitiveStringsMap map =
762766
new FluentCaseInsensitiveStringsMap(request.getHeaders());
@@ -1012,7 +1016,7 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
10121016
}
10131017
final GrizzlyResponseStatus responseStatus =
10141018
new GrizzlyResponseStatus((HttpResponsePacket) httpHeader,
1015-
getURI(context.request.getUrl()),
1019+
getURI(context.requestUrl),
10161020
provider);
10171021
context.responseStatus = responseStatus;
10181022
if (context.statusHandler != null) {
@@ -1033,6 +1037,16 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
10331037

10341038
}
10351039

1040+
@Override
1041+
protected void onHttpError(final HttpHeader httpHeader,
1042+
final FilterChainContext ctx,
1043+
final Throwable t) throws IOException {
1044+
httpHeader.setSkipRemainder(true);
1045+
final HttpTransactionContext context =
1046+
provider.getHttpTransactionContext(ctx.getConnection());
1047+
context.abort(t);
1048+
}
1049+
10361050
@SuppressWarnings({"unchecked"})
10371051
@Override
10381052
protected void onHttpHeadersParsed(HttpHeader httpHeader,
@@ -1162,20 +1176,21 @@ private static boolean isRedirectAllowed(final HttpTransactionContext ctx) {
11621176
private static HttpTransactionContext cleanup(final FilterChainContext ctx,
11631177
final GrizzlyAsyncHttpProvider provider) {
11641178

1179+
final Connection c = ctx.getConnection();
11651180
final HttpTransactionContext context =
1166-
provider.getHttpTransactionContext(ctx.getConnection());
1167-
1168-
if (!context.provider.connectionManager.canReturnConnection(ctx.getConnection())) {
1181+
provider.getHttpTransactionContext(c);
1182+
context.provider.setHttpTransactionContext(c, null);
1183+
if (!context.provider.connectionManager.canReturnConnection(c)) {
11691184
context.abort(new IOException("Maximum pooled connections exceeded"));
11701185
} else {
1171-
if (!context.provider.connectionManager.returnConnection(context.request.getUrl(), ctx.getConnection())) {
1186+
if (!context.provider.connectionManager.returnConnection(context.requestUrl, c)) {
11721187
try {
11731188
ctx.getConnection().close().markForRecycle(true);
11741189
} catch (IOException ignored) {
11751190
}
11761191
}
11771192
}
1178-
context.provider.setHttpTransactionContext(ctx.getConnection(), null);
1193+
11791194
return context;
11801195

11811196
}
@@ -1242,7 +1257,7 @@ public boolean handleStatus(final HttpResponsePacket responsePacket,
12421257
final Request req = httpTransactionContext.request;
12431258
realm = new Realm.RealmBuilder().clone(realm)
12441259
.setScheme(realm.getAuthScheme())
1245-
.setUri(URI.create(req.getUrl()).getPath())
1260+
.setUri(URI.create(httpTransactionContext.requestUrl).getPath())
12461261
.setMethodName(req.getMethod())
12471262
.setUsePreemptiveAuth(true)
12481263
.parseWWWAuthenticateHeader(auth)
@@ -1319,9 +1334,9 @@ public boolean handleStatus(final HttpResponsePacket responsePacket,
13191334

13201335
URI orig;
13211336
if (httpTransactionContext.lastRedirectURI == null) {
1322-
orig = AsyncHttpProviderUtils.createUri(httpTransactionContext.request.getUrl());
1337+
orig = AsyncHttpProviderUtils.createUri(httpTransactionContext.requestUrl);
13231338
} else {
1324-
orig = AsyncHttpProviderUtils.getRedirectUri(AsyncHttpProviderUtils.createUri(httpTransactionContext.request.getUrl()),
1339+
orig = AsyncHttpProviderUtils.getRedirectUri(AsyncHttpProviderUtils.createUri(httpTransactionContext.requestUrl),
13251340
httpTransactionContext.lastRedirectURI);
13261341
}
13271342
httpTransactionContext.lastRedirectURI = redirectURL;
@@ -1359,6 +1374,7 @@ public boolean handleStatus(final HttpResponsePacket responsePacket,
13591374
httpTransactionContext.future = null;
13601375
newContext.invocationStatus = InvocationStatus.CONTINUE;
13611376
newContext.request = requestToSend;
1377+
newContext.requestUrl = requestToSend.getUrl();
13621378
httpTransactionContext.provider.setHttpTransactionContext(c, newContext);
13631379
httpTransactionContext.provider.execute(c,
13641380
requestToSend,
@@ -1959,9 +1975,9 @@ static boolean isConnectionCacheable(final Connection c) {
19591975
return ((canCache != null) ? canCache : false);
19601976
}
19611977

1962-
private void doAsyncTrackedConnection(final Request request,
1963-
final GrizzlyResponseFuture requestFuture,
1964-
final CompletionHandler<Connection> connectHandler)
1978+
void doAsyncTrackedConnection(final Request request,
1979+
final GrizzlyResponseFuture requestFuture,
1980+
final CompletionHandler<Connection> connectHandler)
19651981
throws IOException, ExecutionException, InterruptedException {
19661982
final String url = request.getUrl();
19671983
Connection c = pool.poll(AsyncHttpProviderUtils.getBaseUrl(url));
@@ -2010,10 +2026,10 @@ Connection obtainConnection(final Request request,
20102026

20112027
}
20122028

2013-
private void doAsyncConnect(final String url,
2014-
final Request request,
2015-
final GrizzlyResponseFuture requestFuture,
2016-
final CompletionHandler<Connection> connectHandler)
2029+
void doAsyncConnect(final String url,
2030+
final Request request,
2031+
final GrizzlyResponseFuture requestFuture,
2032+
final CompletionHandler<Connection> connectHandler)
20172033
throws IOException, ExecutionException, InterruptedException {
20182034

20192035
final URI uri = AsyncHttpProviderUtils.createUri(url);

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyConnectionsPool.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.concurrent.BlockingQueue;
3030
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.ConcurrentLinkedQueue;
3132
import java.util.concurrent.ExecutorService;
3233
import java.util.concurrent.Executors;
3334
import java.util.concurrent.TimeUnit;
@@ -67,8 +68,10 @@ public GrizzlyConnectionsPool(final AsyncHttpClientConfig config) {
6768
listener = new Connection.CloseListener() {
6869
@Override
6970
public void onClosed(Connection connection, Connection.CloseType closeType) throws IOException {
70-
if (LOG.isInfoEnabled()) {
71-
LOG.info("Remote closed connection ({}). Removing from cache", connection.toString());
71+
if (closeType == Connection.CloseType.REMOTELY) {
72+
if (LOG.isInfoEnabled()) {
73+
LOG.info("Remote closed connection ({}). Removing from cache", connection.toString());
74+
}
7275
}
7376
GrizzlyConnectionsPool.this.removeAll(connection);
7477
}
@@ -91,6 +94,10 @@ public boolean offer(String uri, Connection connection) {
9194

9295
DelayedExecutor.IdleConnectionQueue conQueue = connectionsPool.get(uri);
9396
if (conQueue == null) {
97+
if (LOG.isDebugEnabled()) {
98+
LOG.debug("Creating new Connection queue for uri [{}] and connection [{}]",
99+
new Object[]{uri, connection});
100+
}
94101
DelayedExecutor.IdleConnectionQueue newPool =
95102
delayedExecutor.createIdleConnectionQueue(timeout);
96103
conQueue = connectionsPool.putIfAbsent(uri, newPool);
@@ -100,13 +107,21 @@ public boolean offer(String uri, Connection connection) {
100107
}
101108

102109
final int size = conQueue.size();
103-
104110
if (maxConnectionsPerHost == -1 || size < maxConnectionsPerHost) {
105111
conQueue.offer(connection);
106112
connection.addCloseListener(listener);
107-
totalCachedConnections.incrementAndGet();
113+
final int total = totalCachedConnections.incrementAndGet();
114+
if (LOG.isDebugEnabled()) {
115+
LOG.debug("[offer] Pooling connection [{}] for uri [{}]. Current size (for host; before pooling): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}].",
116+
new Object[]{connection, uri, size, maxConnectionsPerHost, total});
117+
}
108118
return true;
109119
}
120+
if (LOG.isDebugEnabled()) {
121+
LOG.debug("[offer] Unable to pool connection [{}] for uri [{}]. Current size (for host): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}].",
122+
new Object[]{connection, uri, size, maxConnectionsPerHost, totalCachedConnections.get()});
123+
}
124+
110125
return false;
111126
}
112127

@@ -136,11 +151,18 @@ public Connection poll(String uri) {
136151
connection = null;
137152
}
138153
}
154+
} else {
155+
if (LOG.isDebugEnabled()) {
156+
LOG.debug("[poll] No existing queue for uri [{}].",
157+
new Object[]{uri});
158+
}
139159
}
140160
if (connection != null) {
161+
if (LOG.isDebugEnabled()) {
162+
LOG.debug("[poll] Found pooled connection [{}] for uri [{}].",
163+
new Object[]{connection, uri});
164+
}
141165
totalCachedConnections.decrementAndGet();
142-
}
143-
if (connection != null) {
144166
connection.removeCloseListener(listener);
145167
}
146168
return connection;
@@ -306,8 +328,8 @@ public void run() {
306328
delayQueue.queue.offer(element);
307329
} else {
308330
try {
309-
if (LOG.isInfoEnabled()) {
310-
LOG.info("Idle connection ({}) detected. Removing from cache.", element.toString());
331+
if (LOG.isDebugEnabled()) {
332+
LOG.debug("Idle connection ({}) detected. Removing from cache.", element.toString());
311333
}
312334
element.close().markForRecycle(true);
313335
} catch (Exception ignored) {
@@ -332,8 +354,8 @@ public void run() {
332354

333355

334356
final class IdleConnectionQueue {
335-
final BlockingQueue<Connection> queue =
336-
DataStructures.getLTQInstance(Connection.class);
357+
final ConcurrentLinkedQueue<Connection> queue =
358+
new ConcurrentLinkedQueue<Connection>();
337359

338360

339361
final TimeoutResolver resolver = new TimeoutResolver();

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseFuture.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,7 @@ public boolean isDone() {
142142

143143
public V get() throws InterruptedException, ExecutionException {
144144

145-
try {
146-
return get(60, TimeUnit.SECONDS);
147-
} catch (TimeoutException te) {
148-
throw new ExecutionException(te);
149-
}
145+
return delegate.get();
150146

151147
}
152148

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseHeaders.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,21 @@
3030
*/
3131
public class GrizzlyResponseHeaders extends HttpResponseHeaders {
3232

33-
private final FluentCaseInsensitiveStringsMap headers = new FluentCaseInsensitiveStringsMap();
33+
private final FluentCaseInsensitiveStringsMap headers =
34+
new FluentCaseInsensitiveStringsMap();
35+
private final HttpResponsePacket response;
36+
private volatile boolean initialized;
3437

3538
// ------------------------------------------------------------ Constructors
3639

3740

3841
public GrizzlyResponseHeaders(final HttpResponsePacket response,
3942
final URI uri,
4043
final AsyncHttpProvider provider) {
44+
4145
super(uri, provider);
46+
this.response = response;
4247

43-
final MimeHeaders headersLocal = response.getHeaders();
44-
for (String name : headersLocal.names()) {
45-
for (String header : headersLocal.values(name)) {
46-
headers.add(name, header);
47-
}
48-
}
4948
}
5049

5150

@@ -57,6 +56,19 @@ public GrizzlyResponseHeaders(final HttpResponsePacket response,
5756
*/
5857
@Override
5958
public FluentCaseInsensitiveStringsMap getHeaders() {
59+
if (!initialized) {
60+
synchronized (headers) {
61+
if (!initialized) {
62+
initialized = true;
63+
final MimeHeaders headersLocal = response.getHeaders();
64+
for (String name : headersLocal.names()) {
65+
for (String header : headersLocal.values(name)) {
66+
headers.add(name, header);
67+
}
68+
}
69+
}
70+
}
71+
}
6072
return headers;
6173
}
6274

0 commit comments

Comments
 (0)