Skip to content

Commit f9466fd

Browse files
sohaibiftikharnik9000
authored andcommitted
HLRC: Add delete by query API (#32782)
Adds the delete-by-query API to the High Level REST Client.
1 parent 685dc33 commit f9466fd

File tree

17 files changed

+547
-60
lines changed

17 files changed

+547
-60
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.index.VersionType;
109109
import org.elasticsearch.index.rankeval.RankEvalRequest;
110110
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
111+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
111112
import org.elasticsearch.index.reindex.ReindexRequest;
112113
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
113114
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
@@ -877,6 +878,32 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
877878
return request;
878879
}
879880

881+
static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
882+
String endpoint =
883+
endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query");
884+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
885+
Params params = new Params(request)
886+
.withRouting(deleteByQueryRequest.getRouting())
887+
.withRefresh(deleteByQueryRequest.isRefresh())
888+
.withTimeout(deleteByQueryRequest.getTimeout())
889+
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards(), ActiveShardCount.DEFAULT)
890+
.withIndicesOptions(deleteByQueryRequest.indicesOptions());
891+
if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
892+
params.putParam("conflicts", "proceed");
893+
}
894+
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
895+
params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
896+
}
897+
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
898+
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
899+
}
900+
if (deleteByQueryRequest.getSize() > 0) {
901+
params.putParam("size", Integer.toString(deleteByQueryRequest.getSize()));
902+
}
903+
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
904+
return request;
905+
}
906+
880907
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
881908
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
882909
.addPathPart(rolloverRequest.getNewIndexName()).build();

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.index.rankeval.RankEvalRequest;
6767
import org.elasticsearch.index.rankeval.RankEvalResponse;
6868
import org.elasticsearch.index.reindex.BulkByScrollResponse;
69+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
6970
import org.elasticsearch.index.reindex.ReindexRequest;
7071
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
7172
import org.elasticsearch.plugins.spi.NamedXContentProvider;
@@ -474,6 +475,35 @@ public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, Reques
474475
);
475476
}
476477

