Skip to content

Commit 011d2d5

Browse files
committed
DATAES-870 - Consume response body to release connection directly.
We now ensure that response bodies from ClientResponse get released as part of our result handling. This is to prevent cancel signals issuing the connection release so that the connection release can be synchronized (awaited) before any subsequent requests get issued. Connection release should be part of the Framework but the fallback interferes with Reactor Netty's HttpClient therefore we're ensuring proper resource disposal.
1 parent 5b1e179 commit 011d2d5

File tree

6 files changed

+26
-16
lines changed

6 files changed

+26
-16
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,7 @@ private static WebClientProvider getWebClientProvider(ClientConfiguration client
240240
Duration connectTimeout = clientConfiguration.getConnectTimeout();
241241
Duration soTimeout = clientConfiguration.getSocketTimeout();
242242

243-
// DATAES-870: previously: HttpClient httpClient = HttpClient.create();
244-
// disable pooling of connections (see https://github.com/spring-projects/spring-framework/issues/22464)
245-
// otherwise we get errors: "Connection prematurely closed BEFORE response; nested exception is
246-
// java.lang.RuntimeException: Connection prematurely closed BEFORE response"
247-
HttpClient httpClient = HttpClient.newConnection().compress(true);
243+
HttpClient httpClient = HttpClient.create().compress(true);
248244

249245
if (!connectTimeout.isNegative()) {
250246
httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
@@ -305,7 +301,7 @@ private static WebClientProvider getWebClientProvider(ClientConfiguration client
305301
public Mono<Boolean> ping(HttpHeaders headers) {
306302

307303
return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) //
308-
.map(response -> response.statusCode().is2xxSuccessful()) //
304+
.flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful())) //
309305
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
310306
}
311307

@@ -355,7 +351,7 @@ public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetReq
355351
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
356352

357353
return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) //
358-
.map(response -> response.statusCode().is2xxSuccessful()) //
354+
.flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful())) //
359355
.next();
360356
}
361357

@@ -555,7 +551,7 @@ public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
555551
public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
556552

557553
return sendRequest(request, requestCreator.indexExists(), RawActionResponse.class, headers) //
558-
.map(response -> response.statusCode().is2xxSuccessful()) //
554+
.flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful())) //
559555
.next();
560556
}
561557

src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

18-
import org.springframework.http.HttpHeaders;
1918
import reactor.core.publisher.Flux;
2019
import reactor.core.publisher.Mono;
2120
import reactor.util.function.Tuple2;
@@ -33,6 +32,7 @@
3332
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
3433
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
3534
import org.springframework.data.elasticsearch.client.NoReachableHostException;
35+
import org.springframework.http.HttpHeaders;
3636
import org.springframework.lang.Nullable;
3737
import org.springframework.web.reactive.function.client.ClientResponse;
3838
import org.springframework.web.reactive.function.client.WebClient;
@@ -121,15 +121,15 @@ private Mono<InetSocketAddress> findActiveForSate(State state) {
121121
.map(ElasticsearchHost::getEndpoint).next();
122122
}
123123

124-
private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, ClientResponse> tuple2) {
124+
private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple2) {
125125

126-
State state = tuple2.getT2().statusCode().isError() ? State.OFFLINE : State.ONLINE;
126+
State state = tuple2.getT2();
127127
ElasticsearchHost elasticsearchHost = new ElasticsearchHost(tuple2.getT1(), state);
128128
hosts.put(tuple2.getT1(), elasticsearchHost);
129129
return elasticsearchHost;
130130
}
131131

132-
private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
132+
private Flux<Tuple2<InetSocketAddress, State>> nodes(@Nullable State state) {
133133

134134
return Flux.fromIterable(hosts()) //
135135
.filter(entry -> state == null || entry.getState().equals(state)) //
@@ -144,7 +144,8 @@ private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State st
144144
clientProvider.getErrorListener().accept(throwable);
145145
});
146146

147-
return Mono.just(host).zipWith(exchange);
147+
return Mono.just(host).zipWith(exchange
148+
.flatMap(it -> it.releaseBody().thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
148149
}) //
149150
.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
150151
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
*/
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

18-
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import reactor.core.publisher.Mono;
1919

2020
import java.io.IOException;
2121

2222
import org.elasticsearch.action.ActionResponse;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
2324

2425
import org.springframework.http.HttpStatus;
2526
import org.springframework.http.client.reactive.ClientHttpResponse;
@@ -73,4 +74,13 @@ public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
7374
@Override
7475
public void writeTo(StreamOutput out) throws IOException {
7576
}
77+
78+
/**
79+
* Ensure the response body is released to properly release the underlying connection.
80+
*
81+
* @return
82+
*/
83+
public Mono<Void> releaseBody() {
84+
return delegate.releaseBody();
85+
}
7686
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ public Mono<ClusterInformation> clusterInfo() {
6060
.head().uri("/")
6161
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
6262
.exchange() //
63-
.map(it -> {
63+
.flatMap(it -> {
6464
if (it.statusCode().isError()) {
6565
state = ElasticsearchHost.offline(endpoint);
6666
} else {
6767
state = ElasticsearchHost.online(endpoint);
6868
}
69-
return state;
69+
return it.releaseBody().thenReturn(state);
7070
}).onErrorResume(throwable -> {
7171

7272
state = ElasticsearchHost.offline(endpoint);

src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,11 @@ public void triesDeadHostsIfNoActiveFound() {
117117
mock.when(HOST_2).get(requestHeadersUriSpec -> {
118118

119119
ClientResponse response1 = mock(ClientResponse.class);
120+
when(response1.releaseBody()).thenReturn(Mono.empty());
120121
Receive.error(response1);
121122

122123
ClientResponse response2 = mock(ClientResponse.class);
124+
when(response2.releaseBody()).thenReturn(Mono.empty());
123125
Receive.ok(response2);
124126

125127
when(requestHeadersUriSpec.exchange()).thenReturn(Mono.just(response1), Mono.just(response2));

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ public WebClient get(InetSocketAddress endpoint) {
243243
Mockito.when(headersUriSpec.exchange()).thenReturn(Mono.just(response));
244244
Mockito.when(bodySpy.exchange()).thenReturn(Mono.just(response));
245245
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED);
246+
Mockito.when(response.releaseBody()).thenReturn(Mono.empty());
246247

247248
headersUriSpecMap.putIfAbsent(key, headersUriSpec);
248249
bodyUriSpecMap.putIfAbsent(key, bodySpy);

0 commit comments

Comments
 (0)