Skip to content

Commit 2e709a7

Browse files
christophstroblmp911de
authored andcommitted
DATAES-569 - Add index operations to reactive client.
Original pull request: spring-projects#277.
1 parent 807cfe7 commit 2e709a7

File tree

5 files changed

+793
-19
lines changed

5 files changed

+793
-19
lines changed

src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public static void logRequest(String logId, String method, String endpoint, Obje
9393
public static void logRawResponse(String logId, HttpStatus statusCode) {
9494

9595
if (isEnabled()) {
96-
WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode);
96+
WIRE_LOGGER.trace("[{}] Received raw response: {}", logId, statusCode);
9797
}
9898
}
9999

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 148 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@
4848
import org.elasticsearch.ElasticsearchStatusException;
4949
import org.elasticsearch.action.ActionRequest;
5050
import org.elasticsearch.action.ActionResponse;
51+
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
52+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
53+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
54+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
55+
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
56+
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
57+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
58+
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
59+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
60+
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
5161
import org.elasticsearch.action.delete.DeleteRequest;
5262
import org.elasticsearch.action.delete.DeleteResponse;
5363
import org.elasticsearch.action.get.GetRequest;
@@ -63,6 +73,7 @@
6373
import org.elasticsearch.action.search.SearchRequest;
6474
import org.elasticsearch.action.search.SearchResponse;
6575
import org.elasticsearch.action.search.SearchScrollRequest;
76+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
6677
import org.elasticsearch.action.update.UpdateRequest;
6778
import org.elasticsearch.action.update.UpdateResponse;
6879
import org.elasticsearch.client.Request;
@@ -80,13 +91,13 @@
8091
import org.elasticsearch.search.SearchHit;
8192
import org.elasticsearch.search.SearchHits;
8293
import org.reactivestreams.Publisher;
83-
8494
import org.springframework.data.elasticsearch.ElasticsearchException;
8595
import org.springframework.data.elasticsearch.client.ClientConfiguration;
8696
import org.springframework.data.elasticsearch.client.ClientLogger;
8797
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
8898
import org.springframework.data.elasticsearch.client.NoReachableHostException;
8999
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
100+
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
90101
import org.springframework.data.elasticsearch.client.util.RequestConverters;
91102
import org.springframework.data.util.Lazy;
92103
import org.springframework.http.HttpHeaders;
@@ -115,7 +126,7 @@
115126
* @see ClientConfiguration
116127
* @see ReactiveRestClients
117128
*/
118-
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
129+
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
119130

120131
private final HostProvider hostProvider;
121132

@@ -279,6 +290,15 @@ public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest)
279290
return sendRequest(indexRequest, RequestCreator.index(), IndexResponse.class, headers).publishNext();
280291
}
281292

293+
/*
294+
* (non-Javadoc)
295+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
296+
*/
297+
@Override
298+
public Indices indices() {
299+
return this;
300+
}
301+
282302
/*
283303
* (non-Javadoc)
284304
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest)
@@ -403,6 +423,97 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
403423
.publishNext();
404424
}
405425

426+
// --> INDICES
427+
428+
/*
429+
* (non-Javadoc)
430+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#existsIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.get.GetIndexRequest)
431+
*/
432+
@Override
433+
public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
434+
435+
return sendRequest(request, RequestCreator.indexExists(), RawActionResponse.class, headers) //
436+
.map(response -> response.statusCode().is2xxSuccessful()) //
437+
.next();
438+
}
439+
440+
/*
441+
* (non-Javadoc)
442+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#deleteIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest)
443+
*/
444+
@Override
445+
public Mono<Void> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
446+
447+
return sendRequest(request, RequestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
448+
.then();
449+
}
450+
451+
/*
452+
* (non-Javadoc)
453+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#createIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.create.CreateIndexRequest)
454+
*/
455+
@Override
456+
public Mono<Void> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
457+
458+
return sendRequest(createIndexRequest, RequestCreator.indexCreate(), AcknowledgedResponse.class, headers) //
459+
.then();
460+
}
461+
462+
/*
463+
* (non-Javadoc)
464+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#openIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.open.OpenIndexRequest)
465+
*/
466+
@Override
467+
public Mono<Void> openIndex(HttpHeaders headers, OpenIndexRequest request) {
468+
469+
return sendRequest(request, RequestCreator.indexOpen(), AcknowledgedResponse.class, headers) //
470+
.then();
471+
}
472+
473+
/*
474+
* (non-Javadoc)
475+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#closeIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.close.CloseIndexRequest)
476+
*/
477+
@Override
478+
public Mono<Void> closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) {
479+
480+
return sendRequest(closeIndexRequest, RequestCreator.indexClose(), AcknowledgedResponse.class, headers) //
481+
.then();
482+
}
483+
484+
/*
485+
* (non-Javadoc)
486+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#refreshIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.refresh.RefreshRequest)
487+
*/
488+
@Override
489+
public Mono<Void> refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) {
490+
491+
return sendRequest(refreshRequest, RequestCreator.indexRefresh(), RefreshResponse.class, headers) //
492+
.then();
493+
}
494+
495+
/*
496+
* (non-Javadoc)
497+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest)
498+
*/
499+
@Override
500+
public Mono<Void> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
501+
502+
return sendRequest(putMappingRequest, RequestCreator.putMapping(), AcknowledgedResponse.class, headers) //
503+
.then();
504+
}
505+
506+
/*
507+
* (non-Javadoc)
508+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#flushIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.flush.FlushRequest)
509+
*/
510+
@Override
511+
public Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest) {
512+
513+
return sendRequest(flushRequest, RequestCreator.flushIndex(), FlushResponse.class, headers) //
514+
.then();
515+
}
516+
406517
/*
407518
* (non-Javadoc)
408519
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
@@ -630,6 +741,41 @@ static Function<DeleteByQueryRequest, Request> deleteByQuery() {
630741
}
631742
};
632743
}
744+
745+
// --> INDICES
746+
747+
static Function<GetIndexRequest, Request> indexExists() {
748+
return RequestConverters::indexExists;
749+
}
750+
751+
static Function<DeleteIndexRequest, Request> indexDelete() {
752+
return RequestConverters::indexDelete;
753+
}
754+
755+
static Function<CreateIndexRequest, Request> indexCreate() {
756+
return RequestConverters::indexCreate;
757+
}
758+
759+
static Function<OpenIndexRequest, Request> indexOpen() {
760+
return RequestConverters::indexOpen;
761+
}
762+
763+
static Function<CloseIndexRequest, Request> indexClose() {
764+
return RequestConverters::indexClose;
765+
}
766+
767+
static Function<RefreshRequest, Request> indexRefresh() {
768+
return RequestConverters::indexRefresh;
769+
}
770+
771+
static Function<PutMappingRequest, Request> putMapping() {
772+
return RequestConverters::putMapping;
773+
}
774+
775+
static Function<FlushRequest, Request> flushIndex() {
776+
return RequestConverters::flushIndex;
777+
}
778+
633779
}
634780

635781
/**

0 commit comments

Comments
 (0)