From c042301b9f60978a782a93f436e48e3914f52634 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 14 Dec 2018 09:00:58 +0100 Subject: [PATCH 1/3] [Close Index API] Propagate tasks ids between Freeze, Close and Verify actions --- .../CloseIndexClusterStateUpdateRequest.java | 10 +++++++ .../close/TransportCloseIndexAction.java | 7 +++++ .../metadata/MetaDataIndexStateService.java | 30 +++++++++---------- .../action/TransportFreezeIndexAction.java | 7 +++++ 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java index 8ad79f1676eb1..882aa41364093 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -25,7 +25,17 @@ */ public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { + private long taskId; + public CloseIndexClusterStateUpdateRequest() { + } + + public Long taskId() { + return taskId; + } + public CloseIndexClusterStateUpdateRequest taskId(final long taskId) { + this.taskId = taskId; + return this; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 605f0ed9217ac..113a2e7f510dc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -99,6 +99,12 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta @Override protected void masterOperation(final CloseIndexRequest request, final ClusterState state, final ActionListener listener) { + throw new UnsupportedOperationException("The task parameter is required"); + } + + @Override + protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state, + final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { listener.onResponse(new AcknowledgedResponse(true)); @@ -108,6 +114,7 @@ protected void masterOperation(final CloseIndexRequest request, final ClusterSta final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) + .taskId(task.getId()) .indices(concreteIndices); indexStateService.closeIndices(closeRequest, new ActionListener() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index cda8f9c6f0ac6..49ca70dde9051 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -46,7 +46,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -63,6 +62,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -120,9 +120,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina throw new IllegalArgumentException("Index name is required"); } - final TimeValue timeout = request.ackTimeout(); - final TimeValue masterTimeout = request.masterNodeTimeout(); - clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) { @@ -141,7 +138,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) - .execute(new WaitForClosedBlocksApplied(blockedIndices, timeout, + .execute(new WaitForClosedBlocksApplied(blockedIndices, request, ActionListener.wrap(closedBlocksResults -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { @Override @@ -176,7 +173,7 @@ public void onFailure(final String source, final Exception e) { @Override public TimeValue timeout() { - return masterTimeout; + return request.masterNodeTimeout(); } } ); @@ -246,18 +243,18 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta class WaitForClosedBlocksApplied extends AbstractRunnable { private final Set blockedIndices; - private final @Nullable TimeValue timeout; + private final CloseIndexClusterStateUpdateRequest request; private final ActionListener> listener; private WaitForClosedBlocksApplied(final Set blockedIndices, - final @Nullable TimeValue timeout, + final CloseIndexClusterStateUpdateRequest request, final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices"); } this.blockedIndices = blockedIndices; + this.request = request; this.listener = listener; - this.timeout = timeout; } @Override @@ -271,7 +268,7 @@ protected void doRun() throws Exception { final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); for (Index blockedIndex : blockedIndices) { - waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> { + waitForShardsReadyForClosing(blockedIndex, state, response -> { results.put(blockedIndex, response); if (countDown.countDown()) { listener.onResponse(unmodifiableMap(results)); @@ -280,7 +277,7 @@ protected void doRun() throws Exception { } } - private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout, + private void waitForShardsReadyForClosing(final Index index, final ClusterState state, final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { @@ -302,7 +299,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; final ShardId shardId = shardRoutingTable.shardId(); - sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener() { + sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); @@ -326,7 +323,7 @@ private void processIfFinished() { } } - private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout, + private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { final ShardId shardId = shardRoutingTable.shardId(); if (shardRoutingTable.primaryShard().unassigned()) { @@ -338,8 +335,11 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar } final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); - if (timeout != null) { - shardRequest.timeout(timeout); + if (request.ackTimeout() != null) { + shardRequest.timeout(request.ackTimeout()); + } + if (request.taskId() != null) { + shardRequest.setParentTask(new TaskId(clusterService.localNode().getId(), request.taskId())); } // TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests transportVerifyShardBeforeCloseAction.execute(shardRequest, listener); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java index 36cce46d47c46..a8871d5da629a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java @@ -109,6 +109,12 @@ private Index[] resolveIndices(FreezeRequest request, ClusterState state) { @Override protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener listener) { + throw new UnsupportedOperationException("The task parameter is required"); + } + + @Override + protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state, + ActionListener listener) throws Exception { final Index[] concreteIndices = resolveIndices(request, state); if (concreteIndices.length == 0) { listener.onResponse(new FreezeResponse(true, true)); @@ -118,6 +124,7 @@ protected void masterOperation(FreezeRequest request, ClusterState state, Action final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) + .taskId(task.getId()) .indices(concreteIndices); indexStateService.closeIndices(closeRequest, new ActionListener() { From d239dc822f9a690be752bd4ca5991bbeb0737964 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 17 Dec 2018 11:54:00 +0100 Subject: [PATCH 2/3] Apply feedback --- .../close/CloseIndexClusterStateUpdateRequest.java | 12 ++++-------- .../indices/close/TransportCloseIndexAction.java | 3 +-- .../close/TransportVerifyShardBeforeCloseAction.java | 4 +++- .../cluster/metadata/MetaDataIndexStateService.java | 6 ++---- .../TransportVerifyShardBeforeCloseActionTests.java | 3 ++- .../core/action/TransportFreezeIndexAction.java | 3 +-- 6 files changed, 13 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java index 882aa41364093..bb0f98ac07b7e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -25,17 +25,13 @@ */ public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { - private long taskId; + private final long taskId; - public CloseIndexClusterStateUpdateRequest() { + public CloseIndexClusterStateUpdateRequest(final long taskId) { + this.taskId = taskId; } - public Long taskId() { + public long taskId() { return taskId; } - - public CloseIndexClusterStateUpdateRequest taskId(final long taskId) { - this.taskId = taskId; - return this; - } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 113a2e7f510dc..bb3db084b0c53 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -111,10 +111,9 @@ protected void masterOperation(final Task task, final CloseIndexRequest request, return; } - final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() + final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) - .taskId(task.getId()) .indices(concreteIndices); indexStateService.closeIndices(closeRequest, new ActionListener() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 1d283cbe004d0..f603f92a7189e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -141,8 +142,9 @@ public static class ShardRequest extends ReplicationRequest { ShardRequest(){ } - public ShardRequest(final ShardId shardId) { + public ShardRequest(final ShardId shardId, final TaskId parentTaskId) { super(shardId); + setParentTask(parentTaskId); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 49ca70dde9051..6ceda4bf57d13 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -333,14 +333,12 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar listener.onResponse(response); return; } + final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId); if (request.ackTimeout() != null) { shardRequest.timeout(request.ackTimeout()); } - if (request.taskId() != null) { - shardRequest.setParentTask(new TaskId(clusterService.localNode().getId(), request.taskId())); - } // TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests transportVerifyShardBeforeCloseAction.execute(shardRequest, listener); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 2764eee798e6b..9e1e5c4ac8f3a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -130,7 +131,7 @@ public static void afterClass() { private void executeOnPrimaryOrReplica() throws Exception { final TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId()); + new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong())); if (randomBoolean()) { assertNotNull(action.shardOperationOnPrimary(request, indexShard)); } else { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java index a8871d5da629a..3031ec5b2a409 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java @@ -121,10 +121,9 @@ protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeReque return; } - final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest() + final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) - .taskId(task.getId()) .indices(concreteIndices); indexStateService.closeIndices(closeRequest, new ActionListener() { From 9d220000bdca31a1afaff5620860c2479570ed27 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Dec 2018 16:31:43 +0100 Subject: [PATCH 3/3] Fix conflict --- .../close/TransportVerifyShardBeforeCloseActionTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 9e1e5c4ac8f3a..c0da96ed1efb7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -205,7 +205,8 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0)); final PlainActionFuture listener = new PlainActionFuture<>(); - TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); + TransportVerifyShardBeforeCloseAction.ShardRequest request = + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L)); ReplicationOperation.Replicas proxy = action.newReplicasProxy(primaryTerm); ReplicationOperation operation =