Skip to content

Commit ce124a2

Browse files
DATAES-512 - Fix request parameters not getting added to the URI as query string parameters.
We now make sure to include request parameters in the constructed URI and add potential request option headers.
1 parent 1ea73d2 commit ce124a2

File tree

3 files changed

+107
-38
lines changed

3 files changed

+107
-38
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.nio.charset.StandardCharsets;
3333
import java.time.Duration;
3434
import java.util.Collection;
35+
import java.util.Map.Entry;
3536
import java.util.Optional;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.function.Function;
@@ -82,6 +83,7 @@
8283
import org.springframework.http.MediaType;
8384
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
8485
import org.springframework.util.Assert;
86+
import org.springframework.util.ObjectUtils;
8587
import org.springframework.util.ReflectionUtils;
8688
import org.springframework.web.client.HttpServerErrorException;
8789
import org.springframework.web.reactive.function.BodyExtractors;
@@ -360,9 +362,31 @@ private <AR extends ActionResponse> Flux<AR> sendRequest(Request request, Class<
360362
private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
361363

362364
RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
363-
.uri(request.getEndpoint(), request.getParameters()) //
365+
.uri(builder -> {
366+
367+
builder = builder.path(request.getEndpoint());
368+
369+
if (!ObjectUtils.isEmpty(request.getParameters())) {
370+
for (Entry<String, String> entry : request.getParameters().entrySet()) {
371+
builder = builder.queryParam(entry.getKey(), entry.getValue());
372+
}
373+
}
374+
return builder.build();
375+
}) //
364376
.attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) //
365-
.headers(theHeaders -> theHeaders.addAll(headers));
377+
.headers(theHeaders -> {
378+
379+
// add all the headers explicitly set
380+
theHeaders.addAll(headers);
381+
382+
// and now those that might be set on the request.
383+
if (request.getOptions() != null) {
384+
385+
if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) {
386+
request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue()));
387+
}
388+
}
389+
});
366390

367391
if (request.getEntity() != null) {
368392

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

1818
import static org.assertj.core.api.Assertions.*;
19-
import static org.mockito.ArgumentMatchers.*;
19+
import static org.mockito.ArgumentMatchers.any;
2020
import static org.mockito.Mockito.*;
2121
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
2222

2323
import reactor.test.StepVerifier;
2424

25+
import java.net.URI;
2526
import java.util.Collections;
26-
import java.util.Map;
2727

2828
import org.elasticsearch.ElasticsearchStatusException;
2929
import org.elasticsearch.action.DocWriteResponse.Result;
@@ -33,7 +33,9 @@
3333
import org.elasticsearch.action.index.IndexRequest;
3434
import org.elasticsearch.action.search.SearchRequest;
3535
import org.elasticsearch.action.update.UpdateRequest;
36+
import org.elasticsearch.common.unit.TimeValue;
3637
import org.elasticsearch.common.xcontent.XContentType;
38+
import org.elasticsearch.index.VersionType;
3739
import org.junit.Before;
3840
import org.junit.Test;
3941
import org.reactivestreams.Publisher;
@@ -61,6 +63,29 @@ public void setUp() {
6163
client = new DefaultReactiveElasticsearchClient(hostProvider);
6264
}
6365

66+
@Test // DATAES-512
67+
public void sendRequestShouldCarryOnRequestParameters() {
68+
69+
hostProvider.when(HOST).receiveDeleteOk();
70+
71+
DeleteRequest request = new DeleteRequest("index", "type", "id");
72+
request.version(1000);
73+
request.versionType(VersionType.EXTERNAL);
74+
request.timeout(TimeValue.timeValueMinutes(10));
75+
76+
client.delete(request) //
77+
.then() //
78+
.as(StepVerifier::create) //
79+
.verifyComplete();
80+
81+
URI uri = hostProvider.when(HOST).captureUri();
82+
83+
assertThat(uri.getQuery()) //
84+
.contains("version=1000") //
85+
.contains("version_type=external") //
86+
.contains("timeout=10m");
87+
}
88+
6489
// --> PING
6590

6691
@Test
@@ -74,9 +99,8 @@ public void pingShouldHitMainEndpoint() {
7499
.as(StepVerifier::create) //
75100
.verifyComplete();
76101

77-
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
78-
verify(requestBodyUriSpec).uri(eq("/"), any(Map.class));
79-
});
102+
URI uri = hostProvider.when(HOST).captureUri();
103+
assertThat(uri.getRawPath()).isEqualTo("/");
80104
}
81105

