diff --git a/pom.xml b/pom.xml
index 045026284d..8e49fe4a53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,17 +9,17 @@
com.ning
async-http-client
Asynchronous Http Client
- 1.7.6-SNAPSHOT
+ 1.7.8-px-1
jar
Async Http Client library purpose is to allow Java applications to easily execute HTTP requests and
asynchronously process the HTTP responses.
- http://github.com/sonatype/async-http-client
+ http://github.com/AsyncHttpClient/async-http-client
- scm:git:git@github.com:sonatype/async-http-client.git
- https://github.com/sonatype/async-http-client
- scm:git:git@github.com:sonatype/async-http-client.git
+ scm:git:git@github.com:AsyncHttpClient/async-http-client.git
+ https://github.com/AsyncHttpClient/async-http-client
+ scm:git:git@github.com:AsyncHttpClient/async-http-client.git
jira
@@ -528,7 +528,7 @@
org.glassfish.grizzly
grizzly-websockets
- 2.2.10
+ 2.2.16
true
@@ -609,7 +609,7 @@
github
- gitsite:git@github.com/sonatype/async-http-client.git
+ gitsite:git@github.com/AsyncHttpClient/async-http-client.git
diff --git a/src/main/java/com/ning/http/client/RequestBuilderBase.java b/src/main/java/com/ning/http/client/RequestBuilderBase.java
index 9cc5ec40e0..7d4f843890 100644
--- a/src/main/java/com/ning/http/client/RequestBuilderBase.java
+++ b/src/main/java/com/ning/http/client/RequestBuilderBase.java
@@ -75,8 +75,9 @@ public RequestImpl(boolean useRawUrl) {
public RequestImpl(Request prototype) {
if (prototype != null) {
this.method = prototype.getMethod();
- int pos = prototype.getUrl().indexOf("?");
- this.url = pos > 0 ? prototype.getUrl().substring(0, pos) : prototype.getUrl();
+ String prototypeUrl = prototype.getUrl();
+ int pos = prototypeUrl.indexOf("?");
+ this.url = pos > 0 ? prototypeUrl.substring(0, pos) : prototypeUrl;
this.address = prototype.getInetAddress();
this.localAddress = prototype.getLocalAddress();
this.headers = new FluentCaseInsensitiveStringsMap(prototype.getHeaders());
@@ -94,7 +95,7 @@ public RequestImpl(Request prototype) {
this.proxyServer = prototype.getProxyServer();
this.realm = prototype.getRealm();
this.file = prototype.getFile();
- this.followRedirects = prototype.isRedirectOverrideSet()? prototype.isRedirectEnabled() : null;
+ this.followRedirects = prototype.isRedirectOverrideSet() ? prototype.isRedirectEnabled() : null;
this.perRequestConfig = prototype.getPerRequestConfig();
this.rangeOffset = prototype.getRangeOffset();
this.charset = prototype.getBodyEncoding();
@@ -125,7 +126,7 @@ public InetAddress getInetAddress() {
public InetAddress getLocalAddress() {
return localAddress;
}
-
+
private String toUrl(boolean encode) {
if (url == null) {
@@ -270,7 +271,7 @@ public boolean isRedirectEnabled() {
return (followRedirects != null && followRedirects);
}
- public boolean isRedirectOverrideSet(){
+ public boolean isRedirectOverrideSet() {
return followRedirects != null;
}
@@ -292,11 +293,23 @@ public String toString() {
sb.append("\t");
sb.append(method);
- for (String name : headers.keySet()) {
- sb.append("\t");
- sb.append(name);
- sb.append(":");
- sb.append(headers.getJoinedValue(name, ", "));
+ sb.append("\theaders:");
+ if (headers != null && !headers.isEmpty()) {
+ for (String name : headers.keySet()) {
+ sb.append("\t");
+ sb.append(name);
+ sb.append(":");
+ sb.append(headers.getJoinedValue(name, ", "));
+ }
+ }
+ if (params != null && !params.isEmpty()) {
+ sb.append("\tparams:");
+ for (String name : params.keySet()) {
+ sb.append("\t");
+ sb.append(name);
+ sb.append(":");
+ sb.append(params.getJoinedValue(name, ", "));
+ }
}
return sb.toString();
@@ -330,10 +343,10 @@ public T setUrl(String url) {
}
public T setInetAddress(InetAddress address) {
- request.address = address;
- return derived.cast(this);
+ request.address = address;
+ return derived.cast(this);
}
-
+
public T setLocalInetAddress(InetAddress address) {
request.localAddress = address;
return derived.cast(this);
diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java
index 3cfa92c9e3..a4ee140e75 100644
--- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java
+++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java
@@ -51,7 +51,6 @@
import com.ning.http.util.AuthenticatorUtils;
import com.ning.http.util.ProxyUtils;
import com.ning.http.util.SslUtils;
-
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
@@ -77,13 +76,12 @@
import org.glassfish.grizzly.http.HttpResponsePacket;
import org.glassfish.grizzly.http.Method;
import org.glassfish.grizzly.http.Protocol;
-import org.glassfish.grizzly.impl.FutureImpl;
-import org.glassfish.grizzly.utils.Charsets;
import org.glassfish.grizzly.http.util.CookieSerializerUtils;
import org.glassfish.grizzly.http.util.DataChunk;
import org.glassfish.grizzly.http.util.Header;
import org.glassfish.grizzly.http.util.HttpStatus;
import org.glassfish.grizzly.http.util.MimeHeaders;
+import org.glassfish.grizzly.impl.FutureImpl;
import org.glassfish.grizzly.impl.SafeFutureImpl;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.MemoryManager;
@@ -95,12 +93,14 @@
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy;
import org.glassfish.grizzly.utils.BufferOutputStream;
+import org.glassfish.grizzly.utils.Charsets;
import org.glassfish.grizzly.utils.DelayedExecutor;
import org.glassfish.grizzly.utils.Futures;
import org.glassfish.grizzly.utils.IdleTimeoutFilter;
import org.glassfish.grizzly.websockets.DataFrame;
import org.glassfish.grizzly.websockets.DefaultWebSocket;
import org.glassfish.grizzly.websockets.HandShake;
+import org.glassfish.grizzly.websockets.HandshakeException;
import org.glassfish.grizzly.websockets.ProtocolHandler;
import org.glassfish.grizzly.websockets.Version;
import org.glassfish.grizzly.websockets.WebSocketEngine;
@@ -134,6 +134,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.MAX_HTTP_PACKET_HEADER_SIZE;
import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.TRANSPORT_CUSTOMIZER;
/**
@@ -147,7 +148,7 @@ public class GrizzlyAsyncHttpProvider implements AsyncHttpProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(GrizzlyAsyncHttpProvider.class);
private static final boolean SEND_FILE_SUPPORT;
static {
- SEND_FILE_SUPPORT = configSendFileSupport();
+ SEND_FILE_SUPPORT = /*configSendFileSupport()*/ false;
}
private final Attribute REQUEST_STATE_ATTR =
Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(HttpTransactionContext.class.getName());
@@ -369,8 +370,12 @@ public void onTimeout(Connection connection) {
false);
final SwitchingSSLFilter filter = new SwitchingSSLFilter(configurator, defaultSecState);
fcb.add(filter);
+ final GrizzlyAsyncHttpProviderConfig providerConfig =
+ clientConfig.getAsyncHttpProviderConfig() instanceof GrizzlyAsyncHttpProviderConfig ?
+ (GrizzlyAsyncHttpProviderConfig) clientConfig.getAsyncHttpProviderConfig()
+ : new GrizzlyAsyncHttpProviderConfig();
final AsyncHttpClientEventFilter eventFilter = new
- AsyncHttpClientEventFilter(this);
+ AsyncHttpClientEventFilter(this, (Integer) providerConfig.getProperty(MAX_HTTP_PACKET_HEADER_SIZE));
final AsyncHttpClientFilter clientFilter =
new AsyncHttpClientFilter(clientConfig);
ContentEncoding[] encodings = eventFilter.getContentEncodings();
@@ -388,8 +393,6 @@ public void onTimeout(Connection connection) {
fcb.add(eventFilter);
fcb.add(clientFilter);
- GrizzlyAsyncHttpProviderConfig providerConfig =
- (GrizzlyAsyncHttpProviderConfig) clientConfig.getAsyncHttpProviderConfig();
if (providerConfig != null) {
final TransportCustomizer customizer = (TransportCustomizer)
providerConfig.getProperty(TRANSPORT_CUSTOMIZER);
@@ -619,6 +622,7 @@ final class HttpTransactionContext {
HandShake handshake;
ProtocolHandler protocolHandler;
WebSocket webSocket;
+ boolean establishingTunnel;
// -------------------------------------------------------- Constructors
@@ -677,6 +681,15 @@ void result(Object result) {
}
}
+ boolean isTunnelEstablished(final Connection c) {
+ return c.getAttributes().getAttribute("tunnel-established") != null;
+ }
+
+
+ void tunnelEstablished(final Connection c) {
+ c.getAttributes().setAttribute("tunnel-established", Boolean.TRUE);
+ }
+
} // END HttpTransactionContext
@@ -842,9 +855,12 @@ private boolean sendAsGrizzlyRequest(final Request request,
}
}
final ProxyServer proxy = getProxyServer(request);
- final boolean useProxy = (proxy != null);
+ boolean avoidProxy = ProxyUtils.avoidProxy(proxy, request);
+ final boolean useProxy = !(avoidProxy || proxy == null);
if (useProxy) {
- if (secure) {
+ if ((secure || httpCtx.isWSRequest) && !httpCtx.isTunnelEstablished(ctx.getConnection())) {
+ secure = false;
+ httpCtx.establishingTunnel = true;
builder.method(Method.CONNECT);
builder.uri(AsyncHttpProviderUtils.getAuthority(uri));
} else {
@@ -864,7 +880,7 @@ private boolean sendAsGrizzlyRequest(final Request request,
}
HttpRequestPacket requestPacket;
- if (httpCtx.isWSRequest) {
+ if (httpCtx.isWSRequest && !httpCtx.establishingTunnel) {
try {
final URI wsURI = new URI(httpCtx.wsRequestURI);
httpCtx.protocolHandler = Version.DRAFT17.createHandler(true);
@@ -877,7 +893,10 @@ private boolean sendAsGrizzlyRequest(final Request request,
} else {
requestPacket = builder.build();
}
- requestPacket.setSecure(true);
+ requestPacket.setSecure(secure);
+ if (secure) {
+ ctx.notifyDownstream(new SwitchingSSLFilter.SSLSwitchingEvent(true, ctx.getConnection()));
+ }
if (!useProxy && !httpCtx.isWSRequest) {
addQueryString(request, requestPacket);
}
@@ -885,16 +904,13 @@ private boolean sendAsGrizzlyRequest(final Request request,
addCookies(request, requestPacket);
if (useProxy) {
- boolean avoidProxy = ProxyUtils.avoidProxy(proxy, request);
- if (!avoidProxy) {
- if (!requestPacket.getHeaders().contains(Header.ProxyConnection)) {
- requestPacket.setHeader(Header.ProxyConnection, "keep-alive");
- }
+ if (!requestPacket.getHeaders().contains(Header.ProxyConnection)) {
+ requestPacket.setHeader(Header.ProxyConnection, "keep-alive");
+ }
- if (proxy.getPrincipal() != null) {
- requestPacket.setHeader(Header.ProxyAuthorization,
- AuthenticatorUtils.computeBasicAuthentication(proxy));
- }
+ if (proxy.getPrincipal() != null) {
+ requestPacket.setHeader(Header.ProxyAuthorization,
+ AuthenticatorUtils.computeBasicAuthentication(proxy));
}
}
final AsyncHandler h = httpCtx.handler;
@@ -1053,15 +1069,15 @@ private static final class AsyncHttpClientEventFilter extends HttpClientFilter {
private final GrizzlyAsyncHttpProvider provider;
-
// -------------------------------------------------------- Constructors
- AsyncHttpClientEventFilter(final GrizzlyAsyncHttpProvider provider) {
+ AsyncHttpClientEventFilter(final GrizzlyAsyncHttpProvider provider, int maxHerdersSizeProperty) {
+ super(maxHerdersSizeProperty);
this.provider = provider;
HANDLER_MAP.put(HttpStatus.UNAUTHORIZED_401.getStatusCode(),
- AuthorizationHandler.INSTANCE);
+ AuthorizationHandler.INSTANCE);
HANDLER_MAP.put(HttpStatus.MOVED_PERMANENTLY_301.getStatusCode(),
RedirectHandler.INSTANCE);
HANDLER_MAP.put(HttpStatus.FOUND_302.getStatusCode(),
@@ -1139,14 +1155,19 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
if (httpHeader.isSkipRemainder()) {
return;
}
+ final Connection connection = ctx.getConnection();
final HttpTransactionContext context =
- provider.getHttpTransactionContext(ctx.getConnection());
+ provider.getHttpTransactionContext(connection);
final int status = ((HttpResponsePacket) httpHeader).getStatus();
+ if (context.establishingTunnel && HttpStatus.OK_200.statusMatches(status)) {
+ return;
+ }
if (HttpStatus.CONINTUE_100.statusMatches(status)) {
ctx.notifyUpstream(new ContinueEvent(context));
return;
}
+
if (context.statusHandler != null && !context.statusHandler.handlesStatus(status)) {
context.statusHandler = null;
context.invocationStatus = StatusHandler.InvocationStatus.CONTINUE;
@@ -1180,9 +1201,9 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
}
}
final GrizzlyResponseStatus responseStatus =
- new GrizzlyResponseStatus((HttpResponsePacket) httpHeader,
- getURI(context.requestUrl),
- provider);
+ new GrizzlyResponseStatus((HttpResponsePacket) httpHeader,
+ getURI(context.requestUrl),
+ provider);
context.responseStatus = responseStatus;
if (context.statusHandler != null) {
return;
@@ -1193,6 +1214,10 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
final AsyncHandler handler = context.handler;
if (handler != null) {
context.currentState = handler.onStatusReceived(responseStatus);
+ if (context.isWSRequest && context.currentState == AsyncHandler.STATE.ABORT) {
+ httpHeader.setSkipRemainder(true);
+ context.abort(new HandshakeException("Upgrade failed"));
+ }
}
} catch (Exception e) {
httpHeader.setSkipRemainder(true);
@@ -1221,24 +1246,23 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader,
FilterChainContext ctx) {
super.onHttpHeadersParsed(httpHeader, ctx);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("RESPONSE: " + httpHeader.toString());
- }
+ LOGGER.debug("RESPONSE: {}", httpHeader);
if (httpHeader.containsHeader(Header.Connection)) {
if ("close".equals(httpHeader.getHeader(Header.Connection))) {
ConnectionManager.markConnectionAsDoNotCache(ctx.getConnection());
}
}
- if (httpHeader.isSkipRemainder()) {
- return;
- }
final HttpTransactionContext context =
provider.getHttpTransactionContext(ctx.getConnection());
+ if (httpHeader.isSkipRemainder() || context.establishingTunnel) {
+ return;
+ }
+
final AsyncHandler handler = context.handler;
final List filters = context.provider.clientConfig.getResponseFilters();
final GrizzlyResponseHeaders responseHeaders = new GrizzlyResponseHeaders((HttpResponsePacket) httpHeader,
- null,
- provider);
+ null,
+ provider);
if (!filters.isEmpty()) {
FilterContext fc = new FilterContext.FilterContextBuilder()
.asyncHandler(handler).request(context.request)
@@ -1260,16 +1284,16 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader,
context.provider.connectionManager;
final Connection c =
m.obtainConnection(newRequest,
- context.future);
+ context.future);
final HttpTransactionContext newContext =
context.copy();
context.future = null;
provider.setHttpTransactionContext(c, newContext);
try {
context.provider.execute(c,
- newRequest,
- newHandler,
- context.future);
+ newRequest,
+ newHandler,
+ context.future);
} catch (IOException ioe) {
newContext.abort(ioe);
}
@@ -1281,8 +1305,8 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader,
}
if (context.statusHandler != null && context.invocationStatus == StatusHandler.InvocationStatus.CONTINUE) {
final boolean result = context.statusHandler.handleStatus(((HttpResponsePacket) httpHeader),
- context,
- ctx);
+ context,
+ ctx);
if (!result) {
httpHeader.setSkipRemainder(true);
return;
@@ -1347,20 +1371,38 @@ protected boolean onHttpPacketParsed(HttpHeader httpHeader, FilterChainContext c
result = super.onHttpPacketParsed(httpHeader, ctx);
- final HttpTransactionContext context = cleanup(ctx, provider);
-
- final AsyncHandler handler = context.handler;
- if (handler != null) {
+ final HttpTransactionContext context = provider.getHttpTransactionContext(ctx.getConnection());
+ if (context.establishingTunnel
+ && HttpStatus.OK_200.statusMatches(
+ ((HttpResponsePacket) httpHeader).getStatus())) {
+ context.establishingTunnel = false;
+ final Connection c = ctx.getConnection();
+ context.tunnelEstablished(c);
try {
- context.result(handler.onCompleted());
- } catch (Exception e) {
+ context.provider.execute(c,
+ context.request,
+ context.handler,
+ context.future);
+ return result;
+ } catch (IOException e) {
context.abort(e);
+ return result;
}
} else {
- context.done(null);
- }
+ cleanup(ctx, provider);
+ final AsyncHandler handler = context.handler;
+ if (handler != null) {
+ try {
+ context.result(handler.onCompleted());
+ } catch (Exception e) {
+ context.abort(e);
+ }
+ } else {
+ context.done(null);
+ }
- return result;
+ return result;
+ }
}
@@ -1389,7 +1431,7 @@ private static HttpTransactionContext cleanup(final FilterChainContext ctx,
context.abort(new IOException("Maximum pooled connections exceeded"));
} else {
if (!context.provider.connectionManager.returnConnection(context.requestUrl, c)) {
- ctx.getConnection().close().markForRecycle(true);
+ ctx.getConnection().close();
}
}
diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProviderConfig.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProviderConfig.java
index 70b7425391..8751195bbd 100644
--- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProviderConfig.java
+++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProviderConfig.java
@@ -14,6 +14,7 @@
package com.ning.http.client.providers.grizzly;
import com.ning.http.client.AsyncHttpProviderConfig;
+import org.glassfish.grizzly.http.HttpCodecFilter;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import java.util.HashMap;
@@ -49,7 +50,17 @@ public static enum Property {
*
* @see TransportCustomizer
*/
- TRANSPORT_CUSTOMIZER(TransportCustomizer.class);
+ TRANSPORT_CUSTOMIZER(TransportCustomizer.class),
+
+
+ /**
+ * Defines the maximum HTTP packet header size.
+ */
+ MAX_HTTP_PACKET_HEADER_SIZE(Integer.class, HttpCodecFilter.DEFAULT_MAX_HTTP_PACKET_HEADER_SIZE),
+
+
+
+ ;
final Object defaultValue;
diff --git a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java
index da017b1679..a7595dbb17 100644
--- a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java
+++ b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java
@@ -151,6 +151,12 @@ public class NettyAsyncHttpProvider extends SimpleChannelUpstreamHandler impleme
private final AsyncHttpClientConfig config;
private final AtomicBoolean isClose = new AtomicBoolean(false);
private final ClientSocketChannelFactory socketChannelFactory;
+ private int httpClientCodecMaxInitialLineLength = 4096;
+ private int httpClientCodecMaxHeaderSize = 8192;
+ private int httpClientCodecMaxChunkSize = 8192;
+ private int httpsClientCodecMaxInitialLineLength = 4096;
+ private int httpsClientCodecMaxHeaderSize = 8192;
+ private int httpsClientCodecMaxChunkSize = 8192;
private final ChannelGroup openChannels = new
CleanupChannelGroup("asyncHttpClient") {
@@ -238,6 +244,8 @@ void configureNetty() {
for (Entry entry : asyncHttpProviderConfig.propertiesSet()) {
plainBootstrap.setOption(entry.getKey(), entry.getValue());
}
+ configureHttpClientCodec();
+ configureHttpsClientCodec();
}
plainBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -246,7 +254,7 @@ void configureNetty() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
- pipeline.addLast(HTTP_HANDLER, new HttpClientCodec());
+ pipeline.addLast(HTTP_HANDLER, createHttpClientCodec());
if (config.getRequestCompressionLevel() > 0) {
pipeline.addLast("deflater", new HttpContentCompressor(config.getRequestCompressionLevel()));
@@ -284,6 +292,42 @@ public ChannelPipeline getPipeline() throws Exception {
});
}
+ protected void configureHttpClientCodec() {
+ httpClientCodecMaxInitialLineLength = asyncHttpProviderConfig.getProperty(
+ NettyAsyncHttpProviderConfig.HTTP_CLIENT_CODEC_MAX_INITIAL_LINE_LENGTH,
+ Integer.class,
+ httpClientCodecMaxInitialLineLength
+ );
+ httpClientCodecMaxHeaderSize = asyncHttpProviderConfig.getProperty(
+ NettyAsyncHttpProviderConfig.HTTP_CLIENT_CODEC_MAX_HEADER_SIZE,
+ Integer.class,
+ httpClientCodecMaxHeaderSize
+ );
+ httpClientCodecMaxChunkSize = asyncHttpProviderConfig.getProperty(
+ NettyAsyncHttpProviderConfig.HTTP_CLIENT_CODEC_MAX_CHUNK_SIZE,
+ Integer.class,
+ httpClientCodecMaxChunkSize
+ );
+ }
+
+ protected void configureHttpsClientCodec() {
+ httpsClientCodecMaxInitialLineLength = asyncHttpProviderConfig.getProperty(
+ NettyAsyncHttpProviderConfig.HTTPS_CLIENT_CODEC_MAX_INITIAL_LINE_LENGTH,
+ Integer.class,
+ httpsClientCodecMaxInitialLineLength
+ );
+ httpsClientCodecMaxHeaderSize = asyncHttpProviderConfig.getProperty(
+ NettyAsyncHttpProviderConfig.HTTPS_CLIENT_CODEC_MAX_HEADER_SIZE,
+ Integer.class,
+ httpsClientCodecMaxHeaderSize
+ );
+ httpsClientCodecMaxChunkSize = asyncHttpProviderConfig.getProperty(
+ NettyAsyncHttpProviderConfig.HTTPS_CLIENT_CODEC_MAX_CHUNK_SIZE,
+ Integer.class,
+ httpsClientCodecMaxChunkSize
+ );
+ }
+
void constructSSLPipeline(final NettyConnectListener> cl) {
secureBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -298,7 +342,7 @@ public ChannelPipeline getPipeline() throws Exception {
abort(cl.future(), ex);
}
- pipeline.addLast(HTTP_HANDLER, new HttpClientCodec());
+ pipeline.addLast(HTTP_HANDLER, createHttpsClientCodec());
if (config.isCompressionEnabled()) {
pipeline.addLast("inflater", new HttpContentDecompressor());
@@ -363,6 +407,14 @@ private SSLEngine createSSLEngine() throws IOException, GeneralSecurityException
return sslEngine;
}
+ private HttpClientCodec createHttpClientCodec() {
+ return new HttpClientCodec(httpClientCodecMaxInitialLineLength, httpClientCodecMaxHeaderSize, httpClientCodecMaxChunkSize);
+ }
+
+ private HttpClientCodec createHttpsClientCodec() {
+ return new HttpClientCodec(httpsClientCodecMaxInitialLineLength, httpsClientCodecMaxHeaderSize, httpsClientCodecMaxChunkSize);
+ }
+
private Channel verifyChannelPipeline(Channel channel, String scheme) throws IOException, GeneralSecurityException {
if (channel.getPipeline().get(SSL_HANDLER) != null && HTTP.equalsIgnoreCase(scheme)) {
@@ -526,7 +578,15 @@ public void operationComplete(ChannelFuture cf) {
}
private static boolean isProxyServer(AsyncHttpClientConfig config, Request request) {
- return request.getProxyServer() != null || config.getProxyServer() != null;
+ ProxyServer proxyServer = request.getProxyServer();
+ if (proxyServer == null) {
+ proxyServer = config.getProxyServer();
+ }
+ if (proxyServer == null) {
+ return false;
+ } else {
+ return !ProxyUtils.avoidProxy(proxyServer, request);
+ }
}
protected final static HttpRequest buildRequest(AsyncHttpClientConfig config, Request request, URI uri,
@@ -547,7 +607,6 @@ private static HttpRequest construct(AsyncHttpClientConfig config,
ChannelBuffer buffer) throws IOException {
String host = AsyncHttpProviderUtils.getHost(uri);
- boolean webSocket = isWebSocket(uri);
if (request.getVirtualHost() != null) {
host = request.getVirtualHost();
@@ -568,12 +627,11 @@ private static HttpRequest construct(AsyncHttpClientConfig config,
}
nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, m, path.toString());
}
-
+ boolean webSocket = isWebSocket(uri);
if (webSocket) {
nettyRequest.addHeader(HttpHeaders.Names.UPGRADE, HttpHeaders.Values.WEBSOCKET);
nettyRequest.addHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE);
- nettyRequest.addHeader("Origin", "http://" + uri.getHost() + ":"
- + (uri.getPort() == -1 ? isSecure(uri.getScheme()) ? 443 : 80 : uri.getPort()));
+ nettyRequest.addHeader("Origin", "http://" + uri.getHost() + ":" + uri.getPort());
nettyRequest.addHeader(WEBSOCKET_KEY, WebSocketUtil.getKey());
nettyRequest.addHeader("Sec-WebSocket-Version", "13");
}
@@ -901,7 +959,10 @@ private ListenableFuture doConnect(final Request request, final AsyncHand
bufferedBytes = f.getNettyRequest().getContent();
}
- boolean useSSl = isSecure(uri) && proxyServer == null;
+ boolean avoidProxy = ProxyUtils.avoidProxy(proxyServer, uri.getHost());
+ boolean useProxy = !(avoidProxy || proxyServer == null);
+
+ boolean useSSl = isSecure(uri) && !useProxy;
if (channel != null && channel.isOpen() && channel.isConnected()) {
HttpRequest nettyRequest = buildRequest(config, request, uri, f == null ? false : f.isConnectAllowed(), bufferedBytes);
@@ -968,7 +1029,6 @@ private ListenableFuture doConnect(final Request request, final AsyncHand
}
NettyConnectListener c = new NettyConnectListener.Builder(config, request, asyncHandler, f, this, bufferedBytes).build(uri);
- boolean avoidProxy = ProxyUtils.avoidProxy(proxyServer, uri.getHost());
if (useSSl) {
constructSSLPipeline(c);
@@ -987,7 +1047,7 @@ private ListenableFuture doConnect(final Request request, final AsyncHand
InetSocketAddress remoteAddress;
if (request.getInetAddress() != null) {
remoteAddress = new InetSocketAddress(request.getInetAddress(), AsyncHttpProviderUtils.getPort(uri));
- } else if (proxyServer == null || avoidProxy) {
+ } else if (!useProxy) {
remoteAddress = new InetSocketAddress(AsyncHttpProviderUtils.getHost(uri), AsyncHttpProviderUtils.getPort(uri));
} else {
remoteAddress = new InetSocketAddress(proxyServer.getHost(), proxyServer.getPort());
@@ -2227,7 +2287,7 @@ public Object call() throws Exception {
if (!future.getAndSetStatusReceived(true) && updateStatusAndInterrupt(handler, status)) {
finishUpdate(future, ctx, response.isChunked());
return;
- } else if (updateHeadersAndInterrupt(handler, responseHeaders)) {
+ } else if (response.getHeaders().size() > 0 && updateHeadersAndInterrupt(handler, responseHeaders)) {
finishUpdate(future, ctx, response.isChunked());
return;
} else if (!response.isChunked()) {
@@ -2346,16 +2406,11 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
s = new ResponseStatus(future.getURI(), response, NettyAsyncHttpProvider.this);
final boolean statusReceived = h.onStatusReceived(s) == STATE.UPGRADE;
- if (!statusReceived) {
- h.onClose(new NettyWebSocket(ctx.getChannel()), 1002, "Bad response status " + response.getStatus().getCode());
- future.done(null);
+ if (!validStatus || !validUpgrade || !validConnection || !statusReceived) {
+ abort(future, new IOException("Invalid handshake response"));
return;
}
- if (!validStatus || !validUpgrade || !validConnection) {
- throw new IOException("Invalid handshake response");
- }
-
String accept = response.getHeader("Sec-WebSocket-Accept");
String key = WebSocketUtil.getAcceptKey(future.getNettyRequest().getHeader(WEBSOCKET_KEY));
if (accept == null || !accept.equals(key)) {
@@ -2463,4 +2518,3 @@ private static boolean isSecure(URI uri) {
return isSecure(uri.getScheme());
}
}
-
diff --git a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java
index 7c976149e6..c471faa4f8 100644
--- a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java
+++ b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java
@@ -57,6 +57,20 @@ public class NettyAsyncHttpProviderConfig implements AsyncHttpProviderConfig properties = new ConcurrentHashMap();
public NettyAsyncHttpProviderConfig() {
@@ -85,6 +99,20 @@ public Object getProperty(String name) {
return properties.get(name);
}
+ /**
+ * Return the value associated with the property's name
+ *
+ * @param name
+ * @return this instance of AsyncHttpProviderConfig
+ */
+ public T getProperty(String name, Class type, T defaultValue) {
+ Object value = properties.get(name);
+ if (value != null && type.isAssignableFrom(value.getClass())) {
+ return type.cast(value);
+ }
+ return defaultValue;
+ }
+
/**
* Remove the value associated with the property's name
*
@@ -103,4 +131,4 @@ public Object removeProperty(String name) {
public Set> propertiesSet() {
return properties.entrySet();
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/ning/http/client/websocket/WebSocketUpgradeHandler.java b/src/main/java/com/ning/http/client/websocket/WebSocketUpgradeHandler.java
index 5483720913..64bf8b32e1 100644
--- a/src/main/java/com/ning/http/client/websocket/WebSocketUpgradeHandler.java
+++ b/src/main/java/com/ning/http/client/websocket/WebSocketUpgradeHandler.java
@@ -33,7 +33,7 @@ public class WebSocketUpgradeHandler implements UpgradeHandler, Async
private final long maxTextSize;
private final AtomicBoolean ok = new AtomicBoolean(false);
- private WebSocketUpgradeHandler(Builder b) {
+ protected WebSocketUpgradeHandler(Builder b) {
l = b.l;
protocol = b.protocol;
maxByteSize = b.maxByteSize;
@@ -44,7 +44,7 @@ private WebSocketUpgradeHandler(Builder b) {
* {@inheritDoc}
*/
@Override
- public final void onThrowable(Throwable t) {
+ public void onThrowable(Throwable t) {
onFailure(t);
}
@@ -52,7 +52,7 @@ public final void onThrowable(Throwable t) {
* {@inheritDoc}
*/
@Override
- public final STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+ public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
return STATE.CONTINUE;
}
@@ -60,7 +60,7 @@ public final STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exce
* {@inheritDoc}
*/
@Override
- public final STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+ public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
if (responseStatus.getStatusCode() == 101) {
return STATE.UPGRADE;
} else {
@@ -72,7 +72,7 @@ public final STATE onStatusReceived(HttpResponseStatus responseStatus) throws Ex
* {@inheritDoc}
*/
@Override
- public final STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
+ public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
return STATE.CONTINUE;
}
@@ -80,7 +80,7 @@ public final STATE onHeadersReceived(HttpResponseHeaders headers) throws Excepti
* {@inheritDoc}
*/
@Override
- public final WebSocket onCompleted() throws Exception {
+ public WebSocket onCompleted() throws Exception {
if (webSocket == null) {
throw new IllegalStateException("WebSocket is null");
}
@@ -91,7 +91,7 @@ public final WebSocket onCompleted() throws Exception {
* {@inheritDoc}
*/
@Override
- public final void onSuccess(WebSocket webSocket) {
+ public void onSuccess(WebSocket webSocket) {
this.webSocket = webSocket;
for (WebSocketListener w : l) {
webSocket.addWebSocketListener(w);
@@ -104,7 +104,7 @@ public final void onSuccess(WebSocket webSocket) {
* {@inheritDoc}
*/
@Override
- public final void onFailure(Throwable t) {
+ public void onFailure(Throwable t) {
for (WebSocketListener w : l) {
if (!ok.get() && webSocket != null) {
webSocket.addWebSocketListener(w);
@@ -113,7 +113,7 @@ public final void onFailure(Throwable t) {
}
}
- public final void onClose(WebSocket webSocket, int status, String reasonPhrase) {
+ public void onClose(WebSocket webSocket, int status, String reasonPhrase) {
// Connect failure
if (this.webSocket == null) this.webSocket = webSocket;
diff --git a/src/main/java/com/ning/http/multipart/MultipartBody.java b/src/main/java/com/ning/http/multipart/MultipartBody.java
index 88ee5da2ea..c3004e469f 100644
--- a/src/main/java/com/ning/http/multipart/MultipartBody.java
+++ b/src/main/java/com/ning/http/multipart/MultipartBody.java
@@ -80,7 +80,7 @@ public long read(ByteBuffer buffer) throws IOException {
int maxLength = buffer.capacity();
if (startPart == parts.size() && endWritten) {
- return overallLength;
+ return -1;
}
boolean full = false;
@@ -437,9 +437,9 @@ private ByteArrayOutputStream generateFileStart(FilePart filePart)
private long handleFilePart(WritableByteChannel target, FilePart filePart) throws IOException {
FilePartStallHandler handler = new FilePartStallHandler(
filePart.getStalledTime(), filePart);
-
+
handler.start();
-
+
if (FilePartSource.class.isAssignableFrom(filePart.getSource().getClass())) {
int length = 0;
@@ -464,7 +464,7 @@ private long handleFilePart(WritableByteChannel target, FilePart filePart) throw
}
try {
nWrite = fc.transferTo(fileLength, l, target);
-
+
if (nWrite == 0) {
logger.info("Waiting for writing...");
try {
@@ -496,7 +496,7 @@ private long handleFilePart(WritableByteChannel target, FilePart filePart) throw
}
}
handler.completed();
-
+
fc.close();
length += handleFileEnd(target, filePart);
@@ -556,7 +556,7 @@ private long handleMultiPart(WritableByteChannel target, Part currentPart) throw
return handleStringPart(target, (StringPart) currentPart);
} else if (currentPart.getClass().equals(FilePart.class)) {
FilePart filePart = (FilePart) currentPart;
-
+
return handleFilePart(target, filePart);
}
return 0;
diff --git a/src/main/java/com/ning/http/util/AsyncHttpProviderUtils.java b/src/main/java/com/ning/http/util/AsyncHttpProviderUtils.java
index 5395998df8..718dee9f35 100644
--- a/src/main/java/com/ning/http/util/AsyncHttpProviderUtils.java
+++ b/src/main/java/com/ning/http/util/AsyncHttpProviderUtils.java
@@ -448,7 +448,7 @@ public static String parseCharset(String contentType) {
public static Cookie parseCookie(String value) {
String[] fields = value.split(";\\s*");
- String[] cookie = fields[0].split("=");
+ String[] cookie = fields[0].split("=", 2);
String cookieName = cookie[0];
String cookieValue = (cookie.length == 1) ? null : cookie[1];
@@ -500,12 +500,14 @@ public static Cookie parseCookie(String value) {
return new Cookie(domain, cookieName, cookieValue, path, maxAge, secure);
}
- private static int convertExpireField(String timestring) throws Exception {
+ public static int convertExpireField(String timestring) throws Exception {
Exception exception = null;
+ String trimmedTimeString = removeQuote(timestring.trim());
+ long now = System.currentTimeMillis();
for (SimpleDateFormat sdf : simpleDateFormat.get()) {
try {
- long expire = sdf.parse(removeQuote(timestring.trim())).getTime();
- return (int) ((expire - System.currentTimeMillis()) / 1000);
+ long expire = sdf.parse(trimmedTimeString).getTime();
+ return (int) ((expire - now) / 1000);
} catch (ParseException e) {
exception = e;
} catch (NumberFormatException e) {
diff --git a/src/test/java/com/ning/http/client/async/ProxyyTunnellingTest.java b/src/test/java/com/ning/http/client/async/ProxyyTunnellingTest.java
index f1dd8a631b..9d5bb3ed28 100644
--- a/src/test/java/com/ning/http/client/async/ProxyyTunnellingTest.java
+++ b/src/test/java/com/ning/http/client/async/ProxyyTunnellingTest.java
@@ -28,11 +28,13 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.testng.Assert.assertEquals;
@@ -111,7 +113,7 @@ public Response onCompleted(Response response) throws Exception {
});
Response r = responseFuture.get();
assertEquals(r.getStatusCode(), 200);
- assertEquals(r.getHeader("server"), "Jetty(8.1.1.v20120215)");
+ assertEquals(r.getHeader("X-Proxy-Connection"), "keep-alive");
asyncHttpClient.close();
}
@@ -142,7 +144,7 @@ public Response onCompleted(Response response) throws Exception {
});
Response r = responseFuture.get();
assertEquals(r.getStatusCode(), 200);
- assertEquals(r.getHeader("server"), "Jetty(8.1.1.v20120215)");
+ assertEquals(r.getHeader("X-Proxy-Connection"), "keep-alive");
asyncHttpClient.close();
}
@@ -162,7 +164,22 @@ public void testSimpleAHCConfigProxy() throws IOException, InterruptedException,
Response r = client.get().get();
assertEquals(r.getStatusCode(), 200);
- assertEquals(r.getHeader("server"), "Jetty(8.1.1.v20120215)");
+ assertEquals(r.getHeader("X-Proxy-Connection"), "keep-alive");
+
+ client.close();
+ }
+
+ @Test(groups = { "standalone", "default_provider" })
+ public void testNonProxyHostsSsl() throws IOException, ExecutionException, TimeoutException, InterruptedException {
+ AsyncHttpClient client = getAsyncHttpClient(null);
+
+ Response resp = client.prepareGet(getTargetUrl2()).setProxyServer(new ProxyServer("127.0.0.1", port1 - 1)
+ .addNonProxyHost("127.0.0.1"))
+ .execute().get(3, TimeUnit.SECONDS);
+
+ assertNotNull(resp);
+ assertEquals(resp.getStatusCode(), HttpServletResponse.SC_OK);
+ assertEquals(resp.getHeader("X-pathInfo"), "/foo/test");
client.close();
}