Skip to content

Commit 5b1e179

Browse files
committed
DATAES-870 - Polishing.
Simplify single-node flow.
1 parent 6332534 commit 5b1e179

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

18-
import org.springframework.http.HttpHeaders;
1918
import reactor.core.publisher.Mono;
2019

2120
import java.net.InetSocketAddress;
@@ -25,6 +24,7 @@
2524
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
2625
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
2726
import org.springframework.data.elasticsearch.client.NoReachableHostException;
27+
import org.springframework.http.HttpHeaders;
2828
import org.springframework.web.reactive.function.client.WebClient;
2929

3030
/**
@@ -60,20 +60,20 @@ public Mono<ClusterInformation> clusterInfo() {
6060
.head().uri("/")
6161
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
6262
.exchange() //
63-
.flatMap(it -> {
63+
.map(it -> {
6464
if (it.statusCode().isError()) {
6565
state = ElasticsearchHost.offline(endpoint);
6666
} else {
6767
state = ElasticsearchHost.online(endpoint);
6868
}
69-
return Mono.just(state);
69+
return state;
7070
}).onErrorResume(throwable -> {
7171

7272
state = ElasticsearchHost.offline(endpoint);
7373
clientProvider.getErrorListener().accept(throwable);
7474
return Mono.just(state);
7575
}) //
76-
.flatMap(it -> Mono.just(new ClusterInformation(Collections.singleton(it))));
76+
.map(it -> new ClusterInformation(Collections.singleton(it)));
7777
}
7878

7979
/*
@@ -96,14 +96,16 @@ public Mono<InetSocketAddress> lookupActiveHost(Verification verification) {
9696
return Mono.just(endpoint);
9797
}
9898

99-
return clusterInfo().flatMap(it -> {
99+
return clusterInfo().handle((information, sink) -> {
100100

101-
ElasticsearchHost host = it.getNodes().iterator().next();
101+
ElasticsearchHost host = information.getNodes().iterator().next();
102102
if (host.isOnline()) {
103-
return Mono.just(host.getEndpoint());
103+
104+
sink.next(host.getEndpoint());
105+
return;
104106
}
105107

106-
return Mono.error(() -> new NoReachableHostException(Collections.singleton(host)));
108+
sink.error(new NoReachableHostException(Collections.singleton(host)));
107109
});
108110
}
109111

0 commit comments

Comments
 (0)