82106
@Test // DATAES-488
@@ -116,9 +140,8 @@ public void infoShouldHitMainEndpoint() {
116140
.as(StepVerifier::create) //
117141
.verifyComplete();
118142

119-
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
120-
verify(requestBodyUriSpec).uri(eq("/"), any(Map.class));
121-
});
143+
URI uri = hostProvider.when(HOST).captureUri();
144+
assertThat(uri.getRawPath()).isEqualTo("/");
122145
}
123146

124147
@Test // DATAES-488
@@ -151,9 +174,8 @@ public void getShouldHitGetEndpoint() {
151174
.verifyComplete();
152175

153176
verify(hostProvider.client(HOST)).method(HttpMethod.GET);
154-
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
155-
verify(requestBodyUriSpec).uri(eq("/twitter/_all/1"), any(Map.class));
156-
});
177+
URI uri = hostProvider.when(HOST).captureUri();
178+
assertThat(uri.getRawPath()).isEqualTo("/twitter/_all/1");
157179
}
158180

159181
@Test // DATAES-488
@@ -204,10 +226,11 @@ public void multiGetShouldHitMGetEndpoint() {
204226
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
205227

206228
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
207-
208-
verify(requestBodyUriSpec).uri(eq("/_mget"), any(Map.class));
209229
verify(requestBodyUriSpec).body(any(Publisher.class), any(Class.class));
210230
});
231+
232+
URI uri = hostProvider.when(HOST).captureUri();
233+
assertThat(uri.getRawPath()).isEqualTo("/_mget");
211234
}
212235

213236
@Test // DATAES-488
@@ -287,9 +310,8 @@ public void existsShouldHitGetEndpoint() {
287310

288311
verify(hostProvider.client(HOST)).method(HttpMethod.HEAD);
289312

290-
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
291-
verify(requestBodyUriSpec).uri(eq("/twitter/_all/1"), any(Map.class));
292-
});
313+
URI uri = hostProvider.when(HOST).captureUri();
314+
assertThat(uri.getRawPath()).isEqualTo("/twitter/_all/1");
293315
}
294316

295317
@Test // DATAES-488
@@ -329,10 +351,11 @@ public void indexNewShouldHitCreateEndpoint() {
329351

330352
verify(hostProvider.client(HOST)).method(HttpMethod.PUT);
331353
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
332-
333-
verify(requestBodyUriSpec).uri(eq("/twitter/10/_create"), any(Map.class));
334354
verify(requestBodyUriSpec).contentType(MediaType.APPLICATION_JSON);
335355
});
356+
357+
URI uri = hostProvider.when(HOST).captureUri();
358+
assertThat(uri.getRawPath()).isEqualTo("/twitter/10/_create");
336359
}
337360

338361
@Test // DATAES-488
@@ -347,10 +370,11 @@ public void indexExistingShouldHitEndpointCorrectly() {
347370

348371
verify(hostProvider.client(HOST)).method(HttpMethod.PUT);
349372
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
350-
351-
verify(requestBodyUriSpec).uri(eq("/twitter/10"), any(Map.class));
352373
verify(requestBodyUriSpec).contentType(MediaType.APPLICATION_JSON);
353374
});
375+
376+
URI uri = hostProvider.when(HOST).captureUri();
377+
assertThat(uri.getRawPath()).isEqualTo("/twitter/10");
354378
}
355379

356380
@Test // DATAES-488
@@ -401,10 +425,11 @@ public void updateShouldHitEndpointCorrectly() {
401425

402426
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
403427
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
404-
405-
verify(requestBodyUriSpec).uri(eq("/twitter/doc/1/_update"), any(Map.class));
406428
verify(requestBodyUriSpec).contentType(MediaType.APPLICATION_JSON);
407429
});
430+
431+
URI uri = hostProvider.when(HOST).captureUri();
432+
assertThat(uri.getRawPath()).isEqualTo("/twitter/doc/1/_update");
408433
}
409434

410435
@Test // DATAES-488
@@ -450,9 +475,8 @@ public void deleteShouldHitEndpointCorrectly() {
450475
.verifyComplete();
451476

452477
verify(hostProvider.client(HOST)).method(HttpMethod.DELETE);
453-
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
454-
verify(requestBodyUriSpec).uri(eq("/twitter/doc/1"), any(Map.class));
455-
});
478+
URI uri = hostProvider.when(HOST).captureUri();
479+
assertThat(uri.getRawPath()).isEqualTo("/twitter/doc/1");
456480
}
457481

