Skip to content

Commit b30f125

Browse files
committed
DATAES-893 - Adopt to changes in Project Reactor.
1 parent f19bf64 commit b30f125

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
import io.netty.handler.ssl.JdkSslContext;
2323
import io.netty.handler.timeout.ReadTimeoutHandler;
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
25-
import reactor.core.publisher.EmitterProcessor;
2625
import reactor.core.publisher.Flux;
27-
import reactor.core.publisher.FluxSink;
26+
import reactor.core.publisher.FluxIdentityProcessor;
2827
import reactor.core.publisher.Mono;
28+
import reactor.core.publisher.Processors;
29+
import reactor.core.publisher.Sinks;
2930
import reactor.netty.http.client.HttpClient;
3031
import reactor.netty.transport.ProxyProvider;
3132

@@ -106,6 +107,7 @@
106107
import org.elasticsearch.search.aggregations.Aggregation;
107108
import org.elasticsearch.search.suggest.Suggest;
108109
import org.reactivestreams.Publisher;
110+
109111
import org.springframework.data.elasticsearch.client.ClientConfiguration;
110112
import org.springframework.data.elasticsearch.client.ClientLogger;
111113
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@@ -465,12 +467,10 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
465467
searchRequest.scroll(scrollTimeout);
466468
}
467469

468-
EmitterProcessor<ActionRequest> outbound = EmitterProcessor.create(false);
469-
FluxSink<ActionRequest> request = outbound.sink();
470-
471-
EmitterProcessor<SearchResponse> inbound = EmitterProcessor.create(false);
470+
Sinks.StandaloneFluxSink<ActionRequest> request = Sinks.unicast();
471+
FluxIdentityProcessor<SearchResponse> inbound = Processors.unicast();
472472

473-
Flux<SearchResponse> exchange = outbound.startWith(searchRequest).flatMap(it -> {
473+
Flux<SearchResponse> exchange = request.asFlux().flatMap(it -> {
474474

475475
if (it instanceof SearchRequest) {
476476
return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
@@ -495,7 +495,7 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
495495
if (isEmpty(searchResponse.getHits())) {
496496

497497
inbound.onComplete();
498-
outbound.onComplete();
498+
request.complete();
499499

500500
} else {
501501

@@ -509,10 +509,13 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
509509
}).map(SearchResponse::getHits) //
510510
.flatMap(Flux::fromIterable);
511511

512-
return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
512+
return searchHits.doOnSubscribe(ignore -> {
513+
exchange.subscribe(inbound);
514+
request.next(searchRequest);
515+
});
513516

514517
}, state -> cleanupScroll(headers, state), //
515-
state -> cleanupScroll(headers, state), //
518+
(state, ex) -> cleanupScroll(headers, state), //
516519
state -> cleanupScroll(headers, state)); //
517520
}
518521

0 commit comments

Comments
 (0)