@@ -129,12 +129,12 @@ void openAsync(final Request request,
129
129
port = getPort (scheme , uri .getPort ());
130
130
}
131
131
132
- final String partitionId = getPartitionId (request , proxy );
132
+ final String partitionId = getPartitionId (request . getInetAddress (), request , proxy );
133
133
Endpoint endpoint = endpointMap .get (partitionId );
134
134
if (endpoint == null ) {
135
135
final boolean isSecure = Utils .isSecure (scheme );
136
136
endpoint = new AhcEndpoint (partitionId ,
137
- isSecure , host , port , request .getLocalAddress (),
137
+ isSecure , request . getInetAddress (), host , port , request .getLocalAddress (),
138
138
defaultConnectionHandler );
139
139
140
140
endpointMap .put (partitionId , endpoint );
@@ -164,11 +164,11 @@ Connection openSync(final Request request)
164
164
165
165
final boolean isSecure = Utils .isSecure (scheme );
166
166
167
- final String partitionId = getPartitionId (request , proxy );
167
+ final String partitionId = getPartitionId (request . getInetAddress (), request , proxy );
168
168
Endpoint endpoint = endpointMap .get (partitionId );
169
169
if (endpoint == null ) {
170
170
endpoint = new AhcEndpoint (partitionId ,
171
- isSecure , host , port , request .getLocalAddress (),
171
+ isSecure , request . getInetAddress (), host , port , request .getLocalAddress (),
172
172
defaultConnectionHandler );
173
173
174
174
endpointMap .put (partitionId , endpoint );
@@ -219,10 +219,11 @@ static boolean isKeepAlive(final Connection connection) {
219
219
return !IS_NOT_KEEP_ALIVE .isSet (connection );
220
220
}
221
221
222
- private static String getPartitionId (Request request ,
222
+ private static String getPartitionId (InetAddress overrideAddress , Request request ,
223
223
ProxyServer proxyServer ) {
224
- return request .getConnectionPoolPartitioning ()
225
- .getPartitionKey (request .getUri (), proxyServer ).toString ();
224
+ return (overrideAddress != null ? overrideAddress .toString () + "_" : "" ) +
225
+ request .getConnectionPoolPartitioning ()
226
+ .getPartitionKey (request .getUri (), proxyServer ).toString ();
226
227
}
227
228
228
229
private static int getPort (final String scheme , final int p ) {
@@ -244,19 +245,21 @@ private class AhcEndpoint extends Endpoint<SocketAddress> {
244
245
245
246
private final String partitionId ;
246
247
private final boolean isSecure ;
248
+ private final InetAddress remoteOverrideAddress ;
247
249
private final String host ;
248
250
private final int port ;
249
251
private final InetAddress localAddress ;
250
252
private final ConnectorHandler <SocketAddress > connectorHandler ;
251
253
252
254
private AhcEndpoint (final String partitionId ,
253
255
final boolean isSecure ,
254
- final String host , final int port ,
256
+ final InetAddress remoteOverrideAddress , final String host , final int port ,
255
257
final InetAddress localAddress ,
256
258
final ConnectorHandler <SocketAddress > connectorHandler ) {
257
259
258
260
this .partitionId = partitionId ;
259
261
this .isSecure = isSecure ;
262
+ this .remoteOverrideAddress = remoteOverrideAddress ;
260
263
this .host = host ;
261
264
this .port = port ;
262
265
this .localAddress = localAddress ;
@@ -275,12 +278,19 @@ public Object getId() {
275
278
@ Override
276
279
public GrizzlyFuture <Connection > connect () {
277
280
return (GrizzlyFuture <Connection >) connectorHandler .connect (
278
- new InetSocketAddress ( host , port ),
281
+ buildRemoteSocketAddress ( ),
279
282
localAddress != null
280
283
? new InetSocketAddress (localAddress , 0 )
281
284
: null );
282
285
}
283
286
287
+ private InetSocketAddress buildRemoteSocketAddress ()
288
+ {
289
+ return remoteOverrideAddress != null
290
+ ? new InetSocketAddress (remoteOverrideAddress , port )
291
+ : new InetSocketAddress (host , port );
292
+ }
293
+
284
294
@ Override
285
295
protected void onConnect (final Connection connection ,
286
296
final SingleEndpointPool <SocketAddress > pool ) {
0 commit comments