Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,22 @@ static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, false);
}

static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
}

static Request submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, false);
}

static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
return prepareUpdateByQueryRequest(updateByQueryRequest, true);
}

static Request submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
return prepareUpdateByQueryRequest(updateByQueryRequest, false);
}

private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Expand Down Expand Up @@ -608,7 +620,8 @@ private static Request prepareDeleteByQueryRequest(DeleteByQueryRequest deleteBy
return request;
}

static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
static Request prepareUpdateByQueryRequest(UpdateByQueryRequest updateByQueryRequest,
boolean waitForCompletion) throws IOException {
String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query");
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
Expand All @@ -619,6 +632,7 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(updateByQueryRequest.indicesOptions())
.withWaitForCompletion(waitForCompletion)
.withSlices(updateByQueryRequest.getSlices());
if (updateByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
Expand All @@ -637,10 +651,6 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
return request;
}

static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
}

static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
return rethrottle(rethrottleRequest, "_reindex");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,21 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue
);
}

/**
* Submits a update by query task.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
* Update By Query API on elastic.co</a>
* @param updateByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
*/
public final TaskSubmissionResponse submitUpdateByQueryTask(UpdateByQueryRequest updateByQueryRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(
updateByQueryRequest, RequestConverters::submitUpdateByQuery, options, TaskSubmissionResponse::fromXContent, emptySet()
);
}

/**
* Asynchronously executes an update by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand All @@ -31,6 +34,7 @@
import org.elasticsearch.client.cluster.RemoteInfoResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -42,6 +46,7 @@
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -51,18 +56,25 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;

public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {

protected static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";

private static RestHighLevelClient restHighLevelClient;
private static boolean async = Booleans.parseBoolean(System.getProperty("tests.rest.async", "false"));

Expand Down Expand Up @@ -216,6 +228,29 @@ protected static void clusterUpdateSettings(Settings persistentSettings,
request, highLevelClient().cluster()::putSettings, highLevelClient().cluster()::putSettingsAsync).isAcknowledged());
}

protected void putConflictPipeline() throws IOException {
final XContentBuilder pipelineBuilder = jsonBuilder()
.startObject()
.startArray("processors")
.startObject()
.startObject("set")
.field("field", "_version")
.field("value", 1)
.endObject()
.endObject()
.startObject()
.startObject("set")
.field("field", "_id")
.field("value", "1")
.endObject()
.endObject()
.endArray()
.endObject();
final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType());
assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
}

@Override
protected Settings restClientSettings() {
final String user = Objects.requireNonNull(System.getProperty("tests.rest.cluster.username"));
Expand Down Expand Up @@ -280,4 +315,36 @@ protected static void setupRemoteClusterConfig(String remoteClusterName) throws
protected static Map<String, Object> toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
}

protected static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
request.setActions(actionName);
request.setDetailed(true);
do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
list.rethrowFailures("Finding tasks to rethrottle");
List<TaskGroup> taskGroups =
list.getTaskGroups().stream()
.filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
assertThat("tasks are left over from the last execution of this test",
taskGroups, hasSize(lessThan(2)));
if (0 == taskGroups.size()) {
// The parent task hasn't started yet
continue;
}
TaskGroup taskGroup = taskGroups.get(0);
assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId();
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
}

protected static CheckedRunnable<Exception> checkTaskCompletionStatus(RestClient client, String taskId) {
return () -> {
Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
assertTrue((boolean) entityAsMap(response).get("completed"));
};
}
}
Loading