Skip to content

Commit e605cad

Browse files
authored
DATAES-767 - Fix ReactiveElasticsearch handling of 4xx HTTP responses.
Original PR: spring-projects#445
1 parent 07ee01f commit e605cad

File tree

7 files changed

+156
-48
lines changed

7 files changed

+156
-48
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import javax.net.ssl.SSLContext;
5050

5151
import org.apache.http.util.EntityUtils;
52+
import org.elasticsearch.ElasticsearchException;
5253
import org.elasticsearch.ElasticsearchStatusException;
5354
import org.elasticsearch.action.ActionRequest;
5455
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -115,6 +116,7 @@
115116
import org.springframework.util.ObjectUtils;
116117
import org.springframework.util.ReflectionUtils;
117118
import org.springframework.util.StringUtils;
119+
import org.springframework.web.client.HttpClientErrorException;
118120
import org.springframework.web.client.HttpServerErrorException;
119121
import org.springframework.web.reactive.function.BodyExtractors;
120122
import org.springframework.web.reactive.function.client.ClientRequest;
@@ -764,6 +766,12 @@ private <T> Publisher<? extends T> readResponseBody(String logId, Request reques
764766
return handleServerError(request, response);
765767
}
766768

769+
if (response.statusCode().is4xxClientError()) {
770+
771+
ClientLogger.logRawResponse(logId, response.statusCode());
772+
return handleClientError(logId, request, response, responseType);
773+
}
774+
767775
return response.body(BodyExtractors.toMono(byte[].class)) //
768776
.map(it -> new String(it, StandardCharsets.UTF_8)) //
769777
.doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
@@ -800,13 +808,68 @@ private static XContentParser createParser(String mediaType, String content) thr
800808
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
801809
}
802810

803-
private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
811+
private <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
804812

805813
return Mono.error(
806814
new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.",
807815
request.getMethod(), request.getEndpoint(), response.statusCode().value())));
808816
}
809817

