18
18
import org .jboss .netty .channel .Channel ;
19
19
import org .jboss .netty .channel .ChannelFuture ;
20
20
import org .jboss .netty .channel .ChannelFutureListener ;
21
+ import org .jboss .netty .handler .ssl .SslHandler ;
21
22
import org .slf4j .Logger ;
22
23
import org .slf4j .LoggerFactory ;
23
24
@@ -61,10 +62,12 @@ private void abortChannelPreemption(String partition) {
61
62
channelManager .abortChannelPreemption (partition );
62
63
}
63
64
64
- private void writeRequest (Channel channel , String partition ) {
65
+ private void writeRequest (Channel channel ) {
65
66
66
67
LOGGER .debug ("Request using non cached Channel '{}':\n {}\n " , channel , future .getNettyRequest ().getHttpRequest ());
67
68
69
+ Channels .setAttribute (channel , future );
70
+
68
71
if (future .isDone ()) {
69
72
abortChannelPreemption (partition );
70
73
return ;
@@ -79,8 +82,24 @@ private void writeRequest(Channel channel, String partition) {
79
82
}
80
83
81
84
private void onFutureSuccess (final Channel channel ) throws ConnectException {
82
- Channels .setAttribute (channel , future );
83
- writeRequest (channel , partition );
85
+
86
+ SslHandler sslHandler = channel .getPipeline ().get (SslHandler .class );
87
+
88
+ if (sslHandler != null ) {
89
+ sslHandler .handshake ().addListener (new ChannelFutureListener () {
90
+
91
+ @ Override
92
+ public void operationComplete (ChannelFuture future ) throws Exception {
93
+ if (future .isSuccess ())
94
+ writeRequest (channel );
95
+ else
96
+ onFutureFailure (channel , future .getCause ());
97
+ }
98
+ });
99
+
100
+ } else {
101
+ writeRequest (channel );
102
+ }
84
103
}
85
104
86
105
private void onFutureFailure (Channel channel , Throwable cause ) {
0 commit comments