Skip to content

Commit a39c340

Browse files
DATAES-488 - Polishing & Documentation.
Rename VerificationMode -> Verification. Reorder methods in ReactiveElasticsearchClient, add test for DefaultWebClientProvider. Enforce assertions and fix some overall code style issues. Add client reference documentation section.
1 parent 390d7e8 commit a39c340

17 files changed

+356
-190
lines changed

src/main/asciidoc/index.adoc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1-
= Spring Data Elasticsearch
2-
BioMed Central Development Team
1+
= Spring Data Elasticsearch - Reference Documentation
2+
BioMed Central Development Team; Oliver Drotbohm; Greg Turnquist; Christoph Strobl;
33
:revnumber: {version}
44
:revdate: {localdate}
5+
:toc:
6+
:toc-placement!:
7+
:linkcss:
8+
:doctype: book
9+
:docinfo: shared
10+
:source-highlighter: prettify
11+
:icons: font
12+
:imagesdir: images
13+
ifdef::backend-epub3[:front-cover-image: image:epub-cover.png[Front Cover,1050,1600]]
514
:spring-data-commons-docs: ../../../../spring-data-commons/src/main/asciidoc
615

7-
(C) 2013-2015 The original author(s).
16+
(C) 2013-2018 The original author(s).
817

918
NOTE: Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
1019

@@ -19,6 +28,7 @@ include::{spring-data-commons-docs}/repositories.adoc[]
1928
= Reference Documentation
2029