478+
/**
479+
* Executes a delete by query request.
480+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
481+
* Delete By Query API on elastic.co</a>
482+
* @param deleteByQueryRequest the request
483+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
484+
* @return the response
485+
* @throws IOException in case there is a problem sending the request or parsing back the response
486+
*/
487+
public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
488+
return performRequestAndParseEntity(
489+
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
490+
);
491+
}
492+
493+
/**
494+
* Asynchronously executes a delete by query request.
495+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
496+
* Delete By Query API on elastic.co</a>
497+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
498+
* @param listener the listener to be notified upon request completion
499+
*/
500+
public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options,
501+
ActionListener<BulkByScrollResponse> listener) {
502+
performRequestAsyncAndParseEntity(
503+
reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
504+
);
505+
}
506+
477507
/**
478508
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
479509
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.action.get.MultiGetResponse;
3737
import org.elasticsearch.action.index.IndexRequest;
3838
import org.elasticsearch.action.index.IndexResponse;
39+
import org.elasticsearch.action.search.SearchRequest;
3940
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
4041
import org.elasticsearch.action.update.UpdateRequest;
4142
import org.elasticsearch.action.update.UpdateResponse;
@@ -50,6 +51,7 @@
5051
import org.elasticsearch.index.get.GetResult;
5152
import org.elasticsearch.index.query.IdsQueryBuilder;
5253
import org.elasticsearch.index.reindex.BulkByScrollResponse;
54+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
5355
import org.elasticsearch.index.reindex.ReindexRequest;
5456
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5557
import org.elasticsearch.rest.RestStatus;
@@ -823,6 +825,52 @@ public void testUpdateByQuery() throws IOException {
823825
}
824826
}
825827

828+
public void testDeleteByQuery() throws IOException {
829+
final String sourceIndex = "source1";
830+
{
831+
// Prepare
832+
Settings settings = Settings.builder()
833+
.put("number_of_shards", 1)
834+
.put("number_of_replicas", 0)
835+
.build();
836+
createIndex(sourceIndex, settings);
837+
assertEquals(
838+
RestStatus.OK,
839+
highLevelClient().bulk(
840+
new BulkRequest()
841+
.add(new IndexRequest(sourceIndex, "type", "1")
842+
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
843+
.add(new IndexRequest(sourceIndex, "type", "2")
844+
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
845+
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
846+
RequestOptions.DEFAULT
847+
).status()
848+
);
849+
}
850+
{
851+
// test1: delete one doc
852+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
853+
deleteByQueryRequest.indices(sourceIndex);
854+
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
855+
deleteByQueryRequest.setRefresh(true);
856+
BulkByScrollResponse bulkResponse =
857+
execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
858+
assertEquals(1, bulkResponse.getTotal());
859+
assertEquals(1, bulkResponse.getDeleted());
860+
assertEquals(0, bulkResponse.getNoops());
861+
assertEquals(0, bulkResponse.getVersionConflicts());
862+
assertEquals(1, bulkResponse.getBatches());
863+
assertTrue(bulkResponse.getTook().getMillis() > 0);
864+
assertEquals(1, bulkResponse.getBatches());
865+
assertEquals(0, bulkResponse.getBulkFailures().size());
866+
assertEquals(0, bulkResponse.getSearchFailures().size());
867+
assertEquals(
868+
1,
869+
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
870+
);
871+
}
872+
}
873+
826874
public void testBulkProcessorIntegration() throws IOException {
827875
int nbItems = randomIntBetween(10, 100);
828876
boolean[] errors = new boolean[nbItems];

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@
128128
import org.elasticsearch.index.rankeval.RankEvalSpec;
129129
import org.elasticsearch.index.rankeval.RatedRequest;
130130
import org.elasticsearch.index.rankeval.RestRankEvalAction;
131+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
131132
import org.elasticsearch.index.reindex.ReindexRequest;
132133
import org.elasticsearch.index.reindex.RemoteInfo;
133134
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
@@ -536,6 +537,53 @@ public void testUpdateByQuery() throws IOException {
536537
assertToXContentBody(updateByQueryRequest, request.getEntity());
537538
}
538539

540+
public void testDeleteByQuery() throws IOException {
541+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
542+
deleteByQueryRequest.indices(randomIndicesNames(1, 5));
543+
Map<String, String> expectedParams = new HashMap<>();
544+
if (randomBoolean()) {
545+
deleteByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false));
546+
}
547+
if (randomBoolean()) {
548+
int batchSize = randomInt(100);
549+
deleteByQueryRequest.setBatchSize(batchSize);
550+
expectedParams.put("scroll_size", Integer.toString(batchSize));
551+
}
552+
if (randomBoolean()) {
553+
deleteByQueryRequest.setRouting("=cat");
554+
expectedParams.put("routing", "=cat");
555+
}
556+
if (randomBoolean()) {
557+
int size = randomIntBetween(100, 1000);
558+
deleteByQueryRequest.setSize(size);
559+
expectedParams.put("size", Integer.toString(size));
560+
}
561+
if (randomBoolean()) {
562+
deleteByQueryRequest.setAbortOnVersionConflict(false);
563+
expectedParams.put("conflicts", "proceed");
564+
}
565+
if (randomBoolean()) {
566+
String ts = randomTimeValue();
567+
deleteByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
568+
expectedParams.put("scroll", ts);
569+
}
570+
if (randomBoolean()) {
571+
deleteByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval"));
572+
}
573+
setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
574+
setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
575+
Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
576+
StringJoiner joiner = new StringJoiner("/", "/", "");
577+
joiner.add(String.join(",", deleteByQueryRequest.indices()));
578+
if (deleteByQueryRequest.getDocTypes().length > 0)
579+
joiner.add(String.join(",", deleteByQueryRequest.getDocTypes()));
580+
joiner.add("_delete_by_query");
581+
assertEquals(joiner.toString(), request.getEndpoint());
582+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
583+
assertEquals(expectedParams, request.getParameters());
584+
assertToXContentBody(deleteByQueryRequest, request.getEntity());
585+
}
586+
539587
public void testPutMapping() throws IOException {
540588
PutMappingRequest putMappingRequest = new PutMappingRequest();
541589

@@ -2754,7 +2802,7 @@ public void testXPackPutWatch() throws Exception {
27542802
request.getEntity().writeTo(bos);
27552803
assertThat(bos.toString("UTF-8"), is(body));
27562804
}
2757-
2805+
27582806
public void testGraphExplore() throws Exception {
27592807
Map<String, String> expectedParams = new HashMap<>();
27602808

@@ -2782,7 +2830,7 @@ public void testGraphExplore() throws Exception {
27822830
assertEquals(expectedParams, request.getParameters());
27832831
assertThat(request.getEntity().getContentType().getValue(), is(XContentType.JSON.mediaTypeWithoutParameters()));
27842832
assertToXContentBody(graphExploreRequest, request.getEntity());
2785-
}
2833+
}
27862834

27872835
public void testXPackDeleteWatch() {
27882836
DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest();

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,6 @@ public void testApiNamingConventions() throws Exception {
667667
"cluster.remote_info",
668668
"count",
669669
"create",
670-
"delete_by_query",
671670
"exists_source",
672671
"get_source",
673672
"indices.delete_alias",

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.index.query.MatchAllQueryBuilder;
6666
import org.elasticsearch.index.query.TermQueryBuilder;
6767
import org.elasticsearch.index.reindex.BulkByScrollResponse;
68+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
6869
import org.elasticsearch.index.reindex.ReindexRequest;
6970
import org.elasticsearch.index.reindex.RemoteInfo;
7071
import org.elasticsearch.index.reindex.ScrollableHitSource;
@@ -1030,6 +1031,113 @@ public void onFailure(Exception e) {
10301031
}
10311032
}
10321033

1034+
public void testDeleteByQuery() throws Exception {
1035+
RestHighLevelClient client = highLevelClient();
1036+
{
1037+
String mapping =
1038+
"\"doc\": {\n" +
1039+
" \"properties\": {\n" +
1040+
" \"user\": {\n" +
1041+
" \"type\": \"text\"\n" +
1042+
" },\n" +
1043+
" \"field1\": {\n" +
1044+
" \"type\": \"integer\"\n" +
1045+
" },\n" +
1046+
" \"field2\": {\n" +
1047+
" \"type\": \"integer\"\n" +
1048+
" }\n" +
1049+
" }\n" +
1050+
" }";
1051+
createIndex("source1", Settings.EMPTY, mapping);
1052+
createIndex("source2", Settings.EMPTY, mapping);
1053+
}
1054+
{
1055+
// tag::delete-by-query-request
1056+
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); // <1>
1057+
// end::delete-by-query-request
1058+
// tag::delete-by-query-request-conflicts
1059+
request.setConflicts("proceed"); // <1>
1060+
// end::delete-by-query-request-conflicts
1061+
// tag::delete-by-query-request-typeOrQuery
1062+
request.setDocTypes("doc"); // <1>
1063+
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2>
1064+
// end::delete-by-query-request-typeOrQuery
1065+
// tag::delete-by-query-request-size
1066+
request.setSize(10); // <1>
1067+
// end::delete-by-query-request-size
1068+
// tag::delete-by-query-request-scrollSize
1069+
request.setBatchSize(100); // <1>
1070+
// end::delete-by-query-request-scrollSize
1071+
// tag::delete-by-query-request-timeout
1072+
request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
1073+
// end::delete-by-query-request-timeout
1074+
// tag::delete-by-query-request-refresh
1075+
request.setRefresh(true); // <1>
1076+
// end::delete-by-query-request-refresh
1077+
// tag::delete-by-query-request-slices
1078+
request.setSlices(2); // <1>
1079+
// end::delete-by-query-request-slices
1080+
// tag::delete-by-query-request-scroll
1081+
request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
1082+
// end::delete-by-query-request-scroll
1083+
// tag::delete-by-query-request-routing
1084+
request.setRouting("=cat"); // <1>
1085+
// end::delete-by-query-request-routing
1086+
// tag::delete-by-query-request-indicesOptions
1087+
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1>
1088+
// end::delete-by-query-request-indicesOptions
1089+
1090+
// tag::delete-by-query-execute
1091+
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
1092+
// end::delete-by-query-execute
1093+
assertSame(0, bulkResponse.getSearchFailures().size());
1094+
assertSame(0, bulkResponse.getBulkFailures().size());
1095+
// tag::delete-by-query-response
1096+
TimeValue timeTaken = bulkResponse.getTook(); // <1>
1097+
boolean timedOut = bulkResponse.isTimedOut(); // <2>
1098+
long totalDocs = bulkResponse.getTotal(); // <3>
1099+
long deletedDocs = bulkResponse.getDeleted(); // <4>
1100+
long batches = bulkResponse.getBatches(); // <5>
1101+
long noops = bulkResponse.getNoops(); // <6>
1102+
long versionConflicts = bulkResponse.getVersionConflicts(); // <7>
1103+
long bulkRetries = bulkResponse.getBulkRetries(); // <8>
1104+
long searchRetries = bulkResponse.getSearchRetries(); // <9>
1105+
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <10>
1106+
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <11>
1107+
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); // <12>
1108+
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures(); // <13>
1109+
// end::delete-by-query-response
1110+
}
1111+
{
1112+
DeleteByQueryRequest request = new DeleteByQueryRequest();
1113+
request.indices("source1");
1114+
1115+
// tag::delete-by-query-execute-listener
1116+
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
1117+
@Override
1118+
public void onResponse(BulkByScrollResponse bulkResponse) {
1119+
// <1>
1120+
}
1121+
1122+
@Override
1123+
public void onFailure(Exception e) {
1124+
// <2>
1125+
}
1126+
};
1127+
// end::delete-by-query-execute-listener
1128+
1129+
// Replace the empty listener by a blocking listener in test
1130+
final CountDownLatch latch = new CountDownLatch(1);
1131+
listener = new LatchedActionListener<>(listener, latch);
1132+
1133+
// tag::delete-by-query-execute-async
1134+
client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1>
1135+
// end::delete-by-query-execute-async
1136+
1137+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
1138+
}
1139+
}
1140+
10331141
public void testGet() throws Exception {
10341142
RestHighLevelClient client = highLevelClient();
10351143
{

0 commit comments

Comments
 (0)