diff --git a/client/pom.xml b/client/pom.xml index ee1eb829f9..2ae8bfc91b 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -43,6 +43,14 @@ netty-transport-native-epoll linux-x86_64 + + io.netty + netty-codec-socks + + + io.netty + netty-handler-proxy + io.netty netty-resolver-dns diff --git a/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java b/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java index 4cce3c6361..38d704c7fc 100644 --- a/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java +++ b/client/src/main/java/org/asynchttpclient/channel/ChannelPoolPartitioning.java @@ -13,6 +13,7 @@ package org.asynchttpclient.channel; import org.asynchttpclient.proxy.ProxyServer; +import org.asynchttpclient.proxy.ProxyType; import org.asynchttpclient.uri.Uri; import org.asynchttpclient.util.HttpUtils; @@ -23,12 +24,14 @@ class ProxyPartitionKey { private final int proxyPort; private final boolean secured; private final String targetHostBaseUrl; + private final ProxyType proxyType; - public ProxyPartitionKey(String proxyHost, int proxyPort, boolean secured, String targetHostBaseUrl) { + public ProxyPartitionKey(String proxyHost, int proxyPort, boolean secured, String targetHostBaseUrl, ProxyType proxyType) { this.proxyHost = proxyHost; this.proxyPort = proxyPort; this.secured = secured; this.targetHostBaseUrl = targetHostBaseUrl; + this.proxyType = proxyType; } @Override @@ -37,6 +40,7 @@ public int hashCode() { int result = 1; result = prime * result + ((proxyHost == null) ? 0 : proxyHost.hashCode()); result = prime * result + proxyPort; + result = prime * result + proxyType.hashCode(); result = prime * result + (secured ? 1231 : 1237); result = prime * result + ((targetHostBaseUrl == null) ? 0 : targetHostBaseUrl.hashCode()); return result; @@ -60,6 +64,8 @@ public boolean equals(Object obj) { return false; if (secured != other.secured) return false; + if (proxyType != other.proxyType) + return false; if (targetHostBaseUrl == null) { if (other.targetHostBaseUrl != null) return false; @@ -70,12 +76,13 @@ public boolean equals(Object obj) { @Override public String toString() { - return new StringBuilder()// - .append("ProxyPartitionKey(proxyHost=").append(proxyHost)// - .append(", proxyPort=").append(proxyPort)// - .append(", secured=").append(secured)// - .append(", targetHostBaseUrl=").append(targetHostBaseUrl)// - .toString(); + return // + "ProxyPartitionKey(proxyHost=" + proxyHost +// + ", proxyPort=" + proxyPort +// + ", secured=" + secured +// + ", targetHostBaseUrl=" + targetHostBaseUrl +// + ", proxyType=" + proxyType// + ; } } @@ -89,8 +96,8 @@ public Object getPartitionKey(Uri uri, String virtualHost, ProxyServer proxyServ String targetHostBaseUrl = virtualHost != null ? virtualHost : HttpUtils.getBaseUrl(uri); if (proxyServer != null) { return uri.isSecured() ? // - new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getSecuredPort(), true, targetHostBaseUrl) - : new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getPort(), false, targetHostBaseUrl); + new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getSecuredPort(), true, targetHostBaseUrl, proxyServer.getProxyType()) + : new ProxyPartitionKey(proxyServer.getHost(), proxyServer.getPort(), false, targetHostBaseUrl, proxyServer.getProxyType()); } else { return targetHostBaseUrl; } diff --git a/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java b/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java index ff45fb681b..4261216420 100755 --- a/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java +++ b/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java @@ -220,7 +220,7 @@ public String toString() { sb.append("\t\t").append(header.getKey()).append(": ").append(header.getValue()).append("\n"); } sb.append("\tbody=\n").append(getResponseBody()).append("\n")// - .append("}").toString(); + .append("}"); return sb.toString(); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 0a6b0caad8..6b652579b4 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -17,12 +17,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -34,13 +29,16 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.proxy.Socks4ProxyHandler; +import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.Timer; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.*; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ThreadFactory; @@ -51,11 +49,7 @@ import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.ClientStats; -import org.asynchttpclient.HostStats; -import org.asynchttpclient.SslEngineFactory; +import org.asynchttpclient.*; import org.asynchttpclient.channel.ChannelPool; import org.asynchttpclient.channel.ChannelPoolPartitioning; import org.asynchttpclient.channel.NoopChannelPool; @@ -68,6 +62,7 @@ import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.netty.ssl.DefaultSslEngineFactory; import org.asynchttpclient.proxy.ProxyServer; +import org.asynchttpclient.proxy.ProxyType; import org.asynchttpclient.uri.Uri; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +73,7 @@ public class ChannelManager { public static final String PINNED_ENTRY = "entry"; public static final String HTTP_CLIENT_CODEC = "http"; public static final String SSL_HANDLER = "ssl"; + public static final String SOCKS_HANDLER = "socks"; public static final String DEFLATER_HANDLER = "deflater"; public static final String INFLATER_HANDLER = "inflater"; public static final String CHUNKED_WRITER_HANDLER = "chunked-writer"; @@ -391,8 +387,49 @@ public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtua return sslHandler; } - public Bootstrap getBootstrap(Uri uri, ProxyServer proxy) { - return uri.isWebSocket() && proxy == null ? wsBootstrap : httpBootstrap; + public Promise getBootstrap(Request request, ProxyServer proxy) { + final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + + if (request.getUri().isWebSocket() && proxy == null) { + promise.setSuccess(wsBootstrap); + } else { + if (proxy != null && proxy.getProxyType().isSocks()) { + request.getNameResolver().resolve(proxy.getHost()).addListener((Future proxyDnsFuture) -> { + if (proxyDnsFuture.isSuccess()) { + Bootstrap bootstrap = httpBootstrap.clone(); + ChannelHandler handler = bootstrap.config().handler(); + + bootstrap.handler(new ChannelInitializer() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + handler.handlerAdded(ctx); + super.handlerAdded(ctx); + } + + @Override + protected void initChannel(Channel channel) throws Exception { + InetSocketAddress proxyAddress = new InetSocketAddress(proxyDnsFuture.get(), proxy.getPort()); + if (proxy.getProxyType() == ProxyType.SOCKS_V4) { + channel.pipeline().addFirst(ChannelManager.SOCKS_HANDLER, new Socks4ProxyHandler(proxyAddress)); + } else if (proxy.getProxyType() == ProxyType.SOCKS_V5) { + channel.pipeline().addFirst(ChannelManager.SOCKS_HANDLER, new Socks5ProxyHandler(proxyAddress)); + } else { + throw new IllegalArgumentException("Only SOCKS4 and SOCKS5 supported at the moment."); + } + } + }); + + promise.setSuccess(bootstrap); + } else { + promise.setFailure(proxyDnsFuture.cause()); + } + }); + } else { + promise.setSuccess(httpBootstrap); + } + } + + return promise; } public void upgradePipelineForWebSockets(ChannelPipeline pipeline) { diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java index 5140535cea..50e20ade36 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java @@ -17,24 +17,36 @@ import static org.asynchttpclient.util.HttpUtils.getBaseUrl; import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.proxy.Socks4ProxyHandler; +import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslHandler; import java.net.ConnectException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.asynchttpclient.Request; +import org.asynchttpclient.RequestBuilderBase; import org.asynchttpclient.handler.AsyncHandlerExtensions; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.SimpleFutureListener; import org.asynchttpclient.netty.future.StackTraceInspector; import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.netty.timeout.TimeoutsHolder; +import org.asynchttpclient.proxy.ProxyServer; +import org.asynchttpclient.proxy.ProxyType; import org.asynchttpclient.uri.Uri; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLException; + /** * Non Blocking connect. */ @@ -94,12 +106,7 @@ public void onSuccess(Channel channel, InetSocketAddress remoteAddress) { Object partitionKeyLock = future.takePartitionKeyLock(); if (partitionKeyLock != null) { - channel.closeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - connectionSemaphore.releaseChannelLock(partitionKeyLock); - } - }); + channel.closeFuture().addListener(future -> connectionSemaphore.releaseChannelLock(partitionKeyLock)); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java index a26f4fc063..58251febdd 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java @@ -45,6 +45,7 @@ import org.asynchttpclient.netty.request.body.NettyMultipartBody; import org.asynchttpclient.netty.request.body.NettyReactiveStreamsBody; import org.asynchttpclient.proxy.ProxyServer; +import org.asynchttpclient.proxy.ProxyType; import org.asynchttpclient.request.body.generator.FileBodyGenerator; import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator; import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator; @@ -237,7 +238,7 @@ private String requestUri(Uri uri, ProxyServer proxyServer, boolean connect) { // proxy tunnelling, connect need host and explicit port return getAuthority(uri); - } else if (proxyServer != null && !uri.isSecured()) { + } else if (proxyServer != null && !uri.isSecured() && proxyServer.getProxyType() == ProxyType.HTTP) { // proxy over HTTP, need full url return uri.toUrl(); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 53e07c1ec8..31b8e4d46b 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -13,41 +13,18 @@ */ package org.asynchttpclient.netty.request; -import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; -import static java.util.Collections.singletonList; -import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions; -import static org.asynchttpclient.util.Assertions.assertNotNull; -import static org.asynchttpclient.util.AuthenticatorUtils.*; -import static org.asynchttpclient.util.HttpConstants.Methods.*; -import static org.asynchttpclient.util.MiscUtils.getCause; -import static org.asynchttpclient.util.ProxyUtils.getProxyServer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.DefaultHttpHeaders; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.*; import io.netty.util.Timer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; - -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.AsyncHttpClientState; -import org.asynchttpclient.ListenableFuture; -import org.asynchttpclient.Realm; +import org.asynchttpclient.*; import org.asynchttpclient.Realm.AuthScheme; -import org.asynchttpclient.Request; import org.asynchttpclient.exception.PoolAlreadyClosedException; import org.asynchttpclient.exception.RemotelyClosedException; import org.asynchttpclient.filter.FilterContext; @@ -58,11 +35,7 @@ import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.OnLastHttpContentCallback; import org.asynchttpclient.netty.SimpleFutureListener; -import org.asynchttpclient.netty.channel.ChannelManager; -import org.asynchttpclient.netty.channel.ChannelState; -import org.asynchttpclient.netty.channel.Channels; -import org.asynchttpclient.netty.channel.ConnectionSemaphore; -import org.asynchttpclient.netty.channel.NettyConnectListener; +import org.asynchttpclient.netty.channel.*; import org.asynchttpclient.netty.timeout.TimeoutsHolder; import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.resolver.RequestHostnameResolver; @@ -71,6 +44,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; + +import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; +import static java.util.Collections.singletonList; +import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions; +import static org.asynchttpclient.util.Assertions.assertNotNull; +import static org.asynchttpclient.util.AuthenticatorUtils.perConnectionAuthorizationHeader; +import static org.asynchttpclient.util.AuthenticatorUtils.perConnectionProxyAuthorizationHeader; +import static org.asynchttpclient.util.HttpConstants.Methods.CONNECT; +import static org.asynchttpclient.util.HttpConstants.Methods.GET; +import static org.asynchttpclient.util.MiscUtils.getCause; +import static org.asynchttpclient.util.ProxyUtils.getProxyServer; + public final class NettyRequestSender { private static final Logger LOGGER = LoggerFactory.getLogger(NettyRequestSender.class); @@ -108,7 +97,8 @@ public ListenableFuture sendRequest(final Request request,// ProxyServer proxyServer = getProxyServer(config, request); // websockets use connect tunnelling to work with proxies - if (proxyServer != null && (request.getUri().isSecured() || request.getUri().isWebSocket()) && !isConnectDone(request, future)) + if (proxyServer != null && (request.getUri().isSecured() || request.getUri().isWebSocket()) && !isConnectDone(request, future) // + && !proxyServer.getProxyType().isSocks()) if (future != null && future.isConnectAllowed()) // SSL proxy or websocket: CONNECT for sure return sendRequestWithCertainForceConnect(request, asyncHandler, future, performingNextRequest, proxyServer, true); @@ -282,10 +272,6 @@ private ListenableFuture sendRequestWithNewChannel(// future.setInAuth(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM); future.setInProxyAuth(proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM); - // Do not throw an exception when we need an extra connection for a redirect - // FIXME why? This violate the max connection per host handling, right? - Bootstrap bootstrap = channelManager.getBootstrap(request.getUri(), proxy); - Object partitionKey = future.getPartitionKey(); try { @@ -310,7 +296,15 @@ protected void onSuccess(List addresses) { NettyConnectListener connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, connectionSemaphore, partitionKey); NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, clientState, config); if (!future.isDone()) { - connector.connect(bootstrap, connectListener); + // Do not throw an exception when we need an extra connection for a redirect + // FIXME why? This violate the max connection per host handling, right? + channelManager.getBootstrap(request, proxy).addListener((Future bootstrapFuture) -> { + if (bootstrapFuture.isSuccess()) { + connector.connect(bootstrapFuture.get(), connectListener); + } else { + abort(null, future, bootstrapFuture.cause()); + } + }); } } @@ -332,7 +326,7 @@ private Future> resolveAddresses( Uri uri = request.getUri(); final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise(); - if (proxy != null && !proxy.isIgnoredForHost(uri.getHost())) { + if (proxy != null && !proxy.isIgnoredForHost(uri.getHost()) && !proxy.getProxyType().isSocks()) { int port = uri.isSecured() ? proxy.getSecuredPort() : proxy.getPort(); InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(proxy.getHost(), port); scheduleRequestTimeout(future, unresolvedRemoteAddress); diff --git a/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java b/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java index 62e5a53932..6c0326652f 100644 --- a/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java +++ b/client/src/main/java/org/asynchttpclient/proxy/ProxyServer.java @@ -35,13 +35,15 @@ public class ProxyServer { private final int securedPort; private final Realm realm; private final List nonProxyHosts; + private final ProxyType proxyType; - public ProxyServer(String host, int port, int securedPort, Realm realm, List nonProxyHosts) { + public ProxyServer(String host, int port, int securedPort, Realm realm, List nonProxyHosts, ProxyType proxyType) { this.host = host; this.port = port; this.securedPort = securedPort; this.realm = realm; this.nonProxyHosts = nonProxyHosts; + this.proxyType = proxyType; } public String getHost() { @@ -52,6 +54,10 @@ public int getPort() { return port; } + public ProxyType getProxyType() { + return proxyType; + } + public int getSecuredPort() { return securedPort; } @@ -103,6 +109,7 @@ public static class Builder { private int securedPort; private Realm realm; private List nonProxyHosts; + private ProxyType proxyType; public Builder(String host, int port) { this.host = host; @@ -115,6 +122,11 @@ public Builder setSecuredPort(int securedPort) { return this; } + public Builder setProxyType(ProxyType proxyType) { + this.proxyType = proxyType; + return this; + } + public Builder setRealm(Realm realm) { this.realm = realm; return this; @@ -139,7 +151,8 @@ public Builder setNonProxyHosts(List nonProxyHosts) { public ProxyServer build() { List nonProxyHosts = this.nonProxyHosts != null ? Collections.unmodifiableList(this.nonProxyHosts) : Collections.emptyList(); - return new ProxyServer(host, port, securedPort, realm, nonProxyHosts); + ProxyType proxyType = this.proxyType != null ? this.proxyType : ProxyType.HTTP; + return new ProxyServer(host, port, securedPort, realm, nonProxyHosts, proxyType); } } } diff --git a/client/src/main/java/org/asynchttpclient/proxy/ProxyType.java b/client/src/main/java/org/asynchttpclient/proxy/ProxyType.java new file mode 100644 index 0000000000..866d099d34 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/proxy/ProxyType.java @@ -0,0 +1,21 @@ +package org.asynchttpclient.proxy; + +public enum ProxyType { + HTTP, + SOCKS_V4 { + @Override + public boolean isSocks() { + return true; + } + }, + SOCKS_V5 { + @Override + public boolean isSocks() { + return true; + } + }; + + public boolean isSocks() { + return false; + } +} diff --git a/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java b/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java index d4a29e87be..74faca3ef7 100644 --- a/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java +++ b/client/src/test/java/org/asynchttpclient/proxy/ProxyTest.java @@ -43,6 +43,7 @@ import org.asynchttpclient.Response; import org.asynchttpclient.config.AsyncHttpClientConfigDefaults; import org.asynchttpclient.config.AsyncHttpClientConfigHelper; +import org.asynchttpclient.testserver.SOCKSProxy; import org.asynchttpclient.util.ProxyUtils; import org.eclipse.jetty.server.handler.AbstractHandler; import org.testng.annotations.Test; @@ -192,6 +193,25 @@ public void runSequentiallyBecauseNotThreadSafe() throws Exception { testUseProxySelector(); } + @Test(groups = "standalone") + public void runSocksProxy() throws Exception { + new Thread(() -> { + try { + new SOCKSProxy(60000); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + + try (AsyncHttpClient client = asyncHttpClient()) { + String target = "http://localhost/" + port1 + "/"; + Future f = client.prepareGet(target).setProxyServer(new ProxyServer.Builder("localhost", 8000).setProxyType(ProxyType.SOCKS_V4)).execute(); + + assertEquals(200, f.get(60, TimeUnit.SECONDS).getStatusCode()); + } + } + + // @Test(groups = "standalone") public void testProxyProperties() throws IOException, ExecutionException, TimeoutException, InterruptedException { // FIXME not threadsafe! diff --git a/client/src/test/java/org/asynchttpclient/testserver/SOCKSProxy.java b/client/src/test/java/org/asynchttpclient/testserver/SOCKSProxy.java new file mode 100644 index 0000000000..3a5d225d1b --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/testserver/SOCKSProxy.java @@ -0,0 +1,192 @@ +package org.asynchttpclient.testserver; + +/************************************* + * SOCKS Proxy in JAVA + * By Gareth Owen + * drgowen@gmail.com + * MIT Licence + ************************************/ + +// NOTES : LISTENS ON PORT 8000 + +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.io.*; +import java.net.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; + +public class SOCKSProxy { + + // socks client class - one per client connection + class SocksClient { + SocketChannel client, remote; + boolean connected; + long lastData = 0; + + SocksClient(SocketChannel c) throws IOException { + client = c; + client.configureBlocking(false); + lastData = System.currentTimeMillis(); + } + + public void newRemoteData(Selector selector, SelectionKey sk) throws IOException { + ByteBuffer buf = ByteBuffer.allocate(1024); + if(remote.read(buf) == -1) + throw new IOException("disconnected"); + lastData = System.currentTimeMillis(); + buf.flip(); + client.write(buf); + } + + public void newClientData(Selector selector, SelectionKey sk) throws IOException { + if(!connected) { + ByteBuffer inbuf = ByteBuffer.allocate(512); + if(client.read(inbuf)<1) + return; + inbuf.flip(); + + // read socks header + int ver = inbuf.get(); + if (ver != 4) { + throw new IOException("incorrect version" + ver); + } + int cmd = inbuf.get(); + + // check supported command + if (cmd != 1) { + throw new IOException("incorrect version"); + } + + final int port = inbuf.getShort() & 0xffff; + + final byte ip[] = new byte[4]; + // fetch IP + inbuf.get(ip); + + InetAddress remoteAddr = InetAddress.getByAddress(ip); + + while ((inbuf.get()) != 0) ; // username + + // hostname provided, not IP + if (ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] != 0) { // host provided + StringBuilder host = new StringBuilder(); + byte b; + while ((b = inbuf.get()) != 0) { + host.append(b); + } + remoteAddr = InetAddress.getByName(host.toString()); + System.out.println(host.toString() + remoteAddr); + } + + remote = SocketChannel.open(new InetSocketAddress(remoteAddr, port)); + + ByteBuffer out = ByteBuffer.allocate(20); + out.put((byte)0); + out.put((byte) (remote.isConnected() ? 0x5a : 0x5b)); + out.putShort((short) port); + out.put(remoteAddr.getAddress()); + out.flip(); + client.write(out); + + if(!remote.isConnected()) + throw new IOException("connect failed"); + + remote.configureBlocking(false); + remote.register(selector, SelectionKey.OP_READ); + + connected = true; + } else { + ByteBuffer buf = ByteBuffer.allocate(1024); + if(client.read(buf) == -1) + throw new IOException("disconnected"); + lastData = System.currentTimeMillis(); + buf.flip(); + remote.write(buf); + } + } + } + + static ArrayList clients = new ArrayList(); + + // utility function + public SocksClient addClient(SocketChannel s) { + SocksClient cl; + try { + cl = new SocksClient(s); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + clients.add(cl); + return cl; + } + + public SOCKSProxy(int runningTime) throws IOException { + ServerSocketChannel socks = ServerSocketChannel.open(); + socks.socket().bind(new InetSocketAddress(8000)); + socks.configureBlocking(false); + Selector select = Selector.open(); + socks.register(select, SelectionKey.OP_ACCEPT); + + int lastClients = clients.size(); + // select loop + for (long end = System.currentTimeMillis() + runningTime; System.currentTimeMillis() < end;) { + select.select(5000); + + Set keys = select.selectedKeys(); + for (Object key : keys) { + SelectionKey k = (SelectionKey) key; + + if (!k.isValid()) + continue; + + // new connection? + if (k.isAcceptable() && k.channel() == socks) { + // server socket + SocketChannel csock = socks.accept(); + if (csock == null) + continue; + addClient(csock); + csock.register(select, SelectionKey.OP_READ); + } else if (k.isReadable()) { + // new data on a client/remote socket + for (int i = 0; i < clients.size(); i++) { + SocksClient cl = clients.get(i); + try { + if (k.channel() == cl.client) // from client (e.g. socks client) + cl.newClientData(select, k); + else if (k.channel() == cl.remote) { // from server client is connected to (e.g. website) + cl.newRemoteData(select, k); + } + } catch (IOException e) { // error occurred - remove client + cl.client.close(); + if (cl.remote != null) + cl.remote.close(); + k.cancel(); + clients.remove(cl); + } + + } + } + } + + // client timeout check + for (int i = 0; i < clients.size(); i++) { + SocksClient cl = clients.get(i); + if((System.currentTimeMillis() - cl.lastData) > 30000L) { + cl.client.close(); + if(cl.remote != null) + cl.remote.close(); + clients.remove(cl); + } + } + if(clients.size() != lastClients) { + System.out.println(clients.size()); + lastClients = clients.size(); + } + } + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index b1a66bff47..02ed18c103 100644 --- a/pom.xml +++ b/pom.xml @@ -230,6 +230,16 @@ netty-codec ${netty.version} + + io.netty + netty-codec-socks + ${netty.version} + + + io.netty + netty-handler-proxy + ${netty.version} + io.netty netty-common