Skip to content

Commit 1a65f9f

Browse files
committed
Implement update by query
1 parent 3c6dd64 commit 1a65f9f

File tree

5 files changed

+135
-6
lines changed

5 files changed

+135
-6
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
2525
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
2626
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
27+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
2728
import reactor.core.publisher.Flux;
2829
import reactor.core.publisher.Mono;
2930
import reactor.netty.http.client.HttpClient;
@@ -521,6 +522,12 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
521522
.next();
522523
}
523524

525+
@Override
526+
public Mono<BulkByScrollResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
527+
return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) //
528+
.next();
529+
}
530+
524531
/*
525532
* (non-Javadoc)
526533
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
1919
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
20+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
2021
import reactor.core.publisher.Flux;
2122
import reactor.core.publisher.Mono;
2223

@@ -596,6 +597,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
596597
*/
597598
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);
598599

600+
/**
601+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
602+
*
603+
* @param consumer never {@literal null}.
604+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
605+
* * Query API on elastic.co</a>
606+
* @return a {@link Mono} emitting operation response.
607+
*/
608+
default Mono<BulkByScrollResponse> updateBy(Consumer<UpdateByQueryRequest> consumer){
609+
610+
final UpdateByQueryRequest request = new UpdateByQueryRequest();
611+
consumer.accept(request);
612+
return updateBy(request);
613+
}
614+
615+
/**
616+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
617+
*
618+
* @param updateRequest must not be {@literal null}.
619+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
620+
* * Query API on elastic.co</a>
621+
* @return a {@link Mono} emitting operation response.
622+
*/
623+
default Mono<BulkByScrollResponse> updateBy(UpdateByQueryRequest updateRequest){
624+
return updateBy(HttpHeaders.EMPTY, updateRequest);
625+
}
626+
627+
/**
628+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
629+
*
630+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
631+
* @param updateRequest must not be {@literal null}.
632+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
633+
* * Query API on elastic.co</a>
634+
* @return a {@link Mono} emitting operation response.
635+
*/
636+
Mono<BulkByScrollResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);
637+
599638
/**
600639
* Execute a {@link BulkRequest} against the {@literal bulk} API.
601640
*

src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
3434
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
3535
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
36+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
3637
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
3738
import org.springframework.data.elasticsearch.client.util.RequestConverters;
3839

@@ -91,6 +92,10 @@ default Function<DeleteByQueryRequest, Request> deleteByQuery() {
9192
return RequestConverters::deleteByQuery;
9293
}
9394

95+
default Function<UpdateByQueryRequest, Request> updateByQuery() {
96+
return RequestConverters::updateByQuery;
97+
}
98+
9499
default Function<BulkRequest, Request> bulk() {
95100

96101
return request -> {

src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,10 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
540540
public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
541541
String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
542542
Request request = new Request(HttpMethod.POST.name(), endpoint);
543-
Params params = new Params(request).withRouting(updateByQueryRequest.getRouting())
544-
.withPipeline(updateByQueryRequest.getPipeline()).withRefresh(updateByQueryRequest.isRefresh())
543+
Params params = new Params(request)
544+
.withRouting(updateByQueryRequest.getRouting())
545+
.withPipeline(updateByQueryRequest.getPipeline())
546+
.withRefresh(updateByQueryRequest.isRefresh())
545547
.withTimeout(updateByQueryRequest.getTimeout())
546548
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
547549
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
@@ -555,8 +557,8 @@ public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
555557
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
556558
params.putParam("scroll", updateByQueryRequest.getScrollTime());
557559
}
558-
if (updateByQueryRequest.getSize() > 0) {
559-
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
560+
if (updateByQueryRequest.getMaxDocs() > 0) {
561+
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
560562
}
561563
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
562564
return request;

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import lombok.SneakyThrows;
2121
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
22+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
23+
import org.elasticsearch.script.Script;
24+
import org.elasticsearch.script.ScriptType;
2225
import reactor.core.publisher.Mono;
2326
import reactor.test.StepVerifier;
2427

@@ -27,6 +30,7 @@
2730
import java.util.Collections;
2831
import java.util.HashMap;
2932
import java.util.LinkedHashMap;
33+
import java.util.List;
3034
import java.util.Map;
3135
import java.util.UUID;
3236
import java.util.stream.IntStream;
@@ -102,8 +106,10 @@ protected org.springframework.data.elasticsearch.core.RefreshPolicy refreshPolic
102106
// (Object...)
103107
static final Map<String, Object> DOC_SOURCE;
104108

105-
@Autowired ReactiveElasticsearchClient client;
106-
@Autowired ReactiveElasticsearchOperations operations;
109+
@Autowired
110+
ReactiveElasticsearchClient client;
111+
@Autowired
112+
ReactiveElasticsearchOperations operations;
107113

108114
static {
109115

@@ -451,6 +457,76 @@ public void deleteByEmitResultWhenNothingRemoved() {
451457
.verifyComplete();
452458
}
453459

460+
@Test // #1446
461+
void updateByEmitResultWhenNothingUpdated() {
462+
addSourceDocument().to(INDEX_I);
463+
addSourceDocument().to(INDEX_I);
464+
465+
Map<String, String> source = new LinkedHashMap<>();
466+
source.put("firstname", "crow");
467+
source.put("lastname", "cat");
468+
469+
final Map<String, String> documentToNotUpdate = Collections.unmodifiableMap(source);
470+
add(documentToNotUpdate).to(INDEX_I);
471+
472+
final String script = "ctx._source['firstname'] = params['newFirstname']";
473+
final Map<String, Object> params = Collections.singletonMap("newFirstname", "arrow");
474+
475+
final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I)
476+
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "fallstar")))
477+
.setAbortOnVersionConflict(true)
478+
.setRefresh(true)
479+
.setScript(new Script(ScriptType.INLINE, "painless", script, params));
480+
481+
client.updateBy(request)
482+
.map(BulkByScrollResponse::getUpdated)
483+
.as(StepVerifier::create)
484+
.expectNext(2L)
485+
.verifyComplete();
486+
487+
final SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) //
488+
.source(new SearchSourceBuilder()
489+
.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow"))));
490+
491+
client.search(searchUpdatedRequest)
492+
.collectList()
493+
.map(List::size)
494+
.as(StepVerifier::create)
495+
.expectNext(2)
496+
.verifyComplete();
497+
}
498+
499+
@Test // #1446
500+
void updateByShouldUpdateExistingDocument() {
501+
addSourceDocument().to(INDEX_I);
502+
503+
final String script = "ctx._source['firstname'] = params['newFirstname']";
504+
final Map<String, Object> params = Collections.singletonMap("newFirstname", "arrow");
505+
506+
final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I)
507+
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "non_existing_lastname")))
508+
.setAbortOnVersionConflict(true)
509+
.setRefresh(true)
510+
.setScript(new Script(ScriptType.INLINE, "painless", script, params));
511+
512+
client.updateBy(request)
513+
.map(BulkByScrollResponse::getUpdated)
514+
.as(StepVerifier::create)
515+
.expectNext(0L)
516+
.verifyComplete();
517+
518+
SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) //
519+
.source(new SearchSourceBuilder()
520+
.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow"))));
521+
522+
client.search(searchUpdatedRequest)
523+
.collectList()
524+
.map(List::size)
525+
.as(StepVerifier::create)
526+
.expectNext(0)
527+
.verifyComplete();
528+
}
529+
454530
@Test // DATAES-510
455531
public void scrollShouldReadWhileEndNotReached() {
456532

0 commit comments

Comments
 (0)