458482
@Test // DATAES-488
@@ -484,9 +508,8 @@ public void searchShouldHitSearchEndpoint() {
484508
client.search(new SearchRequest("twitter")).as(StepVerifier::create).verifyComplete();
485509

486510
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
487-
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
488-
verify(requestBodyUriSpec).uri(eq("/twitter/_search"), any(Map.class));
489-
});
511+
URI uri = hostProvider.when(HOST).captureUri();
512+
assertThat(uri.getRawPath()).isEqualTo("/twitter/_search");
490513
}
491514

492515
@Test // DATAES-488

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,28 @@
1717

1818
import static org.mockito.Mockito.*;
1919

20+
import org.mockito.ArgumentCaptor;
21+
import org.springframework.http.HttpMethod;
22+
import org.springframework.web.util.DefaultUriBuilderFactory;
23+
import org.springframework.web.util.UriBuilder;
2024
import reactor.core.publisher.Mono;
2125

2226
import java.io.IOException;
2327
import java.net.InetSocketAddress;
28+
import java.net.URI;
2429
import java.nio.charset.StandardCharsets;
2530
import java.util.Arrays;
2631
import java.util.Collections;
2732
import java.util.LinkedHashMap;
33+
import java.util.LinkedHashSet;
2834
import java.util.List;
2935
import java.util.Map;
3036
import java.util.Optional;
37+
import java.util.Set;
3138
import java.util.concurrent.ConcurrentHashMap;
3239
import java.util.concurrent.CopyOnWriteArrayList;
3340
import java.util.function.Consumer;
41+
import java.util.function.Function;
3442
import java.util.function.Supplier;
3543

3644
import org.mockito.Mockito;
@@ -215,21 +223,20 @@ public WebClient get(InetSocketAddress endpoint) {
215223
Mockito.when(headersUriSpec.uri(any(), any(Map.class))).thenReturn(headersUriSpec);
216224
Mockito.when(headersUriSpec.headers(any(Consumer.class))).thenReturn(headersUriSpec);
217225
Mockito.when(headersUriSpec.attribute(anyString(), anyString())).thenReturn(headersUriSpec);
226+
Mockito.when(headersUriSpec.uri(any(Function.class))).thenReturn(headersUriSpec);
218227

219-
RequestBodyUriSpec bodyUriSpec = mock(RequestBodyUriSpec.class);
220-
Mockito.when(webClient.method(any())).thenReturn(bodyUriSpec);
221-
Mockito.when(bodyUriSpec.body(any())).thenReturn(headersUriSpec);
222-
Mockito.when(bodyUriSpec.uri(any(), any(Map.class))).thenReturn(bodyUriSpec);
223-
Mockito.when(bodyUriSpec.attribute(anyString(), anyString())).thenReturn(bodyUriSpec);
224-
Mockito.when(bodyUriSpec.headers(any(Consumer.class))).thenReturn(bodyUriSpec);
228+
RequestBodyUriSpec bodySpy = spy(WebClient.create().method(HttpMethod.POST));
229+
230+
Mockito.when(webClient.method(any())).thenReturn(bodySpy);
231+
Mockito.when(bodySpy.body(any())).thenReturn(headersUriSpec);
225232

226233
ClientResponse response = mock(ClientResponse.class);
227234
Mockito.when(headersUriSpec.exchange()).thenReturn(Mono.just(response));
228-
Mockito.when(bodyUriSpec.exchange()).thenReturn(Mono.just(response));
235+
Mockito.when(bodySpy.exchange()).thenReturn(Mono.just(response));
229236
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED);
230237

231238
headersUriSpecMap.putIfAbsent(key, headersUriSpec);
232-
bodyUriSpecMap.putIfAbsent(key, bodyUriSpec);
239+
bodyUriSpecMap.putIfAbsent(key, bodySpy);
233240
responseMap.putIfAbsent(key, response);
234241

235242
return webClient;
@@ -273,6 +280,21 @@ public interface Send extends Receive, Client {
273280

274281
Receive exchange(Consumer<RequestBodyUriSpec> bodySpec);
275282

283+
default URI captureUri() {
284+
285+
Set<URI> capturingSet = new LinkedHashSet();
286+
287+
exchange(requestBodyUriSpec -> {
288+
289+
ArgumentCaptor<Function<UriBuilder, URI>> fkt = ArgumentCaptor.forClass(Function.class);
290+
verify(requestBodyUriSpec).uri(fkt.capture());
291+
292+
capturingSet.add(fkt.getValue().apply(new DefaultUriBuilderFactory().builder()));
293+
});
294+
295+
return capturingSet.iterator().next();
296+
}
297+
276298
default Receive receiveJsonFromFile(String file) {
277299

278300
return receive(Receive::json) //

0 commit comments

Comments
 (0)