89
89
import org .jboss .netty .handler .codec .http .HttpHeaders ;
90
90
import org .jboss .netty .handler .codec .http .HttpMethod ;
91
91
import org .jboss .netty .handler .codec .http .HttpRequest ;
92
- import org .jboss .netty .handler .codec .http .HttpRequestEncoder ;
93
92
import org .jboss .netty .handler .codec .http .HttpResponse ;
94
- import org .jboss .netty .handler .codec .http .HttpResponseDecoder ;
95
93
import org .jboss .netty .handler .codec .http .HttpVersion ;
96
94
import org .jboss .netty .handler .codec .http .websocketx .BinaryWebSocketFrame ;
97
95
import org .jboss .netty .handler .codec .http .websocketx .CloseWebSocketFrame ;
@@ -166,8 +164,11 @@ public class NettyAsyncHttpProvider extends SimpleChannelUpstreamHandler impleme
166
164
static {
167
165
REMOTELY_CLOSED_EXCEPTION .setStackTrace (new StackTraceElement [0 ]);
168
166
}
169
- private final static String HTTP_HANDLER = "httpHandler" ;
170
- protected final static String SSL_HANDLER = "sslHandler" ;
167
+ public final static String HTTP_HANDLER = "httpHandler" ;
168
+ public final static String SSL_HANDLER = "sslHandler" ;
169
+ public final static String HTTP_PROCESSOR = "httpProcessor" ;
170
+ public final static String WS_PROCESSOR = "wsProcessor" ;
171
+
171
172
private final static String HTTPS = "https" ;
172
173
private final static String HTTP = "http" ;
173
174
private static final String WEBSOCKET = "ws" ;
@@ -318,7 +319,7 @@ public ChannelPipeline getPipeline() throws Exception {
318
319
pipeline .addLast ("inflater" , new HttpContentDecompressor ());
319
320
}
320
321
pipeline .addLast ("chunkedWriter" , new ChunkedWriteHandler ());
321
- pipeline .addLast ("httpProcessor" , NettyAsyncHttpProvider .this );
322
+ pipeline .addLast (HTTP_PROCESSOR , NettyAsyncHttpProvider .this );
322
323
return pipeline ;
323
324
}
324
325
});
@@ -338,9 +339,8 @@ public ChannelPipeline getPipeline() throws Exception {
338
339
/* @Override */
339
340
public ChannelPipeline getPipeline () throws Exception {
340
341
ChannelPipeline pipeline = pipeline ();
341
- pipeline .addLast ("http-decoder" , new HttpResponseDecoder ());
342
- pipeline .addLast ("http-encoder" , new HttpRequestEncoder ());
343
- pipeline .addLast ("httpProcessor" , NettyAsyncHttpProvider .this );
342
+ pipeline .addLast (HTTP_HANDLER , createHttpClientCodec ());
343
+ pipeline .addLast (WS_PROCESSOR , NettyAsyncHttpProvider .this );
344
344
return pipeline ;
345
345
}
346
346
});
@@ -378,7 +378,7 @@ public ChannelPipeline getPipeline() throws Exception {
378
378
pipeline .addLast ("inflater" , new HttpContentDecompressor ());
379
379
}
380
380
pipeline .addLast ("chunkedWriter" , new ChunkedWriteHandler ());
381
- pipeline .addLast ("httpProcessor" , NettyAsyncHttpProvider .this );
381
+ pipeline .addLast (HTTP_PROCESSOR , NettyAsyncHttpProvider .this );
382
382
return pipeline ;
383
383
}
384
384
});
@@ -395,9 +395,8 @@ public ChannelPipeline getPipeline() throws Exception {
395
395
abort (cl .future (), ex );
396
396
}
397
397
398
- pipeline .addLast ("http-decoder" , new HttpResponseDecoder ());
399
- pipeline .addLast ("http-encoder" , new HttpRequestEncoder ());
400
- pipeline .addLast ("httpProcessor" , NettyAsyncHttpProvider .this );
398
+ pipeline .addLast (HTTP_HANDLER , createHttpsClientCodec ());
399
+ pipeline .addLast (WS_PROCESSOR , NettyAsyncHttpProvider .this );
401
400
402
401
return pipeline ;
403
402
}
@@ -665,8 +664,8 @@ else if (uri.getRawQuery() != null)
665
664
path = uri .getRawPath ();
666
665
nettyRequest = new DefaultHttpRequest (HttpVersion .HTTP_1_1 , m , path );
667
666
}
668
- boolean webSocket = isWebSocket (uri );
669
- if (webSocket ) {
667
+ boolean webSocket = isWebSocket (uri . getScheme () );
668
+ if (! m . equals ( HttpMethod . CONNECT ) && webSocket ) {
670
669
nettyRequest .addHeader (HttpHeaders .Names .UPGRADE , HttpHeaders .Values .WEBSOCKET );
671
670
nettyRequest .addHeader (HttpHeaders .Names .CONNECTION , HttpHeaders .Values .UPGRADE );
672
671
nettyRequest .addHeader (HttpHeaders .Names .ORIGIN , "http://" + uri .getHost () + ":" + uri .getPort ());
@@ -979,7 +978,9 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
979
978
}
980
979
981
980
ProxyServer proxyServer = ProxyUtils .getProxyServer (config , request );
982
- boolean useProxy = proxyServer != null ;
981
+
982
+ boolean resultOfAConnect = f != null && f .getNettyRequest () != null && f .getNettyRequest ().getMethod ().equals (HttpMethod .CONNECT );
983
+ boolean useProxy = proxyServer != null && !resultOfAConnect ;
983
984
984
985
URI uri ;
985
986
if (useRawUrl ) {
@@ -1059,7 +1060,8 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
1059
1060
}
1060
1061
1061
1062
ChannelFuture channelFuture ;
1062
- ClientBootstrap bootstrap = request .getUrl ().startsWith (WEBSOCKET ) ? (useSSl ? secureWebSocketBootstrap : webSocketBootstrap ) : (useSSl ? secureBootstrap : plainBootstrap );
1063
+ ClientBootstrap bootstrap = (request .getUrl ().startsWith (WEBSOCKET ) && !useProxy ) ?
1064
+ (useSSl ? secureWebSocketBootstrap : webSocketBootstrap ) : (useSSl ? secureBootstrap : plainBootstrap );
1063
1065
bootstrap .setOption ("connectTimeoutMillis" , config .getConnectionTimeoutInMs ());
1064
1066
1065
1067
// Do no enable this with win.
@@ -1204,7 +1206,7 @@ public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) thr
1204
1206
return ;
1205
1207
}
1206
1208
1207
- Protocol p = (ctx .getPipeline ().get (HttpClientCodec . class ) != null ? httpProtocol : webSocketProtocol );
1209
+ Protocol p = (ctx .getPipeline ().get (HTTP_PROCESSOR ) != null ? httpProtocol : webSocketProtocol );
1208
1210
p .handle (ctx , e );
1209
1211
}
1210
1212
@@ -1409,6 +1411,10 @@ private void upgradeProtocol(ChannelPipeline p, String scheme) throws IOExceptio
1409
1411
} else {
1410
1412
p .addFirst (HTTP_HANDLER , createHttpClientCodec ());
1411
1413
}
1414
+
1415
+ if (isWebSocket (scheme )) {
1416
+ p .replace (HTTP_PROCESSOR , WS_PROCESSOR , NettyAsyncHttpProvider .this );
1417
+ }
1412
1418
}
1413
1419
1414
1420
public void channelClosed (ChannelHandlerContext ctx , ChannelStateEvent e ) throws Exception {
@@ -2315,8 +2321,8 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
2315
2321
return ;
2316
2322
}
2317
2323
2318
- ctx .getPipeline ().replace ("http-encoder" , "ws-encoder" , new WebSocket08FrameEncoder (true ));
2319
- ctx .getPipeline ().get ( HttpResponseDecoder . class ). replace ( "ws-decoder" , new WebSocket08FrameDecoder (false , false ));
2324
+ ctx .getPipeline ().replace (HTTP_HANDLER , "ws-encoder" , new WebSocket08FrameEncoder (true ));
2325
+ ctx .getPipeline ().addBefore ( WS_PROCESSOR , "ws-decoder" , new WebSocket08FrameDecoder (false , false ));
2320
2326
2321
2327
invokeOnSucces (ctx , h );
2322
2328
future .done ();
@@ -2437,8 +2443,8 @@ public Timeout newTimeoutInMs(TimerTask task, long delayInMs) {
2437
2443
return hashedWheelTimer .newTimeout (task , delayInMs , TimeUnit .MILLISECONDS );
2438
2444
}
2439
2445
2440
- private static boolean isWebSocket (URI uri ) {
2441
- return WEBSOCKET .equalsIgnoreCase (uri . getScheme ()) || WEBSOCKET_SSL .equalsIgnoreCase (uri . getScheme () );
2446
+ private static boolean isWebSocket (String scheme ) {
2447
+ return WEBSOCKET .equalsIgnoreCase (scheme ) || WEBSOCKET_SSL .equalsIgnoreCase (scheme );
2442
2448
}
2443
2449
2444
2450
private static boolean isSecure (String scheme ) {
0 commit comments