2130
:leveloffset: +1
31+
include::reference/elasticsearch-clients.adoc[]
2232
include::reference/data-elasticsearch.adoc[]
2333
include::reference/elasticsearch-misc.adoc[]
2434
:leveloffset: -1
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
[[elasticsearch.clients]]
2+
= Elasticsearch Clients
3+
4+
This chapter illustrates configuration and usage of supported Elasticsearch client implementations.
5+
6+
Spring data Elasticsearch operates upon an Elasticsearch client that is connected to a single Elasticsearch node or a cluster.
7+
8+
WARNING: The well known `TransportClient` is deprecated as of Elasticsearch 7.0.0 and is expected to be removed in Elasticsearch 8.0.
9+
10+
[[elasticsearch.clients.rest]]
11+
== High Level REST Client
12+
13+
The Java High Level REST Client provides a straight forward replacement for the `TransportClient` as it accepts and returns
14+
the very same request/response objects and therefore depends on the Elasticsearch core project.
15+
Asynchronous calls are operated upon a client managed thread pool and require a callback to be notified when the request is done.
16+
17+
.High Level REST Client
18+
====
19+
[source,java]
20+
----
21+
static class Config {
22+
23+
@Bean
24+
RestHighLevelClient client() {
25+
26+
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
27+
.connectedTo("localhost:9200", "localhost:9201")
28+
.build();
29+
30+
return RestClients.create(clientConfiguration).rest(); <2>
31+
}
32+
}
33+
34+
// ...
35+
36+
IndexRequest request = new IndexRequest("spring-data", "elasticsearch", randomID())
37+
.source(singletonMap("feature", "high-level-rest-client"))
38+
.setRefreshPolicy(IMMEDIATE);
39+
40+
IndexResponse response = client.index(request);
41+
----
42+
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL.
43+
<2> Next to the `rest()` client it is also possible to obtain the `lowLevelRest()` client.
44+
====
45+
46+
[[elasticsearch.clients.reactive]]
47+
== Reactive Client
48+
49+
The `ReactiveElasticsearchClient` is a non official driver based on `WebClient`.
50+
It uses the request/response objects provided by the Elasticsearch core project.
51+
Calls are directly operated on the reactive stack, **not** wrapping async (thread pool bound) responses into reactive types.
52+
53+
.Reactive REST Client
54+
====
55+
[source,java]
56+
----
57+
static class Config {
58+
59+
@Bean
60+
ReactiveElasticsearchClient client() {
61+
62+
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
63+
.connectedTo("localhost:9200", "localhost:9291")
64+
.build();
65+
66+
return ReactiveRestClients.create(clientConfiguration);
67+
}
68+
}
69+
70+
// ...
71+
72+
Mono<IndexResponse> response = client.index(request ->
73+
74+
request.index("spring-data")
75+
.type("elasticsearch")
76+
.id(randomID())
77+
.source(singletonMap("feature", "reactive-client"))
78+
.setRefreshPolicy(IMMEDIATE);
79+
);
80+
----
81+
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL.
82+
====
83+

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import org.springframework.data.elasticsearch.client.ClientConfiguration;
6565
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
6666
import org.springframework.data.elasticsearch.client.NoReachableHostException;
67-
import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode;
67+
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
6868
import org.springframework.data.elasticsearch.client.util.RequestConverters;
6969
import org.springframework.http.HttpHeaders;
7070
import org.springframework.http.HttpMethod;
@@ -153,9 +153,7 @@ private static WebClientProvider getWebClientProvider(ClientConfiguration client
153153

154154
Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
155155

156-
sslContext.ifPresent(it -> {
157-
sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE));
158-
});
156+
sslContext.ifPresent(it -> sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE)));
159157
}));
160158
provider = WebClientProvider.create("https", connector);
161159
} else {
@@ -273,13 +271,13 @@ public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest)
273271
@Override
274272
public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) {
275273

276-
return this.hostProvider.getActive(VerificationMode.LAZY) //
274+
return this.hostProvider.getActive(Verification.LAZY) //
277275
.flatMap(callback::doWithClient) //
278276
.onErrorResume(throwable -> {
279277

280278
if (throwable instanceof ConnectException) {
281279

282-
return hostProvider.getActive(VerificationMode.ACTIVE) //
280+
return hostProvider.getActive(Verification.ACTIVE) //
283281
.flatMap(callback::doWithClient);
284282
}
285283

@@ -357,9 +355,7 @@ private <T> Publisher<? extends T> readResponseBody(Request request, ClientRespo
357355

358356
return response.body(BodyExtractors.toMono(byte[].class)) //
359357
.map(it -> new String(it, StandardCharsets.UTF_8)) //
360-
.flatMap(content -> {
361-
return doDecode(response, responseType, content);
362-
});
358+
.flatMap(content -> doDecode(response, responseType, content));
363359
}
364360

365361
private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
* Default {@link WebClientProvider} that uses cached {@link WebClient} instances per {@code hostAndPort}.
3232
*
3333
* @author Mark Paluch
34+
* @author Christoph Strobl
3435
* @since 4.0
3536
*/
3637
class DefaultWebClientProvider implements WebClientProvider {
@@ -42,19 +43,32 @@ class DefaultWebClientProvider implements WebClientProvider {
4243
private final Consumer<Throwable> errorListener;
4344
private final HttpHeaders headers;
4445

46+
/**
47+
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
48+
*
49+
* @param scheme must not be {@literal null}.
50+
* @param connector can be {@literal null}.
51+
*/
4552
DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) {
4653
this(scheme, connector, e -> {}, HttpHeaders.EMPTY);
4754
}
4855

56+
/**
57+
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
58+
*
59+
* @param scheme must not be {@literal null}.
60+
* @param connector can be {@literal null}.
61+
* @param errorListener must not be {@literal null}.
62+
* @param headers must not be {@literal null}.
63+
*/
4964
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
5065
Consumer<Throwable> errorListener, HttpHeaders headers) {
51-
this(new ConcurrentHashMap<>(), scheme, connector, errorListener, headers);
52-
}
5366

54-
private DefaultWebClientProvider(Map<InetSocketAddress, WebClient> cachedClients, String scheme,
55-
@Nullable ClientHttpConnector connector, Consumer<Throwable> errorListener, HttpHeaders headers) {
67+
Assert.notNull(scheme, "Scheme must not be null! A common scheme would be 'http'.");
68+
Assert.notNull(errorListener, "ErrorListener must not be null! You may want use a no-op one 'e -> {}' instead.");
69+
Assert.notNull(headers, "headers must not be null! Think about using 'HttpHeaders.EMPTY' as an alternative.");
5670

57-
this.cachedClients = cachedClients;
71+
this.cachedClients = new ConcurrentHashMap<>();
5872
this.scheme = scheme;
5973
this.connector = connector;
6074
this.errorListener = errorListener;
@@ -70,18 +84,7 @@ public WebClient get(InetSocketAddress endpoint) {
7084

7185
Assert.notNull(endpoint, "Endpoint must not be empty!");
7286

73-
return this.cachedClients.computeIfAbsent(endpoint, key -> {
74-
75-
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
76-
77-
if (connector != null) {
78-
builder.clientConnector(connector);
79-
}
80-
81-
String baseUrl = String.format("%s://%s:%d", this.scheme, key.getHostString(), key.getPort());
82-
return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener))
83-
.build();
84-
});
87+
return this.cachedClients.computeIfAbsent(endpoint, this::createWebClientForSocketAddress);
8588
}
8689

8790
/*
@@ -100,7 +103,7 @@ public HttpHeaders getDefaultHeaders() {
100103
@Override
101104
public WebClientProvider withDefaultHeaders(HttpHeaders headers) {
102105

103-
Assert.notNull(headers, "HttpHeaders must not be null");
106+
Assert.notNull(headers, "HttpHeaders must not be null.");
104107

105108
HttpHeaders merged = new HttpHeaders();
106109
merged.addAll(this.headers);
@@ -125,9 +128,21 @@ public Consumer<Throwable> getErrorListener() {
125128
@Override
126129
public WebClientProvider withErrorListener(Consumer<Throwable> errorListener) {
127130

128-
Assert.notNull(errorListener, "Error listener must not be null");
131+
Assert.notNull(errorListener, "Error listener must not be null.");
129132

130133
Consumer<Throwable> listener = this.errorListener.andThen(errorListener);
131134
return new DefaultWebClientProvider(this.scheme, this.connector, listener, this.headers);
132135
}
136+
137+
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
138+
139+
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
140+
141+
if (connector != null) {
142+
builder = builder.clientConnector(connector);
143+
}
144+
145+
String baseUrl = String.format("%s://%s:%d", this.scheme, socketAddress.getHostString(), socketAddress.getPort());
146+
return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
147+
}
133148
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,41 @@
3737
public interface HostProvider {
3838

3939
/**
40-
* Lookup an active host in {@link VerificationMode#LAZY lazy} mode utilizing cached {@link ElasticsearchHost}.
40+
* Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts.
41+
*
42+
* @param clientProvider must not be {@literal null} .
43+
* @param endpoints must not be {@literal null} nor empty.
44+
* @return new instance of {@link HostProvider}.
45+
*/
46+
static HostProvider provider(WebClientProvider clientProvider, InetSocketAddress... endpoints) {
47+
48+
Assert.notNull(clientProvider, "WebClientProvider must not be null");
49+
Assert.notEmpty(endpoints, "Please provide at least one endpoint to connect to.");
50+
51+
if (endpoints.length == 1) {
52+
return new SingleNodeHostProvider(clientProvider, endpoints[0]);
53+
} else {
54+
return new MultiNodeHostProvider(clientProvider, endpoints);
55+
}
56+
}
57+
58+
/**
59+
* Lookup an active host in {@link Verification#LAZY lazy} mode utilizing cached {@link ElasticsearchHost}.
4160
*
4261
* @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} if none found.
4362
*/
4463
default Mono<InetSocketAddress> lookupActiveHost() {
45-
return lookupActiveHost(VerificationMode.LAZY);
64+
return lookupActiveHost(Verification.LAZY);
4665
}
4766

4867
/**
49-
* Lookup an active host in using the given {@link VerificationMode}.
68+
* Lookup an active host in using the given {@link Verification}.
5069
*
51-
* @param verificationMode
70+
* @param verification
5271
* @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error}
5372
* ({@link NoReachableHostException}) if none found.
5473
*/
55-
Mono<InetSocketAddress> lookupActiveHost(VerificationMode verificationMode);
74+
Mono<InetSocketAddress> lookupActiveHost(Verification verification);
5675

5776
/**
5877
* Get the {@link WebClient} connecting to an active host utilizing cached {@link ElasticsearchHost}.
@@ -61,25 +80,25 @@ default Mono<InetSocketAddress> lookupActiveHost() {
6180
* found.
6281
*/
6382
default Mono<WebClient> getActive() {
64-
return getActive(VerificationMode.LAZY);
83+
return getActive(Verification.LAZY);
6584
}
6685

6786
/**
6887
* Get the {@link WebClient} connecting to an active host.
6988
*
70-
* @param verificationMode must not be {@literal null}.
89+
* @param verification must not be {@literal null}.
7190
* @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none
7291
* found.
7392
*/
74-
default Mono<WebClient> getActive(VerificationMode verificationMode) {
75-
return lookupActiveHost(verificationMode).map(this::createWebClient);
93+
default Mono<WebClient> getActive(Verification verification) {
94+
return lookupActiveHost(verification).map(this::createWebClient);
7695
}
7796

7897
/**
7998
* Creates a {@link WebClient} for {@link InetSocketAddress endpoint}.
8099
*
81-
* @param baseUrl
82-
* @return
100+
* @param endpoint must not be {@literal null}.
101+
* @return a {@link WebClient} using the the given endpoint as {@literal base url}.
83102
*/
84103
WebClient createWebClient(InetSocketAddress endpoint);
85104

@@ -91,29 +110,12 @@ default Mono<WebClient> getActive(VerificationMode verificationMode) {
91110
Mono<ClusterInformation> clusterInfo();
92111

93112
/**
94-
* Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts.
113+
* {@link Verification} allows to influence the lookup strategy for active hosts.
95114
*
96-
* @param clientProvider must not be {@literal null} .
97-
* @param hosts must not be {@literal null} nor empty.
98-
* @return new instance of {@link HostProvider}.
99-
*/
100-
static HostProvider provider(WebClientProvider clientProvider, InetSocketAddress... endpoints) {
101-
102-
Assert.notNull(clientProvider, "WebClientProvider must not be null");
103-
Assert.notEmpty(endpoints, "Please provide at least one endpoint to connect to.");
104-
105-
if (endpoints.length == 1) {
106-
return new SingleNodeHostProvider(clientProvider, endpoints[0]);
107-
} else {
108-
return new MultiNodeHostProvider(clientProvider, endpoints);
109-
}
110-
}
111-
112-
/**
113115
* @author Christoph Strobl
114116
* @since 4.0
115117
*/
116-
enum VerificationMode {
118+
enum Verification {
117119

118120
/**
119121
* Actively check for cluster node health.
@@ -127,9 +129,9 @@ enum VerificationMode {
127129
}
128130

129131
/**
130-
* Value object accumulating information about cluster an Elasticsearch cluster.
132+
* Value object accumulating information about an Elasticsearch cluster.
131133
*
132-
* @author Christoph Strobll
134+
* @author Christoph Strobl
133135
* @since 4.0.
134136
*/
135137
class ClusterInformation {

0 commit comments

Comments
 (0)