diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java index 6badb43b38..592af17ad7 100644 --- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java +++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java @@ -96,6 +96,7 @@ import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.net.URI; +import java.net.URL; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; @@ -401,7 +402,7 @@ public void updated(WriteResult result) { } - private void setHttpTransactionContext(final AttributeStorage storage, + void setHttpTransactionContext(final AttributeStorage storage, final HttpTransactionContext httpTransactionState) { if (httpTransactionState == null) { @@ -412,14 +413,14 @@ private void setHttpTransactionContext(final AttributeStorage storage, } - private HttpTransactionContext getHttpTransactionContext(final AttributeStorage storage) { + HttpTransactionContext getHttpTransactionContext(final AttributeStorage storage) { return REQUEST_STATE_ATTR.get(storage); } - private void timeout(final Connection c) { + void timeout(final Connection c) { final HttpTransactionContext context = getHttpTransactionContext(c); setHttpTransactionContext(c, null); @@ -427,7 +428,7 @@ private void timeout(final Connection c) { } - private static int getPort(final URI uri, final int p) { + static int getPort(final URI uri, final int p) { int port = p; if (port == -1) { final String protocol = uri.getScheme().toLowerCase(); @@ -444,9 +445,9 @@ private static int getPort(final URI uri, final int p) { @SuppressWarnings({"unchecked"}) - private void sendRequest(final FilterChainContext ctx, - final Request request, - final HttpRequestPacket requestPacket) + void sendRequest(final FilterChainContext ctx, + final Request request, + final HttpRequestPacket requestPacket) throws IOException { if (requestHasEntityBody(request)) { @@ -497,26 +498,27 @@ boolean handleStatus(final HttpResponsePacket httpResponse, } // END StatusHandler - private final class HttpTransactionContext { + final class HttpTransactionContext { - private final AtomicInteger redirectCount = new AtomicInteger(0); + final AtomicInteger redirectCount = new AtomicInteger(0); - private final int maxRedirectCount; - private final boolean redirectsAllowed; - private final GrizzlyAsyncHttpProvider provider = + final int maxRedirectCount; + final boolean redirectsAllowed; + final GrizzlyAsyncHttpProvider provider = GrizzlyAsyncHttpProvider.this; - private Request request; - private AsyncHandler handler; - private BodyHandler bodyHandler; - private StatusHandler statusHandler; - private StatusHandler.InvocationStatus invocationStatus = + Request request; + String requestUrl; + AsyncHandler handler; + BodyHandler bodyHandler; + StatusHandler statusHandler; + StatusHandler.InvocationStatus invocationStatus = StatusHandler.InvocationStatus.CONTINUE; - private GrizzlyResponseStatus responseStatus; - private GrizzlyResponseFuture future; - private String lastRedirectURI; - private AtomicLong totalBodyWritten = new AtomicLong(); - private AsyncHandler.STATE currentState; + GrizzlyResponseStatus responseStatus; + GrizzlyResponseFuture future; + String lastRedirectURI; + AtomicLong totalBodyWritten = new AtomicLong(); + AsyncHandler.STATE currentState; // -------------------------------------------------------- Constructors @@ -531,6 +533,7 @@ private final class HttpTransactionContext { this.handler = handler; redirectsAllowed = provider.clientConfig.isRedirectEnabled(); maxRedirectCount = provider.clientConfig.getMaxRedirects(); + this.requestUrl = request.getUrl(); } @@ -538,7 +541,7 @@ private final class HttpTransactionContext { // ----------------------------------------------------- Private Methods - private HttpTransactionContext copy() { + HttpTransactionContext copy() { final HttpTransactionContext newContext = new HttpTransactionContext(future, request, @@ -554,20 +557,20 @@ private HttpTransactionContext copy() { } - private void abort(final Throwable t) { + void abort(final Throwable t) { if (future != null) { future.abort(t); } } - private void done(final Callable c) { + void done(final Callable c) { if (future != null) { future.done(c); } } @SuppressWarnings({"unchecked"}) - private void result(Object result) { + void result(Object result) { if (future != null) { future.delegate.result(result); future.done(null); @@ -699,7 +702,8 @@ private void sendAsGrizzlyRequest(final Request request, final FilterChainContext ctx) throws IOException { - final URI uri = AsyncHttpProviderUtils.createUri(request.getUrl()); + final HttpTransactionContext httpCtx = getHttpTransactionContext(ctx.getConnection()); + final URI uri = AsyncHttpProviderUtils.createUri(httpCtx.requestUrl); final HttpRequestPacket.Builder builder = HttpRequestPacket.builder(); builder.method(request.getMethod()); @@ -756,7 +760,7 @@ private void sendAsGrizzlyRequest(final Request request, } } } - final AsyncHandler h = getHttpTransactionContext(ctx.getConnection()).handler; + final AsyncHandler h = httpCtx.handler; if (TransferCompletionHandler.class.isAssignableFrom(h.getClass())) { final FluentCaseInsensitiveStringsMap map = new FluentCaseInsensitiveStringsMap(request.getHeaders()); @@ -954,10 +958,10 @@ protected void onHttpContentEncoded(HttpContent content, FilterChainContext ctx) final AsyncHandler handler = context.handler; if (TransferCompletionHandler.class.isAssignableFrom(handler.getClass())) { final int written = content.getContent().remaining(); - context.totalBodyWritten.addAndGet(written); + final long total = context.totalBodyWritten.addAndGet(written); ((TransferCompletionHandler) handler).onContentWriteProgress( written, - context.totalBodyWritten.get(), + total, content.getHttpHeader().getContentLength()); } } @@ -1012,7 +1016,7 @@ protected void onInitialLineParsed(HttpHeader httpHeader, } final GrizzlyResponseStatus responseStatus = new GrizzlyResponseStatus((HttpResponsePacket) httpHeader, - getURI(context.request.getUrl()), + getURI(context.requestUrl), provider); context.responseStatus = responseStatus; if (context.statusHandler != null) { @@ -1033,6 +1037,16 @@ protected void onInitialLineParsed(HttpHeader httpHeader, } + @Override + protected void onHttpError(final HttpHeader httpHeader, + final FilterChainContext ctx, + final Throwable t) throws IOException { + httpHeader.setSkipRemainder(true); + final HttpTransactionContext context = + provider.getHttpTransactionContext(ctx.getConnection()); + context.abort(t); + } + @SuppressWarnings({"unchecked"}) @Override protected void onHttpHeadersParsed(HttpHeader httpHeader, @@ -1042,6 +1056,11 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader, if (LOGGER.isDebugEnabled()) { LOGGER.debug("RESPONSE: " + httpHeader.toString()); } + if (httpHeader.containsHeader(Header.Connection)) { + if ("close".equals(httpHeader.getHeader(Header.Connection))) { + ConnectionManager.markConnectionAsDoNotCache(ctx.getConnection()); + } + } if (httpHeader.isSkipRemainder()) { return; } @@ -1157,20 +1176,21 @@ private static boolean isRedirectAllowed(final HttpTransactionContext ctx) { private static HttpTransactionContext cleanup(final FilterChainContext ctx, final GrizzlyAsyncHttpProvider provider) { + final Connection c = ctx.getConnection(); final HttpTransactionContext context = - provider.getHttpTransactionContext(ctx.getConnection()); - - if (!context.provider.connectionManager.canReturnConnection(ctx.getConnection())) { + provider.getHttpTransactionContext(c); + context.provider.setHttpTransactionContext(c, null); + if (!context.provider.connectionManager.canReturnConnection(c)) { context.abort(new IOException("Maximum pooled connections exceeded")); } else { - if (!context.provider.connectionManager.returnConnection(context.request.getUrl(), ctx.getConnection())) { + if (!context.provider.connectionManager.returnConnection(context.requestUrl, c)) { try { ctx.getConnection().close().markForRecycle(true); } catch (IOException ignored) { } } } - context.provider.setHttpTransactionContext(ctx.getConnection(), null); + return context; } @@ -1237,7 +1257,7 @@ public boolean handleStatus(final HttpResponsePacket responsePacket, final Request req = httpTransactionContext.request; realm = new Realm.RealmBuilder().clone(realm) .setScheme(realm.getAuthScheme()) - .setUri(URI.create(req.getUrl()).getPath()) + .setUri(URI.create(httpTransactionContext.requestUrl).getPath()) .setMethodName(req.getMethod()) .setUsePreemptiveAuth(true) .parseWWWAuthenticateHeader(auth) @@ -1314,9 +1334,9 @@ public boolean handleStatus(final HttpResponsePacket responsePacket, URI orig; if (httpTransactionContext.lastRedirectURI == null) { - orig = AsyncHttpProviderUtils.createUri(httpTransactionContext.request.getUrl()); + orig = AsyncHttpProviderUtils.createUri(httpTransactionContext.requestUrl); } else { - orig = AsyncHttpProviderUtils.getRedirectUri(AsyncHttpProviderUtils.createUri(httpTransactionContext.request.getUrl()), + orig = AsyncHttpProviderUtils.getRedirectUri(AsyncHttpProviderUtils.createUri(httpTransactionContext.requestUrl), httpTransactionContext.lastRedirectURI); } httpTransactionContext.lastRedirectURI = redirectURL; @@ -1354,6 +1374,7 @@ public boolean handleStatus(final HttpResponsePacket responsePacket, httpTransactionContext.future = null; newContext.invocationStatus = InvocationStatus.CONTINUE; newContext.request = requestToSend; + newContext.requestUrl = requestToSend.getUrl(); httpTransactionContext.provider.setHttpTransactionContext(c, newContext); httpTransactionContext.provider.execute(c, requestToSend, @@ -1637,7 +1658,7 @@ public void doHandle(final FilterChainContext ctx, ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } - } // END StringBodyHandler + } // END NoBodyHandler private final class ParamsBodyHandler implements BodyHandler { @@ -1666,18 +1687,21 @@ public void doHandle(final FilterChainContext ctx, charset = Charsets.DEFAULT_CHARACTER_ENCODING; } final FluentStringsMap params = request.getParams(); - for (Map.Entry> entry : params.entrySet()) { - String name = entry.getKey(); - List values = entry.getValue(); - if (values != null && !values.isEmpty()) { - if (sb == null) { - sb = new StringBuilder(128); - } - for (String value : values) { - if (sb.length() > 0) { - sb.append('&'); + if (!params.isEmpty()) { + for (Map.Entry> entry : params.entrySet()) { + String name = entry.getKey(); + List values = entry.getValue(); + if (values != null && !values.isEmpty()) { + if (sb == null) { + sb = new StringBuilder(128); + } + for (String value : values) { + if (sb.length() > 0) { + sb.append('&'); + } + sb.append(URLEncoder.encode(name, charset)) + .append('=').append(URLEncoder.encode(value, charset)); } - sb.append(URLEncoder.encode(name, charset)).append('=').append(URLEncoder.encode(value, charset)); } } } @@ -1954,9 +1978,9 @@ static boolean isConnectionCacheable(final Connection c) { return ((canCache != null) ? canCache : false); } - private void doAsyncTrackedConnection(final Request request, - final GrizzlyResponseFuture requestFuture, - final CompletionHandler connectHandler) + void doAsyncTrackedConnection(final Request request, + final GrizzlyResponseFuture requestFuture, + final CompletionHandler connectHandler) throws IOException, ExecutionException, InterruptedException { final String url = request.getUrl(); Connection c = pool.poll(AsyncHttpProviderUtils.getBaseUrl(url)); @@ -1995,7 +2019,7 @@ Connection obtainTrackedConnection(final Request request, Connection obtainConnection(final Request request, final GrizzlyResponseFuture requestFuture) - throws IOException, ExecutionException, InterruptedException { + throws IOException, ExecutionException, InterruptedException, TimeoutException { final Connection c = (obtainConnection0(request.getUrl(), request, @@ -2005,10 +2029,10 @@ Connection obtainConnection(final Request request, } - private void doAsyncConnect(final String url, - final Request request, - final GrizzlyResponseFuture requestFuture, - final CompletionHandler connectHandler) + void doAsyncConnect(final String url, + final Request request, + final GrizzlyResponseFuture requestFuture, + final CompletionHandler connectHandler) throws IOException, ExecutionException, InterruptedException { final URI uri = AsyncHttpProviderUtils.createUri(url); @@ -2026,7 +2050,7 @@ private void doAsyncConnect(final String url, private Connection obtainConnection0(final String url, final Request request, final GrizzlyResponseFuture requestFuture) - throws IOException, ExecutionException, InterruptedException { + throws IOException, ExecutionException, InterruptedException, TimeoutException { final URI uri = AsyncHttpProviderUtils.createUri(url); ProxyServer proxy = getProxyServer(request); @@ -2035,8 +2059,18 @@ private Connection obtainConnection0(final String url, } String host = ((proxy != null) ? proxy.getHost() : uri.getHost()); int port = ((proxy != null) ? proxy.getPort() : uri.getPort()); - return connectionHandler.connect(new InetSocketAddress(host, getPort(uri, port)), - createConnectionCompletionHandler(request, requestFuture, null)).get(); + int cTimeout = provider.clientConfig.getConnectionTimeoutInMs(); + if (cTimeout > 0) { + return connectionHandler.connect(new InetSocketAddress(host, getPort(uri, port)), + createConnectionCompletionHandler(request, + requestFuture, + null)).get(cTimeout, TimeUnit.MILLISECONDS); + } else { + return connectionHandler.connect(new InetSocketAddress(host, getPort(uri, port)), + createConnectionCompletionHandler(request, + requestFuture, + null)).get(); + } } diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyConnectionsPool.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyConnectionsPool.java index 70bfda5a8f..f6649035f1 100644 --- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyConnectionsPool.java +++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyConnectionsPool.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -67,8 +68,10 @@ public GrizzlyConnectionsPool(final AsyncHttpClientConfig config) { listener = new Connection.CloseListener() { @Override public void onClosed(Connection connection, Connection.CloseType closeType) throws IOException { - if (LOG.isInfoEnabled()) { - LOG.info("Remote closed connection ({}). Removing from cache", connection.toString()); + if (closeType == Connection.CloseType.REMOTELY) { + if (LOG.isInfoEnabled()) { + LOG.info("Remote closed connection ({}). Removing from cache", connection.toString()); + } } GrizzlyConnectionsPool.this.removeAll(connection); } @@ -91,6 +94,10 @@ public boolean offer(String uri, Connection connection) { DelayedExecutor.IdleConnectionQueue conQueue = connectionsPool.get(uri); if (conQueue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new Connection queue for uri [{}] and connection [{}]", + new Object[]{uri, connection}); + } DelayedExecutor.IdleConnectionQueue newPool = delayedExecutor.createIdleConnectionQueue(timeout); conQueue = connectionsPool.putIfAbsent(uri, newPool); @@ -100,13 +107,21 @@ public boolean offer(String uri, Connection connection) { } final int size = conQueue.size(); - if (maxConnectionsPerHost == -1 || size < maxConnectionsPerHost) { conQueue.offer(connection); connection.addCloseListener(listener); - totalCachedConnections.incrementAndGet(); + final int total = totalCachedConnections.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("[offer] Pooling connection [{}] for uri [{}]. Current size (for host; before pooling): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}].", + new Object[]{connection, uri, size, maxConnectionsPerHost, total}); + } return true; } + if (LOG.isDebugEnabled()) { + LOG.debug("[offer] Unable to pool connection [{}] for uri [{}]. Current size (for host): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}].", + new Object[]{connection, uri, size, maxConnectionsPerHost, totalCachedConnections.get()}); + } + return false; } @@ -125,7 +140,7 @@ public Connection poll(String uri) { if (conQueue != null) { boolean poolEmpty = false; while (!poolEmpty && connection == null) { - if (conQueue.size() > 0) { + if (!conQueue.isEmpty()) { connection = conQueue.poll(); } @@ -136,11 +151,18 @@ public Connection poll(String uri) { connection = null; } } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("[poll] No existing queue for uri [{}].", + new Object[]{uri}); + } } if (connection != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("[poll] Found pooled connection [{}] for uri [{}].", + new Object[]{connection, uri}); + } totalCachedConnections.decrementAndGet(); - } - if (connection != null) { connection.removeCloseListener(listener); } return connection; @@ -306,8 +328,8 @@ public void run() { delayQueue.queue.offer(element); } else { try { - if (LOG.isInfoEnabled()) { - LOG.info("Idle connection ({}) detected. Removing from cache.", element.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Idle connection ({}) detected. Removing from cache.", element.toString()); } element.close().markForRecycle(true); } catch (Exception ignored) { @@ -332,8 +354,8 @@ public void run() { final class IdleConnectionQueue { - final BlockingQueue queue = - DataStructures.getLTQInstance(Connection.class); + final ConcurrentLinkedQueue queue = + new ConcurrentLinkedQueue(); final TimeoutResolver resolver = new TimeoutResolver(); @@ -350,18 +372,18 @@ public IdleConnectionQueue(final long timeout) { // ------------------------------------------------- Private Methods - private void offer(final Connection c) { + void offer(final Connection c) { if (timeout >= 0) { resolver.setTimeoutMs(c, System.currentTimeMillis() + timeout); } queue.offer(c); } - private Connection poll() { + Connection poll() { return queue.poll(); } - public boolean remove(final Connection c) { + boolean remove(final Connection c) { if (timeout >= 0) { resolver.removeTimeout(c); @@ -369,11 +391,15 @@ public boolean remove(final Connection c) { return queue.remove(c); } - public int size() { + int size() { return queue.size(); } + + boolean isEmpty() { + return queue.isEmpty(); + } - public void destroy() { + void destroy() { try { for (Connection c : queue) { c.close().markForRecycle(true); @@ -390,7 +416,7 @@ public void destroy() { // ------------------------------------------------------ Nested Classes - private static final class TimeoutResolver { + static final class TimeoutResolver { private static final String IDLE_ATTRIBUTE_NAME = "grizzly-ahc-conn-pool-idle-attribute"; private static final Attribute IDLE_ATTR = @@ -407,25 +433,25 @@ public IdleRecord evaluate() { // ------------------------------------------------- Private Methods - private boolean removeTimeout(final Connection c) { + boolean removeTimeout(final Connection c) { IDLE_ATTR.get(c).timeoutMs = 0; return true; } - private Long getTimeoutMs(final Connection c) { + Long getTimeoutMs(final Connection c) { return IDLE_ATTR.get(c).timeoutMs; } - private void setTimeoutMs(final Connection c, final long timeoutMs) { + void setTimeoutMs(final Connection c, final long timeoutMs) { IDLE_ATTR.get(c).timeoutMs = timeoutMs; } // -------------------------------------------------- Nested Classes - private static final class IdleRecord { + static final class IdleRecord { - private volatile long timeoutMs; + volatile long timeoutMs; } // END IdleRecord diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseFuture.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseFuture.java index b61ec69089..970599d956 100644 --- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseFuture.java +++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseFuture.java @@ -142,11 +142,7 @@ public boolean isDone() { public V get() throws InterruptedException, ExecutionException { - try { - return get(60, TimeUnit.SECONDS); - } catch (TimeoutException te) { - throw new ExecutionException(te); - } + return delegate.get(); } diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseHeaders.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseHeaders.java index ed362c23d9..e7f0ac8a35 100644 --- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseHeaders.java +++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyResponseHeaders.java @@ -30,7 +30,10 @@ */ public class GrizzlyResponseHeaders extends HttpResponseHeaders { - private final FluentCaseInsensitiveStringsMap headers = new FluentCaseInsensitiveStringsMap(); + private final FluentCaseInsensitiveStringsMap headers = + new FluentCaseInsensitiveStringsMap(); + private final HttpResponsePacket response; + private volatile boolean initialized; // ------------------------------------------------------------ Constructors @@ -38,14 +41,10 @@ public class GrizzlyResponseHeaders extends HttpResponseHeaders { public GrizzlyResponseHeaders(final HttpResponsePacket response, final URI uri, final AsyncHttpProvider provider) { + super(uri, provider); + this.response = response; - final MimeHeaders headersLocal = response.getHeaders(); - for (String name : headersLocal.names()) { - for (String header : headersLocal.values(name)) { - headers.add(name, header); - } - } } @@ -57,6 +56,19 @@ public GrizzlyResponseHeaders(final HttpResponsePacket response, */ @Override public FluentCaseInsensitiveStringsMap getHeaders() { + if (!initialized) { + synchronized (headers) { + if (!initialized) { + initialized = true; + final MimeHeaders headersLocal = response.getHeaders(); + for (String name : headersLocal.names()) { + for (String header : headersLocal.values(name)) { + headers.add(name, header); + } + } + } + } + } return headers; } diff --git a/src/main/java/com/ning/http/client/providers/jdk/JDKAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/jdk/JDKAsyncHttpProvider.java index 457f3c3cfa..490cee3429 100644 --- a/src/main/java/com/ning/http/client/providers/jdk/JDKAsyncHttpProvider.java +++ b/src/main/java/com/ning/http/client/providers/jdk/JDKAsyncHttpProvider.java @@ -41,8 +41,6 @@ import com.ning.http.util.ProxyUtils; import com.ning.http.util.SslUtils; import com.ning.http.util.UTF8UrlEncoder; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -626,7 +624,6 @@ private void configure(URI uri, HttpURLConnection urlConnection, Request request urlConnection.setRequestProperty("Content-Type", mre.getContentType()); urlConnection.setRequestProperty("Content-Length", String.valueOf(mre.getContentLength())); - ChannelBuffer b = ChannelBuffers.dynamicBuffer(lenght); mre.writeRequest(urlConnection.getOutputStream()); } else if (request.getEntityWriter() != null) { int lenght = (int) request.getContentLength(); diff --git a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java index 929107e0db..af988c394a 100644 --- a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java +++ b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java @@ -554,6 +554,11 @@ private static HttpRequest construct(AsyncHttpClientConfig config, if (config.isCompressionEnabled()) { nettyRequest.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); } + } else { + List auth = request.getHeaders().get(HttpHeaders.Names.PROXY_AUTHORIZATION); + if (auth != null && auth.size() > 0 && auth.get(0).startsWith("NTLM")) { + nettyRequest.addHeader(HttpHeaders.Names.PROXY_AUTHORIZATION, auth.get(0)); + } } ProxyServer proxyServer = request.getProxyServer() != null ? request.getProxyServer() : config.getProxyServer(); Realm realm = request.getRealm() != null ? request.getRealm() : config.getRealm(); @@ -626,8 +631,24 @@ private static HttpRequest construct(AsyncHttpClientConfig config, } if (proxyServer.getPrincipal() != null) { - nettyRequest.setHeader(HttpHeaders.Names.PROXY_AUTHORIZATION, - AuthenticatorUtils.computeBasicAuthentication(proxyServer)); + if (proxyServer.getNtlmDomain() != null) { + + List auth = request.getHeaders().get(HttpHeaders.Names.PROXY_AUTHORIZATION); + if (!(auth != null && auth.size() > 0 && auth.get(0).startsWith("NTLM"))) { + try { + String msg = ntlmEngine.generateType1Msg(proxyServer.getNtlmDomain(), + proxyServer.getHost()); + nettyRequest.setHeader(HttpHeaders.Names.PROXY_AUTHORIZATION, "NTLM " + msg); + } catch (NTLMEngineException e) { + IOException ie = new IOException(); + ie.initCause(e); + throw ie; + } + } + } else { + nettyRequest.setHeader(HttpHeaders.Names.PROXY_AUTHORIZATION, + AuthenticatorUtils.computeBasicAuthentication(proxyServer)); + } } } @@ -824,11 +845,12 @@ private ListenableFuture doConnect(final Request request, final AsyncHand boolean useSSl = uri.getScheme().compareToIgnoreCase(HTTPS) == 0 && proxyServer == null; if (channel != null && channel.isOpen() && channel.isConnected()) { - HttpRequest nettyRequest = buildRequest(config, request, uri, false, bufferedBytes); + HttpRequest nettyRequest = buildRequest(config, request, uri, f == null ? false : f.isConnectAllowed(), bufferedBytes); if (f == null) { f = newFuture(uri, request, asyncHandler, nettyRequest, config, this); } else { + nettyRequest = buildRequest(config, request, uri, f.isConnectAllowed(), bufferedBytes); f.setNettyRequest(nettyRequest); } f.setState(NettyResponseFuture.STATE.POOLED); @@ -905,10 +927,9 @@ private ListenableFuture doConnect(final Request request, final AsyncHand } try { - if (request.getInetAddress() != null) { - channelFuture = bootstrap.connect(new InetSocketAddress(request.getInetAddress(), AsyncHttpProviderUtils.getPort(uri))); - } - else if (proxyServer == null || avoidProxy) { + if (request.getInetAddress() != null) { + channelFuture = bootstrap.connect(new InetSocketAddress(request.getInetAddress(), AsyncHttpProviderUtils.getPort(uri))); + } else if (proxyServer == null || avoidProxy) { channelFuture = bootstrap.connect(new InetSocketAddress(AsyncHttpProviderUtils.getHost(uri), AsyncHttpProviderUtils.getPort(uri))); } else { channelFuture = bootstrap.connect(new InetSocketAddress(proxyServer.getHost(), proxyServer.getPort())); @@ -1159,8 +1180,9 @@ public Object call() throws Exception { log.debug("Sending proxy authentication to {}", request.getUrl()); future.setState(NettyResponseFuture.STATE.NEW); - if (!proxyAuth.contains("Kerberos") && (proxyAuth.contains("NTLM") || (proxyAuth.contains("Negotiate")))) { - newRealm = ntlmChallenge(proxyAuth, request, proxyServer, headers, realm, future); + + if (!proxyAuth.contains("Kerberos") && (proxyAuth.get(0).contains("NTLM") || (proxyAuth.contains("Negotiate")))) { + newRealm = ntlmProxyChallenge(proxyAuth, request, proxyServer, headers, realm, future); // SPNEGO KERBEROS } else if (proxyAuth.contains("Negotiate")) { newRealm = kerberosChallenge(proxyAuth, request, proxyServer, headers, realm, future); @@ -1169,7 +1191,10 @@ public Object call() throws Exception { newRealm = future.getRequest().getRealm(); } - nextRequest(builder.setHeaders(headers).setRealm(newRealm).build(), future); + Request req = builder.setHeaders(headers).setRealm(newRealm).build(); + future.setReuseChannel(true); + future.setConnectAllowed(true); + nextRequest(req, future); return; } @@ -1188,7 +1213,10 @@ public Object call() throws Exception { } catch (Throwable ex) { abort(future, ex); } - nextRequest(builder.build(), future); + Request req = builder.build(); + future.setReuseChannel(true); + future.setConnectAllowed(false); + nextRequest(req, future); return; } @@ -1388,6 +1416,39 @@ private Realm ntlmChallenge(List wwwAuth, return newRealm; } + private Realm ntlmProxyChallenge(List wwwAuth, + Request request, + ProxyServer proxyServer, + FluentCaseInsensitiveStringsMap headers, + Realm realm, + NettyResponseFuture future) throws NTLMEngineException { + future.getAndSetAuth(false); + headers.remove(HttpHeaders.Names.PROXY_AUTHORIZATION); + + if (wwwAuth.get(0).startsWith("NTLM ")) { + String serverChallenge = wwwAuth.get(0).trim().substring("NTLM ".length()); + String challengeHeader = ntlmEngine.generateType3Msg(proxyServer.getPrincipal(), + proxyServer.getPassword(), + proxyServer.getNtlmDomain(), + proxyServer.getHost(), + serverChallenge); + headers.add(HttpHeaders.Names.PROXY_AUTHORIZATION, "NTLM " + challengeHeader); + } + Realm newRealm; + Realm.RealmBuilder realmBuilder; + if (realm != null) { + realmBuilder = new Realm.RealmBuilder().clone(realm); + } else { + realmBuilder = new Realm.RealmBuilder(); + } + newRealm = realmBuilder//.setScheme(realm.getAuthScheme()) + .setUri(URI.create(request.getUrl()).getPath()) + .setMethodName(request.getMethod()) + .build(); + + return newRealm; + } + private void drainChannel(final ChannelHandlerContext ctx, final NettyResponseFuture future, final boolean keepAlive, final URI uri) { ctx.setAttachment(new AsyncCallable(future) { public Object call() throws Exception { diff --git a/src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java b/src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java index f743f47f30..44965a44e3 100755 --- a/src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java +++ b/src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java @@ -82,6 +82,7 @@ enum STATE { private boolean writeHeaders; private boolean writeBody; private final AtomicBoolean throwableCalled = new AtomicBoolean(false); + private boolean allowConnect = false; public NettyResponseFuture(URI uri, Request request, @@ -405,6 +406,18 @@ protected void attachChannel(Channel channel) { this.channel = channel; } + public void setReuseChannel(boolean reuseChannel) { + this.reuseChannel = reuseChannel; + } + + public boolean isConnectAllowed() { + return allowConnect; + } + + public void setConnectAllowed(boolean allowConnect) { + this.allowConnect = allowConnect; + } + protected void attachChannel(Channel channel, boolean reuseChannel) { this.channel = channel; this.reuseChannel = reuseChannel; @@ -429,7 +442,6 @@ public void setRequest(Request request) { this.request = request; } - /** * Return true if the {@link Future} cannot be recovered. There is some scenario where a connection can be * closed by an unexpected IOException, and in some situation we can recover from that exception. diff --git a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyByteBufferCapacityTest.java b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyByteBufferCapacityTest.java index b577e5123f..ce2bc93761 100644 --- a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyByteBufferCapacityTest.java +++ b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyByteBufferCapacityTest.java @@ -17,6 +17,7 @@ import com.ning.http.client.AsyncHttpClientConfig; import com.ning.http.client.async.ByteBufferCapacityTest; import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; +import org.testng.annotations.Test; public class GrizzlyByteBufferCapacityTest extends ByteBufferCapacityTest { @@ -28,4 +29,7 @@ public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) { return new AsyncHttpClient(new GrizzlyAsyncHttpProvider(config), config); } + @Test(groups = {"standalone", "default_provider"}, enabled=false) + public void basicByteBufferTest() throws Throwable { + } }