818+
private <T> Publisher<? extends T> handleClientError(String logId, Request request, ClientResponse response,
819+
Class<T> responseType) {
820+
821+
return response.body(BodyExtractors.toMono(byte[].class)) //
822+
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
823+
.flatMap(content -> {
824+
String mediaType = response.headers().contentType().map(MediaType::toString)
825+
.orElse(XContentType.JSON.mediaType());
826+
try {
827+
ElasticsearchException exception = getElasticsearchException(response, content, mediaType);
828+
if (exception != null) {
829+
StringBuilder sb = new StringBuilder();
830+
buildExceptionMessages(sb, exception);
831+
return Mono.error(new HttpClientErrorException(response.statusCode(), sb.toString()));
832+
}
833+
} catch (Exception e) {
834+
return Mono
835+
.error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
836+
}
837+
return Mono.just(content);
838+
})
839+
.doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
840+
.flatMap(content -> doDecode(response, responseType, content));
841+
}
842+
843+
// region ElasticsearchException helper
844+
@Nullable
845+
private ElasticsearchException getElasticsearchException(ClientResponse response, String content, String mediaType)
846+
throws IOException {
847+
848+
XContentParser parser = createParser(mediaType, content);
849+
// we have a JSON object with an error and a status field
850+
XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT
851+
852+
do {
853+
token = parser.nextToken();
854+
855+
if (parser.currentName().equals("error")) {
856+
return ElasticsearchException.failureFromXContent(parser);
857+
}
858+
} while (token == XContentParser.Token.FIELD_NAME);
859+
return null;
860+
}
861+
862+
private static void buildExceptionMessages(StringBuilder sb, Throwable t) {
863+
864+
sb.append(t.getMessage());
865+
for (Throwable throwable : t.getSuppressed()) {
866+
sb.append(", ");
867+
buildExceptionMessages(sb, throwable);
868+
}
869+
}
870+
// endregion
871+
872+
// region internal classes
810873
/**
811874
* Reactive client {@link ReactiveElasticsearchClient.Status} implementation.
812875
*
@@ -867,4 +930,5 @@ void updateScrollId(String scrollId) {
867930
}
868931
}
869932
}
933+
// endregion
870934
}

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.util.CollectionUtils;
3535
import org.springframework.util.ObjectUtils;
3636
import org.springframework.util.StringUtils;
37+
import org.springframework.web.client.HttpClientErrorException;
3738

3839
/**
3940
* @author Christoph Strobl
@@ -63,6 +64,15 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
6364
return new UncategorizedElasticsearchException(ex.getMessage(), ex);
6465
}
6566

67+
if (ex instanceof HttpClientErrorException) {
68+
HttpClientErrorException httpClientErrorException = (HttpClientErrorException) ex;
69+
70+
if (isSeqNoConflict(httpClientErrorException)) {
71+
return new OptimisticLockingFailureException("Cannot index a document due to seq_no+primary_term conflict",
72+
httpClientErrorException);
73+
}
74+
}
75+
6676
if (ex instanceof ValidationException) {
6777
return new DataIntegrityViolationException(ex.getMessage(), ex);
6878
}
@@ -75,7 +85,7 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
7585
return null;
7686
}
7787

78-
private boolean isSeqNoConflict(ElasticsearchException exception) {
88+
private boolean isSeqNoConflict(Exception exception) {
7989

8090
if (exception instanceof ElasticsearchStatusException) {
8191
ElasticsearchStatusException statusException = (ElasticsearchStatusException) exception;
@@ -90,6 +100,13 @@ private boolean isSeqNoConflict(ElasticsearchException exception) {
90100
&& versionConflictEngineException.getMessage().contains("version conflict, required seqNo");
91101
}
92102

103+
if (exception instanceof HttpClientErrorException) {
104+
HttpClientErrorException httpClientErrorException = (HttpClientErrorException) exception;
105+
106+
return httpClientErrorException.getMessage() != null
107+
&& httpClientErrorException.getMessage().contains("version conflict, required seqNo");
108+
109+
}
93110
return false;
94111
}
95112

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.Assertions.*;
1919

2020
import lombok.SneakyThrows;
21+
import org.springframework.web.client.HttpClientErrorException;
2122
import reactor.test.StepVerifier;
2223

2324
import java.io.IOException;
@@ -147,7 +148,7 @@ public void getOnNonExistingIndexShouldThrowException() {
147148

148149
client.get(new GetRequest(INDEX_I, "nonono")) //
149150
.as(StepVerifier::create) //
150-
.expectError(ElasticsearchStatusException.class) //
151+
.expectError(HttpClientErrorException.class) //
151152
.verify();
152153
}
153154

@@ -304,7 +305,7 @@ public void indexShouldErrorForExistingDocuments() {
304305

305306
client.index(request) //
306307
.as(StepVerifier::create) //
307-
.verifyError(ElasticsearchStatusException.class);
308+
.verifyError(HttpClientErrorException.class);
308309
}
309310

310311
@Test // DATAES-488
@@ -353,7 +354,7 @@ public void updateShouldErrorNonExistingDocumentWhenNotUpserted() {
353354

354355
client.update(request) //
355356
.as(StepVerifier::create) //
356-
.verifyError(ElasticsearchStatusException.class);
357+
.verifyError(HttpClientErrorException.class);
357358
}
358359

359360
@Test // DATAES-488
@@ -514,7 +515,7 @@ public void createExistingIndexErrors() throws IOException {
514515

515516
client.indices().createIndex(request -> request.index(INDEX_I)) //
516517
.as(StepVerifier::create) //
517-
.verifyError(ElasticsearchStatusException.class);
518+
.verifyError(HttpClientErrorException.class);
518519
}
519520

520521
@Test // DATAES-569
@@ -529,12 +530,12 @@ public void deleteExistingIndex() throws IOException {
529530
assertThat(syncClient.indices().exists(new GetIndexRequest(INDEX_I), RequestOptions.DEFAULT)).isFalse();
530531
}
531532

532-
@Test // DATAES-569
533+
@Test // DATAES-569, DATAES-767
533534
public void deleteNonExistingIndexErrors() {
534535

535536
client.indices().deleteIndex(request -> request.indices(INDEX_I)) //
536537
.as(StepVerifier::create) //
537-
.verifyError(ElasticsearchStatusException.class);
538+
.verifyError(HttpClientErrorException.class);
538539
}
539540

540541
@Test // DATAES-569
@@ -547,12 +548,12 @@ public void openExistingIndex() throws IOException {
547548
.verifyComplete();
548549
}
549550

550-
@Test // DATAES-569
551+
@Test // DATAES-569, DATAES-767
551552
public void openNonExistingIndex() {
552553

553554
client.indices().openIndex(request -> request.indices(INDEX_I)) //
554555
.as(StepVerifier::create) //
555-
.verifyError(ElasticsearchStatusException.class);
556+
.verifyError(HttpClientErrorException.class);
556557
}
557558

558559
@Test // DATAES-569
@@ -565,12 +566,12 @@ public void closeExistingIndex() throws IOException {
565566
.verifyComplete();
566567
}
567568

568-
@Test // DATAES-569
569+
@Test // DATAES-569, DATAES-767
569570
public void closeNonExistingIndex() {
570571

571572
client.indices().closeIndex(request -> request.indices(INDEX_I)) //
572573
.as(StepVerifier::create) //
573-
.verifyError(ElasticsearchStatusException.class);
574+
.verifyError(HttpClientErrorException.class);
574575
}
575576

576577
@Test // DATAES-569
@@ -583,12 +584,12 @@ public void refreshIndex() throws IOException {
583584
.verifyComplete();
584585
}
585586

586-
@Test // DATAES-569
587+
@Test // DATAES-569, DATAES-767
587588
public void refreshNonExistingIndex() {
588589

589590
client.indices().refreshIndex(request -> request.indices(INDEX_I)) //
590591
.as(StepVerifier::create) //
591-
.verifyError(ElasticsearchStatusException.class);
592+
.verifyError(HttpClientErrorException.class);
592593
}
593594

594595
@Test // DATAES-569
@@ -604,15 +605,15 @@ public void updateMapping() throws IOException {
604605
.verifyComplete();
605606
}
606607

607-
@Test // DATAES-569
608+
@Test // DATAES-569, DATAES-767
608609
public void updateMappingNonExistingIndex() {
609610

610611
Map<String, Object> jsonMap = Collections.singletonMap("properties",
611612
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
612613

613614
client.indices().updateMapping(request -> request.indices(INDEX_I).type(TYPE_I).source(jsonMap)) //
614615
.as(StepVerifier::create) //
615-
.verifyError(ElasticsearchStatusException.class);
616+
.verifyError(HttpClientErrorException.class);
616617
}
617618

618619
@Test // DATAES-569
@@ -625,12 +626,12 @@ public void flushIndex() throws IOException {
625626
.verifyComplete();
626627
}
627628

628-
@Test // DATAES-569
629+
@Test // DATAES-569, DATAES-767
629630
public void flushNonExistingIndex() {
630631

631632
client.indices().flushIndex(request -> request.indices(INDEX_I)) //
632633
.as(StepVerifier::create) //
633-
.verifyError(ElasticsearchStatusException.class);
634+
.verifyError(HttpClientErrorException.class);
634635
}
635636

636637
@Test // DATAES-684

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
2121

22+
import org.springframework.web.client.HttpClientErrorException;
2223
import reactor.core.publisher.Mono;
2324
import reactor.test.StepVerifier;
2425

@@ -459,15 +460,15 @@ public void updateShouldEmitResponseCorrectly() {
459460
.verifyComplete();
460461
}
461462

462-
@Test // DATAES-488
463+
@Test // DATAES-488, DATAES-767
463464
public void updateShouldEmitErrorWhenNotFound() {
464465

465466
hostProvider.when(HOST) //
466467
.updateFail();
467468

468469
client.update(new UpdateRequest("twitter", "doc", "1").doc(Collections.singletonMap("user", "cstrobl")))
469470
.as(StepVerifier::create) //
470-
.expectError(ElasticsearchStatusException.class) //
471+
.expectError(HttpClientErrorException.class) //
471472
.verify();
472473
}
473474

src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslatorTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.springframework.dao.DataAccessException;
2626
import org.springframework.dao.OptimisticLockingFailureException;
27+
import org.springframework.http.HttpStatus;
28+
import org.springframework.web.client.HttpClientErrorException;
2729

2830
/**
2931
* @author Roman Puchkovskiy
32+
* @author Peter-Josef Meisch
3033
*/
3134
class ElasticsearchExceptionTranslatorTests {
3235
private final ElasticsearchExceptionTranslator translator = new ElasticsearchExceptionTranslator();
@@ -56,4 +59,16 @@ void shouldConvertVersionConflictEngineExceptionWithSeqNoConflictToOptimisticLoc
5659
assertThat(translated.getMessage()).startsWith("Cannot index a document due to seq_no+primary_term conflict");
5760
assertThat(translated.getCause()).isSameAs(ex);
5861
}
62+
63+
@Test // DATAES-767
64+
void shouldConvertHttpClientErrorExceptionWithSeqNoConflictToOptimisticLockingFailureException() {
65+
HttpClientErrorException ex = new HttpClientErrorException(HttpStatus.BAD_REQUEST,
66+
"Elasticsearch exception [type=version_conflict_engine_exception, reason=[WPUUsXEB6uuA6j8_A7AB]: version conflict, required seqNo [34], primary term [16]. current document has seqNo [35] and primary term [16]]");
67+
68+
DataAccessException translated = translator.translateExceptionIfPossible(ex);
69+
70+
assertThat(translated).isInstanceOf(OptimisticLockingFailureException.class);
71+
assertThat(translated.getMessage()).startsWith("Cannot index a document due to seq_no+primary_term conflict");
72+
assertThat(translated.getCause()).isSameAs(ex);
73+
}
5974
}

0 commit comments

Comments
 (0)