diff --git a/client/pom.xml b/client/pom.xml
index ee1eb829f9..2ae8bfc91b 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -43,6 +43,14 @@
netty-transport-native-epoll
linux-x86_64
+
+ io.netty
+ netty-codec-socks
+
+
+ io.netty
+ netty-handler-proxy
+
io.netty
netty-resolver-dns
diff --git a/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java b/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java
index 4cce3c6361..38d704c7fc 100644
--- a/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java
+++ b/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java
@@ -13,6 +13,7 @@
package org.asynchttpclient.channel;
import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
import org.asynchttpclient.uri.Uri;
import org.asynchttpclient.util.HttpUtils;
@@ -23,12 +24,14 @@ class ProxyPartitionKey {
private final int proxyPort;
private final boolean secured;
private final String targetHostBaseUrl;
+ private final ProxyType proxyType;
- public ProxyPartitionKey(String proxyHost, int proxyPort, boolean secured, String targetHostBaseUrl) {
+ public ProxyPartitionKey(String proxyHost, int proxyPort, boolean secured, String targetHostBaseUrl, ProxyType proxyType) {
this.proxyHost = proxyHost;
this.proxyPort = proxyPort;
this.secured = secured;
this.targetHostBaseUrl = targetHostBaseUrl;
+ this.proxyType = proxyType;
}
@Override
@@ -37,6 +40,7 @@ public int hashCode() {
int result = 1;
result = prime * result + ((proxyHost == null) ? 0 : proxyHost.hashCode());
result = prime * result + proxyPort;
+ result = prime * result + proxyType.hashCode();
result = prime * result + (secured ? 1231 : 1237);
result = prime * result + ((targetHostBaseUrl == null) ? 0 : targetHostBaseUrl.hashCode());
return result;
@@ -60,6 +64,8 @@ public boolean equals(Object obj) {
return false;
if (secured != other.secured)
return false;
+ if (proxyType != other.proxyType)
+ return false;
if (targetHostBaseUrl == null) {
if (other.targetHostBaseUrl != null)
return false;
@@ -70,12 +76,13 @@ public boolean equals(Object obj) {
@Override
public String toString() {
- return new StringBuilder()//
- .append("ProxyPartitionKey(proxyHost=").append(proxyHost)//
- .append(", proxyPort=").append(proxyPort)//
- .append(", secured=").append(secured)//
- .append(", targetHostBaseUrl=").append(targetHostBaseUrl)//
- .toString();
+ return //
+ "ProxyPartitionKey(proxyHost=" + proxyHost +//
+ ", proxyPort=" + proxyPort +//
+ ", secured=" + secured +//
+ ", targetHostBaseUrl=" + targetHostBaseUrl +//
+ ", proxyType=" + proxyType//
+ ;
}
}
@@ -89,8 +96,8 @@ public Object getPartitionKey(Uri uri, String virtualHost, ProxyServer proxyServ
String targetHostBaseUrl = virtualHost != null ? virtualHost : HttpUtils.getBaseUrl(uri);
if (proxyServer != null) {
return uri.isSecured() ? //
- new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getSecuredPort(), true, targetHostBaseUrl)
- : new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getPort(), false, targetHostBaseUrl);
+ new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getSecuredPort(), true, targetHostBaseUrl, proxyServer.getProxyType())
+ : new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getPort(), false, targetHostBaseUrl, proxyServer.getProxyType());
} else {
return targetHostBaseUrl;
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java b/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java
index ff45fb681b..4261216420 100755
--- a/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java
+++ b/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java
@@ -220,7 +220,7 @@ public String toString() {
sb.append("\t\t").append(header.getKey()).append(": ").append(header.getValue()).append("\n");
}
sb.append("\tbody=\n").append(getResponseBody()).append("\n")//
- .append("}").toString();
+ .append("}");
return sb.toString();
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
index 0a6b0caad8..6b652579b4 100755
--- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
@@ -17,12 +17,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFactory;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
+import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -34,13 +29,16 @@
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.proxy.Socks4ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.*;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ThreadFactory;
@@ -51,11 +49,7 @@
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
-import org.asynchttpclient.AsyncHandler;
-import org.asynchttpclient.AsyncHttpClientConfig;
-import org.asynchttpclient.ClientStats;
-import org.asynchttpclient.HostStats;
-import org.asynchttpclient.SslEngineFactory;
+import org.asynchttpclient.*;
import org.asynchttpclient.channel.ChannelPool;
import org.asynchttpclient.channel.ChannelPoolPartitioning;
import org.asynchttpclient.channel.NoopChannelPool;
@@ -68,6 +62,7 @@
import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.netty.ssl.DefaultSslEngineFactory;
import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
import org.asynchttpclient.uri.Uri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +73,7 @@ public class ChannelManager {
public static final String PINNED_ENTRY = "entry";
public static final String HTTP_CLIENT_CODEC = "http";
public static final String SSL_HANDLER = "ssl";
+ public static final String SOCKS_HANDLER = "socks";
public static final String DEFLATER_HANDLER = "deflater";
public static final String INFLATER_HANDLER = "inflater";
public static final String CHUNKED_WRITER_HANDLER = "chunked-writer";
@@ -391,8 +387,49 @@ public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtua
return sslHandler;
}
- public Bootstrap getBootstrap(Uri uri, ProxyServer proxy) {
- return uri.isWebSocket() && proxy == null ? wsBootstrap : httpBootstrap;
+ public Promise getBootstrap(Request request, ProxyServer proxy) {
+ final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise();
+
+ if (request.getUri().isWebSocket() && proxy == null) {
+ promise.setSuccess(wsBootstrap);
+ } else {
+ if (proxy != null && proxy.getProxyType().isSocks()) {
+ request.getNameResolver().resolve(proxy.getHost()).addListener((Future proxyDnsFuture) -> {
+ if (proxyDnsFuture.isSuccess()) {
+ Bootstrap bootstrap = httpBootstrap.clone();
+ ChannelHandler handler = bootstrap.config().handler();
+
+ bootstrap.handler(new ChannelInitializer() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ handler.handlerAdded(ctx);
+ super.handlerAdded(ctx);
+ }
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ InetSocketAddress proxyAddress = new InetSocketAddress(proxyDnsFuture.get(), proxy.getPort());
+ if (proxy.getProxyType() == ProxyType.SOCKS_V4) {
+ channel.pipeline().addFirst(ChannelManager.SOCKS_HANDLER, new Socks4ProxyHandler(proxyAddress));
+ } else if (proxy.getProxyType() == ProxyType.SOCKS_V5) {
+ channel.pipeline().addFirst(ChannelManager.SOCKS_HANDLER, new Socks5ProxyHandler(proxyAddress));
+ } else {
+ throw new IllegalArgumentException("Only SOCKS4 and SOCKS5 supported at the moment.");
+ }
+ }
+ });
+
+ promise.setSuccess(bootstrap);
+ } else {
+ promise.setFailure(proxyDnsFuture.cause());
+ }
+ });
+ } else {
+ promise.setSuccess(httpBootstrap);
+ }
+ }
+
+ return promise;
}
public void upgradePipelineForWebSockets(ChannelPipeline pipeline) {
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java
index 5140535cea..50e20ade36 100755
--- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java
@@ -17,24 +17,36 @@
import static org.asynchttpclient.util.HttpUtils.getBaseUrl;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.proxy.Socks4ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import java.net.ConnectException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.asynchttpclient.Request;
+import org.asynchttpclient.RequestBuilderBase;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.SimpleFutureListener;
import org.asynchttpclient.netty.future.StackTraceInspector;
import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
import org.asynchttpclient.uri.Uri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLException;
+
/**
* Non Blocking connect.
*/
@@ -94,12 +106,7 @@ public void onSuccess(Channel channel, InetSocketAddress remoteAddress) {
Object partitionKeyLock = future.takePartitionKeyLock();
if (partitionKeyLock != null) {
- channel.closeFuture().addListener(new GenericFutureListener>() {
- @Override
- public void operationComplete(Future super Void> future) throws Exception {
- connectionSemaphore.releaseChannelLock(partitionKeyLock);
- }
- });
+ channel.closeFuture().addListener(future -> connectionSemaphore.releaseChannelLock(partitionKeyLock));
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java
index a26f4fc063..58251febdd 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java
@@ -45,6 +45,7 @@
import org.asynchttpclient.netty.request.body.NettyMultipartBody;
import org.asynchttpclient.netty.request.body.NettyReactiveStreamsBody;
import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
import org.asynchttpclient.request.body.generator.FileBodyGenerator;
import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator;
import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator;
@@ -237,7 +238,7 @@ private String requestUri(Uri uri, ProxyServer proxyServer, boolean connect) {
// proxy tunnelling, connect need host and explicit port
return getAuthority(uri);
- } else if (proxyServer != null && !uri.isSecured()) {
+ } else if (proxyServer != null && !uri.isSecured() && proxyServer.getProxyType() == ProxyType.HTTP) {
// proxy over HTTP, need full url
return uri.toUrl();
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
index 53e07c1ec8..31b8e4d46b 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
@@ -13,41 +13,18 @@
*/
package org.asynchttpclient.netty.request;
-import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT;
-import static java.util.Collections.singletonList;
-import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions;
-import static org.asynchttpclient.util.Assertions.assertNotNull;
-import static org.asynchttpclient.util.AuthenticatorUtils.*;
-import static org.asynchttpclient.util.HttpConstants.Methods.*;
-import static org.asynchttpclient.util.MiscUtils.getCause;
-import static org.asynchttpclient.util.ProxyUtils.getProxyServer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
-import io.netty.handler.codec.http.HttpHeaderValues;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.*;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-
-import org.asynchttpclient.AsyncHandler;
-import org.asynchttpclient.AsyncHttpClientConfig;
-import org.asynchttpclient.AsyncHttpClientState;
-import org.asynchttpclient.ListenableFuture;
-import org.asynchttpclient.Realm;
+import org.asynchttpclient.*;
import org.asynchttpclient.Realm.AuthScheme;
-import org.asynchttpclient.Request;
import org.asynchttpclient.exception.PoolAlreadyClosedException;
import org.asynchttpclient.exception.RemotelyClosedException;
import org.asynchttpclient.filter.FilterContext;
@@ -58,11 +35,7 @@
import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.OnLastHttpContentCallback;
import org.asynchttpclient.netty.SimpleFutureListener;
-import org.asynchttpclient.netty.channel.ChannelManager;
-import org.asynchttpclient.netty.channel.ChannelState;
-import org.asynchttpclient.netty.channel.Channels;
-import org.asynchttpclient.netty.channel.ConnectionSemaphore;
-import org.asynchttpclient.netty.channel.NettyConnectListener;
+import org.asynchttpclient.netty.channel.*;
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
import org.asynchttpclient.proxy.ProxyServer;
import org.asynchttpclient.resolver.RequestHostnameResolver;
@@ -71,6 +44,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT;
+import static java.util.Collections.singletonList;
+import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions;
+import static org.asynchttpclient.util.Assertions.assertNotNull;
+import static org.asynchttpclient.util.AuthenticatorUtils.perConnectionAuthorizationHeader;
+import static org.asynchttpclient.util.AuthenticatorUtils.perConnectionProxyAuthorizationHeader;
+import static org.asynchttpclient.util.HttpConstants.Methods.CONNECT;
+import static org.asynchttpclient.util.HttpConstants.Methods.GET;
+import static org.asynchttpclient.util.MiscUtils.getCause;
+import static org.asynchttpclient.util.ProxyUtils.getProxyServer;
+
public final class NettyRequestSender {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyRequestSender.class);
@@ -108,7 +97,8 @@ public ListenableFuture sendRequest(final Request request,//
ProxyServer proxyServer = getProxyServer(config, request);
// websockets use connect tunnelling to work with proxies
- if (proxyServer != null && (request.getUri().isSecured() || request.getUri().isWebSocket()) && !isConnectDone(request, future))
+ if (proxyServer != null && (request.getUri().isSecured() || request.getUri().isWebSocket()) && !isConnectDone(request, future) //
+ && !proxyServer.getProxyType().isSocks())
if (future != null && future.isConnectAllowed())
// SSL proxy or websocket: CONNECT for sure
return sendRequestWithCertainForceConnect(request, asyncHandler, future, performingNextRequest, proxyServer, true);
@@ -282,10 +272,6 @@ private ListenableFuture sendRequestWithNewChannel(//
future.setInAuth(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM);
future.setInProxyAuth(proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM);
- // Do not throw an exception when we need an extra connection for a redirect
- // FIXME why? This violate the max connection per host handling, right?
- Bootstrap bootstrap = channelManager.getBootstrap(request.getUri(), proxy);
-
Object partitionKey = future.getPartitionKey();
try {
@@ -310,7 +296,15 @@ protected void onSuccess(List addresses) {
NettyConnectListener connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, connectionSemaphore, partitionKey);
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, clientState, config);
if (!future.isDone()) {
- connector.connect(bootstrap, connectListener);
+ // Do not throw an exception when we need an extra connection for a redirect
+ // FIXME why? This violate the max connection per host handling, right?
+ channelManager.getBootstrap(request, proxy).addListener((Future bootstrapFuture) -> {
+ if (bootstrapFuture.isSuccess()) {
+ connector.connect(bootstrapFuture.get(), connectListener);
+ } else {
+ abort(null, future, bootstrapFuture.cause());
+ }
+ });
}
}
@@ -332,7 +326,7 @@ private Future> resolveAddresses(
Uri uri = request.getUri();
final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise();
- if (proxy != null && !proxy.isIgnoredForHost(uri.getHost())) {
+ if (proxy != null && !proxy.isIgnoredForHost(uri.getHost()) && !proxy.getProxyType().isSocks()) {
int port = uri.isSecured() ? proxy.getSecuredPort() : proxy.getPort();
InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(proxy.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);
diff --git a/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java b/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java
index 62e5a53932..6c0326652f 100644
--- a/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java
+++ b/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java
@@ -35,13 +35,15 @@ public class ProxyServer {
private final int securedPort;
private final Realm realm;
private final List nonProxyHosts;
+ private final ProxyType proxyType;
- public ProxyServer(String host, int port, int securedPort, Realm realm, List nonProxyHosts) {
+ public ProxyServer(String host, int port, int securedPort, Realm realm, List nonProxyHosts, ProxyType proxyType) {
this.host = host;
this.port = port;
this.securedPort = securedPort;
this.realm = realm;
this.nonProxyHosts = nonProxyHosts;
+ this.proxyType = proxyType;
}
public String getHost() {
@@ -52,6 +54,10 @@ public int getPort() {
return port;
}
+ public ProxyType getProxyType() {
+ return proxyType;
+ }
+
public int getSecuredPort() {
return securedPort;
}
@@ -103,6 +109,7 @@ public static class Builder {
private int securedPort;
private Realm realm;
private List nonProxyHosts;
+ private ProxyType proxyType;
public Builder(String host, int port) {
this.host = host;
@@ -115,6 +122,11 @@ public Builder setSecuredPort(int securedPort) {
return this;
}
+ public Builder setProxyType(ProxyType proxyType) {
+ this.proxyType = proxyType;
+ return this;
+ }
+
public Builder setRealm(Realm realm) {
this.realm = realm;
return this;
@@ -139,7 +151,8 @@ public Builder setNonProxyHosts(List nonProxyHosts) {
public ProxyServer build() {
List nonProxyHosts = this.nonProxyHosts != null ? Collections.unmodifiableList(this.nonProxyHosts) : Collections.emptyList();
- return new ProxyServer(host, port, securedPort, realm, nonProxyHosts);
+ ProxyType proxyType = this.proxyType != null ? this.proxyType : ProxyType.HTTP;
+ return new ProxyServer(host, port, securedPort, realm, nonProxyHosts, proxyType);
}
}
}
diff --git a/client/src/main/java/org/asynchttpclient/proxy/ProxyType.java b/client/src/main/java/org/asynchttpclient/proxy/ProxyType.java
new file mode 100644
index 0000000000..866d099d34
--- /dev/null
+++ b/client/src/main/java/org/asynchttpclient/proxy/ProxyType.java
@@ -0,0 +1,21 @@
+package org.asynchttpclient.proxy;
+
+public enum ProxyType {
+ HTTP,
+ SOCKS_V4 {
+ @Override
+ public boolean isSocks() {
+ return true;
+ }
+ },
+ SOCKS_V5 {
+ @Override
+ public boolean isSocks() {
+ return true;
+ }
+ };
+
+ public boolean isSocks() {
+ return false;
+ }
+}
diff --git a/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java b/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java
index d4a29e87be..74faca3ef7 100644
--- a/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java
+++ b/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java
@@ -43,6 +43,7 @@
import org.asynchttpclient.Response;
import org.asynchttpclient.config.AsyncHttpClientConfigDefaults;
import org.asynchttpclient.config.AsyncHttpClientConfigHelper;
+import org.asynchttpclient.testserver.SOCKSProxy;
import org.asynchttpclient.util.ProxyUtils;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.testng.annotations.Test;
@@ -192,6 +193,25 @@ public void runSequentiallyBecauseNotThreadSafe() throws Exception {
testUseProxySelector();
}
+ @Test(groups = "standalone")
+ public void runSocksProxy() throws Exception {
+ new Thread(() -> {
+ try {
+ new SOCKSProxy(60000);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ try (AsyncHttpClient client = asyncHttpClient()) {
+ String target = "http://localhost/" + port1 + "/";
+ Future f = client.prepareGet(target).setProxyServer(new ProxyServer.Builder("localhost", 8000).setProxyType(ProxyType.SOCKS_V4)).execute();
+
+ assertEquals(200, f.get(60, TimeUnit.SECONDS).getStatusCode());
+ }
+ }
+
+
// @Test(groups = "standalone")
public void testProxyProperties() throws IOException, ExecutionException, TimeoutException, InterruptedException {
// FIXME not threadsafe!
diff --git a/client/src/test/java/org/asynchttpclient/testserver/SOCKSProxy.java b/client/src/test/java/org/asynchttpclient/testserver/SOCKSProxy.java
new file mode 100644
index 0000000000..3a5d225d1b
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/testserver/SOCKSProxy.java
@@ -0,0 +1,192 @@
+package org.asynchttpclient.testserver;
+
+/*************************************
+ * SOCKS Proxy in JAVA
+ * By Gareth Owen
+ * drgowen@gmail.com
+ * MIT Licence
+ ************************************/
+
+// NOTES : LISTENS ON PORT 8000
+
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.io.*;
+import java.net.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+public class SOCKSProxy {
+
+ // socks client class - one per client connection
+ class SocksClient {
+ SocketChannel client, remote;
+ boolean connected;
+ long lastData = 0;
+
+ SocksClient(SocketChannel c) throws IOException {
+ client = c;
+ client.configureBlocking(false);
+ lastData = System.currentTimeMillis();
+ }
+
+ public void newRemoteData(Selector selector, SelectionKey sk) throws IOException {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ if(remote.read(buf) == -1)
+ throw new IOException("disconnected");
+ lastData = System.currentTimeMillis();
+ buf.flip();
+ client.write(buf);
+ }
+
+ public void newClientData(Selector selector, SelectionKey sk) throws IOException {
+ if(!connected) {
+ ByteBuffer inbuf = ByteBuffer.allocate(512);
+ if(client.read(inbuf)<1)
+ return;
+ inbuf.flip();
+
+ // read socks header
+ int ver = inbuf.get();
+ if (ver != 4) {
+ throw new IOException("incorrect version" + ver);
+ }
+ int cmd = inbuf.get();
+
+ // check supported command
+ if (cmd != 1) {
+ throw new IOException("incorrect version");
+ }
+
+ final int port = inbuf.getShort() & 0xffff;
+
+ final byte ip[] = new byte[4];
+ // fetch IP
+ inbuf.get(ip);
+
+ InetAddress remoteAddr = InetAddress.getByAddress(ip);
+
+ while ((inbuf.get()) != 0) ; // username
+
+ // hostname provided, not IP
+ if (ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] != 0) { // host provided
+ StringBuilder host = new StringBuilder();
+ byte b;
+ while ((b = inbuf.get()) != 0) {
+ host.append(b);
+ }
+ remoteAddr = InetAddress.getByName(host.toString());
+ System.out.println(host.toString() + remoteAddr);
+ }
+
+ remote = SocketChannel.open(new InetSocketAddress(remoteAddr, port));
+
+ ByteBuffer out = ByteBuffer.allocate(20);
+ out.put((byte)0);
+ out.put((byte) (remote.isConnected() ? 0x5a : 0x5b));
+ out.putShort((short) port);
+ out.put(remoteAddr.getAddress());
+ out.flip();
+ client.write(out);
+
+ if(!remote.isConnected())
+ throw new IOException("connect failed");
+
+ remote.configureBlocking(false);
+ remote.register(selector, SelectionKey.OP_READ);
+
+ connected = true;
+ } else {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ if(client.read(buf) == -1)
+ throw new IOException("disconnected");
+ lastData = System.currentTimeMillis();
+ buf.flip();
+ remote.write(buf);
+ }
+ }
+ }
+
+ static ArrayList clients = new ArrayList();
+
+ // utility function
+ public SocksClient addClient(SocketChannel s) {
+ SocksClient cl;
+ try {
+ cl = new SocksClient(s);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ clients.add(cl);
+ return cl;
+ }
+
+ public SOCKSProxy(int runningTime) throws IOException {
+ ServerSocketChannel socks = ServerSocketChannel.open();
+ socks.socket().bind(new InetSocketAddress(8000));
+ socks.configureBlocking(false);
+ Selector select = Selector.open();
+ socks.register(select, SelectionKey.OP_ACCEPT);
+
+ int lastClients = clients.size();
+ // select loop
+ for (long end = System.currentTimeMillis() + runningTime; System.currentTimeMillis() < end;) {
+ select.select(5000);
+
+ Set keys = select.selectedKeys();
+ for (Object key : keys) {
+ SelectionKey k = (SelectionKey) key;
+
+ if (!k.isValid())
+ continue;
+
+ // new connection?
+ if (k.isAcceptable() && k.channel() == socks) {
+ // server socket
+ SocketChannel csock = socks.accept();
+ if (csock == null)
+ continue;
+ addClient(csock);
+ csock.register(select, SelectionKey.OP_READ);
+ } else if (k.isReadable()) {
+ // new data on a client/remote socket
+ for (int i = 0; i < clients.size(); i++) {
+ SocksClient cl = clients.get(i);
+ try {
+ if (k.channel() == cl.client) // from client (e.g. socks client)
+ cl.newClientData(select, k);
+ else if (k.channel() == cl.remote) { // from server client is connected to (e.g. website)
+ cl.newRemoteData(select, k);
+ }
+ } catch (IOException e) { // error occurred - remove client
+ cl.client.close();
+ if (cl.remote != null)
+ cl.remote.close();
+ k.cancel();
+ clients.remove(cl);
+ }
+
+ }
+ }
+ }
+
+ // client timeout check
+ for (int i = 0; i < clients.size(); i++) {
+ SocksClient cl = clients.get(i);
+ if((System.currentTimeMillis() - cl.lastData) > 30000L) {
+ cl.client.close();
+ if(cl.remote != null)
+ cl.remote.close();
+ clients.remove(cl);
+ }
+ }
+ if(clients.size() != lastClients) {
+ System.out.println(clients.size());
+ lastClients = clients.size();
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b1a66bff47..02ed18c103 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,16 @@
netty-codec
${netty.version}
+
+ io.netty
+ netty-codec-socks
+ ${netty.version}
+
+
+ io.netty
+ netty-handler-proxy
+ ${netty.version}
+
io.netty
netty-common