From f61462c0208ca17cc3c9e09a1dbb0a4f5d4417e6 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 5 Mar 2019 10:41:49 +0100 Subject: [PATCH 1/7] Close Index Response --- .../test/indices.open/10_basic.yml | 37 +++ .../indices/close/CloseIndexResponse.java | 249 ++++++++++++++++++ .../metadata/MetaDataIndexStateService.java | 68 +++-- .../close/CloseIndexResponseTests.java | 143 +++++++++- .../MetaDataIndexStateServiceTests.java | 29 +- .../MetaDataIndexStateServiceUtils.java | 4 +- .../indices/cluster/ClusterStateChanges.java | 6 +- .../indices/state/CloseIndexIT.java | 52 +++- 8 files changed, 530 insertions(+), 58 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml index 0e08690868d9e..0491f7de6ed67 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml @@ -82,3 +82,40 @@ - is_true: acknowledged - match: { acknowledged: true } - match: { shards_acknowledged: true } +--- +"Close index response with result per index": + - skip: + version: " - 7.99.99" + reason: "close index response reports result per index starting version 8.0.0" + + - do: + indices.create: + index: index_1 + body: + settings: + number_of_replicas: 0 + + - do: + indices.create: + index: index_2 + body: + settings: + number_of_replicas: 0 + + - do: + indices.create: + index: index_3 + body: + settings: + number_of_replicas: 0 + + - do: + indices.close: + index: "index_*" + + - match: { acknowledged: true } + - match: { shards_acknowledged: true } + - match: { indices.index_1.closed: true } + - match: { indices.index_2.closed: true } + - match: { indices.index_3.closed: true } + diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index 9f93034479475..188e20d30e3fa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -18,20 +18,44 @@ */ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; public class CloseIndexResponse extends ShardsAcknowledgedResponse { + private List indices; + CloseIndexResponse() { } public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) { + this(acknowledged, shardsAcknowledged, emptyList()); + } + + public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List indices) { super(acknowledged, shardsAcknowledged); + this.indices = Objects.requireNonNull(indices); + } + + public List getIndices() { + return indices; } @Override @@ -40,6 +64,11 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_7_1_0)) { readShardsAcknowledged(in); } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + indices = unmodifiableList(in.readList(IndexResult::new)); + } else { + indices = unmodifiableList(emptyList()); + } } @Override @@ -48,5 +77,225 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_1_0)) { writeShardsAcknowledged(out); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeList(indices); + } + } + + protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startObject("indices"); + for (IndexResult index : indices) { + index.toXContent(builder, params); + } + builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class IndexResult implements Writeable, ToXContentFragment { + + private final Index index; + private final @Nullable Exception exception; + private final @Nullable ShardResult[] shards; + + public IndexResult(final Index index) { + this(index, null, null); + } + + public IndexResult(final Index index, final Exception failure) { + this(index, Objects.requireNonNull(failure), null); + } + + public IndexResult(final Index index, final ShardResult[] shards) { + this(index, null, Objects.requireNonNull(shards)); + } + + private IndexResult(final Index index, @Nullable final Exception exception, @Nullable final ShardResult[] shards) { + this.index = Objects.requireNonNull(index); + this.exception = exception; + this.shards = shards; + } + + IndexResult(final StreamInput in) throws IOException { + this.index = new Index(in); + this.exception = in.readException(); + this.shards = in.readOptionalArray(ShardResult::new, ShardResult[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + index.writeTo(out); + out.writeException(exception); + out.writeOptionalArray(shards); + } + + public Index getIndex() { + return index; + } + + public Exception getException() { + return exception; + } + + public ShardResult[] getShards() { + return shards; + } + + public boolean hasFailures() { + if (exception != null) { + return true; + } + if (shards != null) { + for (ShardResult shard : shards) { + if (shard.hasFailures()) { + return true; + } + } + } + return false; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(index.getName()); + { + if (hasFailures()) { + builder.field("closed", false); + if (exception != null) { + builder.startObject("exception"); + ElasticsearchException.generateFailureXContent(builder, params, exception, true); + builder.endObject(); + } else { + builder.startObject("shards"); + for (ShardResult shards : shards) { + if (shards.hasFailures()) { + shards.toXContent(builder, params); + } + } + builder.endObject(); + } + } else { + builder.field("closed", true); + } + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + } + + public static class ShardResult implements Writeable, ToXContentFragment { + + private final int id; + private final ShardResult.Failure[] failures; + + public ShardResult(final int id, final Failure... failures) { + this.id = id; + this.failures = failures; + } + + ShardResult(final StreamInput in) throws IOException { + this.id = in.readVInt(); + this.failures = in.readOptionalArray(Failure::readFailure, ShardResult.Failure[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeVInt(id); + out.writeOptionalArray(failures); + } + + public boolean hasFailures() { + return failures != null && failures.length > 0; + } + + public int getId() { + return id; + } + + public Failure[] getFailures() { + return failures; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(String.valueOf(id)); + { + builder.startArray("failures"); + if (failures != null) { + for (Failure failure : failures) { + builder.startObject(); + failure.toXContent(builder, params); + builder.endObject(); + } + } + builder.endArray(); + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class Failure extends DefaultShardOperationFailedException implements Writeable { + + private @Nullable String nodeId; + + private Failure() { + } + + public Failure(final String index, final int shardId, final Throwable reason) { + this(index, shardId, reason, null); + } + + public Failure(final String index, final int shardId, final Throwable reason, final String nodeId) { + super(index, shardId, reason); + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readOptionalString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(nodeId); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + if (nodeId != null) { + builder.field("node", nodeId); + } + return super.toXContent(builder, params); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + static Failure readFailure(final StreamInput in) throws IOException { + final Failure failure = new Failure(); + failure.readFrom(in); + return failure; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index a004d0a5a2324..f044bb0225eca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -29,10 +29,11 @@ import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.ShardResult; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.ActiveShardsObserver; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; @@ -152,18 +153,9 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta ActionListener.wrap(results -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { - boolean acknowledged = true; - @Override public ClusterState execute(final ClusterState currentState) throws Exception { final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); - for (Map.Entry result : results.entrySet()) { - IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey()); - if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) { - acknowledged = false; - break; - } - } return allocationService.reroute(updatedState, "indices closed"); } @@ -176,27 +168,29 @@ public void onFailure(final String source, final Exception e) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - final String[] indices = results.entrySet().stream() - .filter(result -> result.getValue().isAcknowledged()) - .map(result -> result.getKey().getName()) - .filter(index -> newState.routingTable().hasIndex(index)) + final List indices = new ArrayList<>(results.values()); + final boolean acknowledged = indices.stream().noneMatch(IndexResult::hasFailures); + final String[] waitForIndices = indices.stream() + .filter(result -> result.hasFailures() == false) + .filter(result -> newState.routingTable().hasIndex(result.getIndex())) + .map(result -> result.getIndex().getName()) .toArray(String[]::new); - if (indices.length > 0) { - activeShardsObserver.waitForActiveShards(indices, request.waitForActiveShards(), + if (waitForIndices.length > 0) { + activeShardsObserver.waitForActiveShards(waitForIndices, request.waitForActiveShards(), request.ackTimeout(), shardsAcknowledged -> { if (shardsAcknowledged == false) { logger.debug("[{}] indices closed, but the operation timed out while waiting " + - "for enough shards to be started.", Arrays.toString(indices)); + "for enough shards to be started.", Arrays.toString(waitForIndices)); } // acknowledged maybe be false but some indices may have been correctly closed, so // we maintain a kind of coherency by overriding the shardsAcknowledged value // (see ShardsAcknowledgedResponse constructor) boolean shardsAcked = acknowledged ? shardsAcknowledged : false; - listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked)); + listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked, indices)); }, listener::onFailure); } else { - listener.onResponse(new CloseIndexResponse(acknowledged, false)); + listener.onResponse(new CloseIndexResponse(acknowledged, false, indices)); } } }), @@ -292,11 +286,11 @@ class WaitForClosedBlocksApplied extends AbstractRunnable { private final Map blockedIndices; private final CloseIndexClusterStateUpdateRequest request; - private final ActionListener> listener; + private final ActionListener> listener; private WaitForClosedBlocksApplied(final Map blockedIndices, final CloseIndexClusterStateUpdateRequest request, - final ActionListener> listener) { + final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null"); } @@ -312,7 +306,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() throws Exception { - final Map results = ConcurrentCollections.newConcurrentMap(); + final Map results = ConcurrentCollections.newConcurrentMap(); final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); blockedIndices.forEach((index, block) -> { @@ -325,47 +319,49 @@ protected void doRun() throws Exception { }); } - private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock, - final ClusterState state, final Consumer onResponse) { + private void waitForShardsReadyForClosing(final Index index, + final ClusterBlock closingBlock, + final ClusterState state, + final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index); - onResponse.accept(new AcknowledgedResponse(true)); + onResponse.accept(new IndexResult(index)); return; } final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) { assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); logger.debug("index {} has been blocked before closing and is already closed, ignoring", index); - onResponse.accept(new AcknowledgedResponse(true)); + onResponse.accept(new IndexResult(index)); return; } final ImmutableOpenIntMap shards = indexRoutingTable.getShards(); - final AtomicArray results = new AtomicArray<>(shards.size()); + final AtomicArray results = new AtomicArray<>(shards.size()); final CountDown countDown = new CountDown(shards.size()); for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; - final ShardId shardId = shardRoutingTable.shardId(); + final int shardId = shardRoutingTable.shardId().id(); sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { - ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); - results.setOnce(shardId.id(), new AcknowledgedResponse(shardInfo.getFailed() == 0)); + results.setOnce(shardId, new ShardResult(shardId, Arrays.stream(replicationResponse.getShardInfo().getFailures()) + .map(f -> new ShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId())) + .toArray(ShardResult.Failure[]::new))); processIfFinished(); } @Override public void innerOnFailure(final Exception e) { - results.setOnce(shardId.id(), new AcknowledgedResponse(false)); + results.setOnce(shardId, new ShardResult(shardId, new ShardResult.Failure(index.getName(), shardId, e))); processIfFinished(); } private void processIfFinished() { if (countDown.countDown()) { - final boolean acknowledged = results.asList().stream().allMatch(AcknowledgedResponse::isAcknowledged); - onResponse.accept(new AcknowledgedResponse(acknowledged)); + onResponse.accept(new IndexResult(index, results.toArray(new ShardResult[results.length()]))); } } }); @@ -398,7 +394,7 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar */ static ClusterState closeRoutingTable(final ClusterState currentState, final Map blockedIndices, - final Map results) { + final Map results) { // Remove the index routing table of closed indices if the cluster is in a mixed version // that does not support the replication of closed indices @@ -409,9 +405,9 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); final Set closedIndices = new HashSet<>(); - for (Map.Entry result : results.entrySet()) { + for (Map.Entry result : results.entrySet()) { final Index index = result.getKey(); - final boolean acknowledged = result.getValue().isAcknowledged(); + final boolean acknowledged = result.getValue().hasFailures() == false; try { if (acknowledged == false) { logger.debug("verification of shards before closing {} failed", index); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java index 6e855e492620c..6138dc7d97452 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java @@ -19,15 +19,28 @@ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.transport.ActionNotFoundTransportException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class CloseIndexResponseTests extends ESTestCase { @@ -53,11 +66,28 @@ public void testBwcSerialization() throws Exception { final AcknowledgedResponse deserializedResponse = new AcknowledgedResponse(); try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(out.getVersion()); deserializedResponse.readFrom(in); } assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); } } + { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(Version.V_7_1_0); + response.writeTo(out); + + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(out.getVersion()); + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + assertThat(deserializedResponse.isShardsAcknowledged(), equalTo(response.isShardsAcknowledged())); + assertThat(deserializedResponse.getIndices().isEmpty(), is(true)); + } + } { final AcknowledgedResponse response = new AcknowledgedResponse(randomBoolean()); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -71,16 +101,125 @@ public void testBwcSerialization() throws Exception { assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); } } + { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(Version.V_7_1_0); + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + assertThat(deserializedResponse.isShardsAcknowledged(), equalTo(response.isShardsAcknowledged())); + assertThat(deserializedResponse.getIndices().isEmpty(), is(true)); + } + } } private CloseIndexResponse randomResponse() { - final boolean acknowledged = randomBoolean(); + boolean acknowledged = true; + final String[] indicesNames = generateRandomStringArray(10, 10, false, true); + + final List indexResults = new ArrayList<>(); + for (String indexName : indicesNames) { + final Index index = new Index(indexName, "_na_"); + if (randomBoolean()) { + indexResults.add(new CloseIndexResponse.IndexResult(index)); + } else { + if (randomBoolean()) { + acknowledged = false; + indexResults.add(new CloseIndexResponse.IndexResult(index, randomException(index, 0))); + } else { + final int nbShards = randomIntBetween(1, 5); + CloseIndexResponse.ShardResult[] shards = new CloseIndexResponse.ShardResult[nbShards]; + for (int i = 0; i < nbShards; i++) { + CloseIndexResponse.ShardResult.Failure[] failures = null; + if (randomBoolean()) { + acknowledged = false; + failures = new CloseIndexResponse.ShardResult.Failure[randomIntBetween(1, 3)]; + for (int j = 0; j < failures.length; j++) { + String nodeId = randomAlphaOfLength(5); + failures[j] = new CloseIndexResponse.ShardResult.Failure(indexName, i, randomException(index, i), nodeId); + } + } + shards[i] = new CloseIndexResponse.ShardResult(i, failures); + } + indexResults.add(new CloseIndexResponse.IndexResult(index, shards)); + } + } + + } + final boolean shardsAcknowledged = acknowledged ? randomBoolean() : false; - return new CloseIndexResponse(acknowledged, shardsAcknowledged); + return new CloseIndexResponse(acknowledged, shardsAcknowledged, indexResults); + } + + private static ElasticsearchException randomException(final Index index, final int id) { + return randomFrom( + new IndexNotFoundException(index), + new ActionNotFoundTransportException("test"), + new NoShardAvailableActionException(new ShardId(index, id))); } private static void assertCloseIndexResponse(final CloseIndexResponse actual, final CloseIndexResponse expected) { assertThat(actual.isAcknowledged(), equalTo(expected.isAcknowledged())); assertThat(actual.isShardsAcknowledged(), equalTo(expected.isShardsAcknowledged())); + + for (int i = 0; i < expected.getIndices().size(); i++) { + CloseIndexResponse.IndexResult expectedIndexResult = expected.getIndices().get(i); + CloseIndexResponse.IndexResult actualIndexResult = actual.getIndices().get(i); + assertThat(actualIndexResult.getIndex(), equalTo(expectedIndexResult.getIndex())); + assertThat(actualIndexResult.hasFailures(), equalTo(expectedIndexResult.hasFailures())); + + if (expectedIndexResult.hasFailures() == false) { + assertThat(actualIndexResult.getException(), nullValue()); + if (actualIndexResult.getShards() != null) { + assertThat(Arrays.stream(actualIndexResult.getShards()) + .allMatch(shardResult -> shardResult.hasFailures() == false), is(true)); + } + } + + if (expectedIndexResult.getException() != null) { + assertThat(actualIndexResult.getShards(), nullValue()); + assertThat(actualIndexResult.getException(), notNullValue()); + assertThat(actualIndexResult.getException().getMessage(), equalTo(expectedIndexResult.getException().getMessage())); + assertThat(actualIndexResult.getException().getClass(), equalTo(expectedIndexResult.getException().getClass())); + assertArrayEquals(actualIndexResult.getException().getStackTrace(), expectedIndexResult.getException().getStackTrace()); + } else { + assertThat(actualIndexResult.getException(), nullValue()); + } + + if (expectedIndexResult.getShards() != null) { + assertThat(actualIndexResult.getShards().length, equalTo(expectedIndexResult.getShards().length)); + + for (int j = 0; j < expectedIndexResult.getShards().length; j++) { + CloseIndexResponse.ShardResult expectedShardResult = expectedIndexResult.getShards()[j]; + CloseIndexResponse.ShardResult actualShardResult = actualIndexResult.getShards()[j]; + assertThat(actualShardResult.getId(), equalTo(expectedShardResult.getId())); + assertThat(actualShardResult.hasFailures(), equalTo(expectedShardResult.hasFailures())); + + if (expectedShardResult.hasFailures()) { + assertThat(actualShardResult.getFailures().length, equalTo(expectedShardResult.getFailures().length)); + + for (int k = 0; k < expectedShardResult.getFailures().length; k++) { + CloseIndexResponse.ShardResult.Failure expectedFailure = expectedShardResult.getFailures()[k]; + CloseIndexResponse.ShardResult.Failure actualFailure = actualShardResult.getFailures()[k]; + assertThat(actualFailure.getNodeId(), equalTo(expectedFailure.getNodeId())); + assertThat(actualFailure.index(), equalTo(expectedFailure.index())); + assertThat(actualFailure.shardId(), equalTo(expectedFailure.shardId())); + assertThat(actualFailure.getCause().getMessage(), equalTo(expectedFailure.getCause().getMessage())); + assertThat(actualFailure.getCause().getClass(), equalTo(expectedFailure.getCause().getClass())); + assertArrayEquals(actualFailure.getCause().getStackTrace(), expectedFailure.getCause().getStackTrace()); + } + } else { + assertThat(actualShardResult.getFailures(), nullValue()); + } + } + } else { + assertThat(actualIndexResult.getShards(), nullValue()); + } + } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 8189e0b2b047d..d6660029e3dca 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -21,7 +21,8 @@ import com.google.common.collect.ImmutableList; import org.elasticsearch.Version; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.RestoreInProgress; @@ -81,7 +82,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { public void testCloseRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); final Map blockedIndices = new HashMap<>(); - final Map results = new HashMap<>(); + final Map results = new HashMap<>(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build(); for (int i = 0; i < randomIntBetween(1, 25); i++) { @@ -93,8 +94,13 @@ public void testCloseRoutingTable() { } else { final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock(); state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); - blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); - results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); + final Index index = state.metaData().index(indexName).getIndex(); + blockedIndices.put(index, closingBlock); + if (randomBoolean()) { + results.put(index, new CloseIndexResponse.IndexResult(index)); + } else { + results.put(index, new CloseIndexResponse.IndexResult(index, new Exception("test"))); + } } } @@ -106,7 +112,7 @@ public void testCloseRoutingTable() { assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false)); } for (Index blockedIndex : blockedIndices.keySet()) { - if (results.get(blockedIndex).isAcknowledged()) { + if (results.get(blockedIndex).hasFailures() == false) { assertIsClosed(blockedIndex.getName(), updatedState); } else { assertIsOpened(blockedIndex.getName(), updatedState); @@ -118,7 +124,7 @@ public void testCloseRoutingTable() { public void testCloseRoutingTableRemovesRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); final Map blockedIndices = new HashMap<>(); - final Map results = new HashMap<>(); + final Map results = new HashMap<>(); final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableRemovesRoutingTable")).build(); @@ -130,8 +136,13 @@ public void testCloseRoutingTableRemovesRoutingTable() { nonBlockedIndices.add(state.metaData().index(indexName).getIndex()); } else { state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); - blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); - results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); + final Index index = state.metaData().index(indexName).getIndex(); + blockedIndices.put(index, closingBlock); + if (randomBoolean()) { + results.put(index, new CloseIndexResponse.IndexResult(index)); + } else { + results.put(index, new CloseIndexResponse.IndexResult(index, new Exception("test"))); + } } } @@ -151,7 +162,7 @@ public void testCloseRoutingTableRemovesRoutingTable() { assertThat(state.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false)); } for (Index blockedIndex : blockedIndices.keySet()) { - if (results.get(blockedIndex).isAcknowledged()) { + if (results.get(blockedIndex).hasFailures() == false) { IndexMetaData indexMetaData = state.metaData().index(blockedIndex); assertThat(indexMetaData.getState(), is(IndexMetaData.State.CLOSE)); Settings indexSettings = indexMetaData.getSettings(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java index 5ee6a7c60da3d..cf94f6b2d1554 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java @@ -18,7 +18,7 @@ */ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.index.Index; @@ -43,7 +43,7 @@ public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map */ public static ClusterState closeRoutingTable(final ClusterState state, final Map blockedIndices, - final Map results) { + final Map results) { return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index c1e32be9d29af..c7ef8b766a8be 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -40,7 +41,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils; @@ -227,8 +227,8 @@ public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) final Map blockedIndices = new HashMap<>(); ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state); - newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream() - .collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true)))); + newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, + blockedIndices.keySet().stream().collect(Collectors.toMap(Function.identity(), CloseIndexResponse.IndexResult::new))); return allocationService.reroute(newState, "indices closed"); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 62d72c3f71954..362cca92459ac 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; @@ -35,6 +36,7 @@ import org.elasticsearch.test.ESIntegTestCase; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; @@ -51,6 +53,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class CloseIndexIT extends ESIntegTestCase { @@ -94,7 +97,7 @@ public void testCloseIndex() throws Exception { indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + assertBusy(() -> closeIndices(indexName)); assertIndexIsClosed(indexName); assertAcked(client().admin().indices().prepareOpen(indexName)); @@ -109,13 +112,17 @@ public void testCloseAlreadyClosedIndex() throws Exception { indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10)) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); } - // First close should be acked - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + // First close should be fully acked + assertBusy(() -> closeIndices(indexName)); assertIndexIsClosed(indexName); // Second close should be acked too final ActiveShardCount activeShardCount = randomFrom(ActiveShardCount.NONE, ActiveShardCount.DEFAULT, ActiveShardCount.ALL); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount))); + assertBusy(() -> { + CloseIndexResponse response = client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount).get(); + assertAcked(response); + assertTrue(response.getIndices().isEmpty()); + }); assertIndexIsClosed(indexName); } @@ -129,7 +136,7 @@ public void testCloseUnassignedIndex() throws Exception { assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true)); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE))); + assertBusy(() -> closeIndices(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE))); assertIndexIsClosed(indexName); } @@ -177,7 +184,7 @@ public void testCloseWhileIndexingDocuments() throws Exception { indexer.setAssertNoFailuresOnStop(false); waitForDocs(randomIntBetween(10, 50), indexer); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + assertBusy(() -> closeIndices(indexName)); indexer.stop(); nbDocs += indexer.totalIndexedDocs(); @@ -324,9 +331,42 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.GREEN)); assertTrue(closeIndexResponse.isAcknowledged()); assertTrue(closeIndexResponse.isShardsAcknowledged()); + assertThat(closeIndexResponse.getIndices().get(0), notNullValue()); + assertThat(closeIndexResponse.getIndices().get(0).hasFailures(), is(false)); + assertThat(closeIndexResponse.getIndices().get(0).getIndex().getName(), equalTo(indexName)); assertIndexIsClosed(indexName); } + private static void closeIndices(final String... indices) { + closeIndices(client().admin().indices().prepareClose(indices)); + } + + private static void closeIndices(final CloseIndexRequestBuilder requestBuilder) { + final CloseIndexResponse response = requestBuilder.get(); + assertThat(response.isAcknowledged(), is(true)); + assertThat(response.isShardsAcknowledged(), is(true)); + + final String[] indices = requestBuilder.request().indices(); + if (indices != null) { + assertThat(response.getIndices().size(), equalTo(indices.length)); + for (String index : indices) { + CloseIndexResponse.IndexResult indexResult = response.getIndices().stream() + .filter(result -> index.equals(result.getIndex().getName())).findFirst().get(); + assertThat(indexResult, notNullValue()); + assertThat(indexResult.hasFailures(), is(false)); + assertThat(indexResult.getException(), nullValue()); + assertThat(indexResult.getShards(), notNullValue()); + Arrays.stream(indexResult.getShards()).forEach(shardResult -> { + assertThat(shardResult.hasFailures(), is(false)); + assertThat(shardResult.getFailures(), notNullValue()); + assertThat(shardResult.getFailures().length, equalTo(0)); + }); + } + } else { + assertThat(response.getIndices().size(), equalTo(0)); + } + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { From 18dbcdfc948962f1bcb0e60cb4e398ebd3830d0b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 18 Apr 2019 14:20:47 -0400 Subject: [PATCH 2/7] extract failure explicitly --- .../action/admin/indices/close/CloseIndexResponse.java | 2 +- .../cluster/metadata/MetaDataIndexStateService.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index 188e20d30e3fa..4ed283b0aa4d9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -196,7 +196,7 @@ public static class ShardResult implements Writeable, ToXContentFragment { private final int id; private final ShardResult.Failure[] failures; - public ShardResult(final int id, final Failure... failures) { + public ShardResult(final int id, final Failure[] failures) { this.id = id; this.failures = failures; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index f044bb0225eca..6b848ee460f8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -347,15 +347,17 @@ private void waitForShardsReadyForClosing(final Index index, sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { - results.setOnce(shardId, new ShardResult(shardId, Arrays.stream(replicationResponse.getShardInfo().getFailures()) + ShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures()) .map(f -> new ShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId())) - .toArray(ShardResult.Failure[]::new))); + .toArray(ShardResult.Failure[]::new); + results.setOnce(shardId, new ShardResult(shardId, failures)); processIfFinished(); } @Override public void innerOnFailure(final Exception e) { - results.setOnce(shardId, new ShardResult(shardId, new ShardResult.Failure(index.getName(), shardId, e))); + ShardResult.Failure failure = new ShardResult.Failure(index.getName(), shardId, e); + results.setOnce(shardId, new ShardResult(shardId, new ShardResult.Failure[]{failure})); processIfFinished(); } From f6bf77ee17e8c393bf71951a76f71fc7f9bfbe87 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 May 2019 15:11:24 -0400 Subject: [PATCH 3/7] shards -> shard --- .../action/admin/indices/close/CloseIndexResponse.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index 9b8a409975e05..a84af2050bae2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -171,9 +171,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.endObject(); } else { builder.startObject("shards"); - for (ShardResult shards : shards) { - if (shards.hasFailures()) { - shards.toXContent(builder, params); + for (ShardResult shard : shards) { + if (shard.hasFailures()) { + shard.toXContent(builder, params); } } builder.endObject(); From 434696b11931b378863b47ed8b03760d5c9b12f2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 May 2019 16:20:33 -0400 Subject: [PATCH 4/7] close should fail if block disappeared --- .../metadata/MetaDataIndexStateService.java | 30 +++++++++++------ .../MetaDataIndexStateServiceTests.java | 33 +++++++++++++++++-- .../MetaDataIndexStateServiceUtils.java | 2 +- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 5d9e796af1b20..0cceab8135d58 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -53,6 +53,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -73,6 +74,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -150,13 +152,17 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) .execute(new WaitForClosedBlocksApplied(blockedIndices, request, - ActionListener.wrap(results -> + ActionListener.wrap(verifyResults -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { + private final List indices = new ArrayList<>(); @Override public ClusterState execute(final ClusterState currentState) throws Exception { - final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); - return allocationService.reroute(updatedState, "indices closed"); + Tuple> closingResult = + closeRoutingTable(currentState, blockedIndices, verifyResults); + assert verifyResults.size() == closingResult.v2().size(); + indices.addAll(closingResult.v2()); + return allocationService.reroute(closingResult.v1(), "indices closed"); } @Override @@ -168,7 +174,6 @@ public void onFailure(final String source, final Exception e) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - final List indices = new ArrayList<>(results.values()); final boolean acknowledged = indices.stream().noneMatch(IndexResult::hasFailures); final String[] waitForIndices = indices.stream() .filter(result -> result.hasFailures() == false) @@ -394,9 +399,9 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar /** * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ - static ClusterState closeRoutingTable(final ClusterState currentState, - final Map blockedIndices, - final Map results) { + static Tuple> closeRoutingTable(final ClusterState currentState, + final Map blockedIndices, + final Map verifyResult) { // Remove the index routing table of closed indices if the cluster is in a mixed version // that does not support the replication of closed indices @@ -407,7 +412,8 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); final Set closedIndices = new HashSet<>(); - for (Map.Entry result : results.entrySet()) { + Map closingResults = new HashMap<>(verifyResult); + for (Map.Entry result : verifyResult.entrySet()) { final Index index = result.getKey(); final boolean acknowledged = result.getValue().hasFailures() == false; try { @@ -422,7 +428,11 @@ static ClusterState closeRoutingTable(final ClusterState currentState, continue; } final ClusterBlock closingBlock = blockedIndices.get(index); + assert closingBlock != null; if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { + // we should report error in this case as the index can be left as open. + closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException( + "verification of shards before closing " + index + " succeeded but block has been removed in the meantime"))); logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index); continue; } @@ -448,9 +458,9 @@ static ClusterState closeRoutingTable(final ClusterState currentState, logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index); } } - logger.info("completed closing of indices {}", closedIndices); - return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(); + return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(), + closingResults.values()); } public void openIndex(final OpenIndexClusterStateUpdateRequest request, diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 95e80b2feac14..b655a98379553 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -70,6 +71,7 @@ import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -103,7 +105,7 @@ public void testCloseRoutingTable() { } } - final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); + final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1(); assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { @@ -153,7 +155,7 @@ public void testCloseRoutingTableRemovesRoutingTable() { new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_7_2_0))) .build(); - state = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); + state = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1(); assertThat(state.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { @@ -340,6 +342,33 @@ public void testIsIndexVerifiedBeforeClosed() { } } + public void testCloseFailedIfBlockDisappeared() { + ClusterState state = ClusterState.builder(new ClusterName("failedIfBlockDisappeared")).build(); + Map blockedIndices = new HashMap<>(); + int numIndices = between(1, 10); + Set disappearedIndices = new HashSet<>(); + Map verifyResults = new HashMap<>(); + for (int i = 0; i < numIndices; i++) { + String indexName = "test-" + i; + state = addOpenedIndex(indexName, randomIntBetween(1, 3), randomIntBetween(0, 3), state); + Index index = state.metaData().index(indexName).getIndex(); + state = MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{index}, blockedIndices, state); + if (randomBoolean()) { + state = ClusterState.builder(state) + .blocks(ClusterBlocks.builder().blocks(state.blocks()).removeIndexBlocks(indexName).build()) + .build(); + disappearedIndices.add(index); + } + verifyResults.put(index, new IndexResult(index)); + } + Collection closingResults = + MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, unmodifiableMap(verifyResults)).v2(); + assertThat(closingResults, hasSize(numIndices)); + Set failedIndices = closingResults.stream().filter(IndexResult::hasFailures) + .map(IndexResult::getIndex).collect(Collectors.toSet()); + assertThat(failedIndices, equalTo(disappearedIndices)); + } + public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas, int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) { ImmutableOpenMap.Builder dataNodes = ImmutableOpenMap.builder(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java index cf94f6b2d1554..7c94a42bd0cb5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java @@ -44,6 +44,6 @@ public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map public static ClusterState closeRoutingTable(final ClusterState state, final Map blockedIndices, final Map results) { - return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); + return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1(); } } From a90aaeddeddb4053aad006bf19c89554b046f303 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 May 2019 16:21:44 -0400 Subject: [PATCH 5/7] shards -> failedShards --- .../action/admin/indices/close/CloseIndexResponse.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index a84af2050bae2..f74420b1400aa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -170,7 +170,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa ElasticsearchException.generateFailureXContent(builder, params, exception, true); builder.endObject(); } else { - builder.startObject("shards"); + builder.startObject("failedShards"); for (ShardResult shard : shards) { if (shard.hasFailures()) { shard.toXContent(builder, params); From 1c9ddf0bc691a597e580148dcc6bb491280f1404 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 May 2019 09:20:51 -0400 Subject: [PATCH 6/7] remove ctor --- .../action/admin/indices/close/CloseIndexResponse.java | 6 +----- .../admin/indices/close/TransportCloseIndexAction.java | 4 +++- .../cluster/metadata/MetaDataIndexStateService.java | 3 ++- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java index f74420b1400aa..ea7d14655c594 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -45,13 +45,9 @@ public class CloseIndexResponse extends ShardsAcknowledgedResponse { CloseIndexResponse() { } - public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) { - this(acknowledged, shardsAcknowledged, emptyList()); - } - public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List indices) { super(acknowledged, shardsAcknowledged); - this.indices = Objects.requireNonNull(indices); + this.indices = unmodifiableList(Objects.requireNonNull(indices)); } public List getIndices() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index a6f4b6f3d0c4a..3c231d13845b2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -40,6 +40,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collections; + /** * Close index action */ @@ -109,7 +111,7 @@ protected void masterOperation(final Task task, final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { - listener.onResponse(new CloseIndexResponse(true, false)); + listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList())); return; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 0cceab8135d58..ef4583e98e544 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -75,6 +75,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -147,7 +148,7 @@ public ClusterState execute(final ClusterState currentState) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { if (oldState == newState) { assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed"; - listener.onResponse(new CloseIndexResponse(true, false)); + listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList())); } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) From de4e32db5036687a104bf7894c82fa78919d972a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 May 2019 10:18:19 -0400 Subject: [PATCH 7/7] serialize tests --- .../close/CloseIndexResponseTests.java | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java index 9554a99d7f58c..40c34af51598d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java @@ -29,15 +29,17 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.transport.ActionNotFoundTransportException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -61,7 +63,7 @@ public void testBwcSerialization() throws Exception { { final CloseIndexResponse response = randomResponse(); try (BytesStreamOutput out = new BytesStreamOutput()) { - out.setVersion(randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_1_0)); + out.setVersion(randomVersionBetween(random(), Version.V_7_0_0, getPreviousVersion(Version.V_7_2_0))); response.writeTo(out); final AcknowledgedResponse deserializedResponse = new AcknowledgedResponse(); @@ -72,22 +74,6 @@ public void testBwcSerialization() throws Exception { assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); } } - { - final CloseIndexResponse response = randomResponse(); - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.setVersion(Version.V_7_1_0); - response.writeTo(out); - - final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); - try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(out.getVersion()); - deserializedResponse.readFrom(in); - } - assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); - assertThat(deserializedResponse.isShardsAcknowledged(), equalTo(response.isShardsAcknowledged())); - assertThat(deserializedResponse.getIndices().isEmpty(), is(true)); - } - } { final AcknowledgedResponse response = new AcknowledgedResponse(randomBoolean()); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -95,7 +81,7 @@ public void testBwcSerialization() throws Exception { final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_1_0)); + in.setVersion(randomVersionBetween(random(), Version.V_7_0_0, getPreviousVersion(Version.V_7_2_0))); deserializedResponse.readFrom(in); } assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); @@ -104,16 +90,21 @@ public void testBwcSerialization() throws Exception { { final CloseIndexResponse response = randomResponse(); try (BytesStreamOutput out = new BytesStreamOutput()) { + Version version = randomVersionBetween(random(), Version.V_7_2_0, Version.CURRENT); + out.setVersion(version); response.writeTo(out); - final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(Version.V_7_1_0); + in.setVersion(version); deserializedResponse.readFrom(in); } assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); assertThat(deserializedResponse.isShardsAcknowledged(), equalTo(response.isShardsAcknowledged())); - assertThat(deserializedResponse.getIndices().isEmpty(), is(true)); + if (version.onOrAfter(Version.V_8_0_0)) { + assertThat(deserializedResponse.getIndices(), hasSize(response.getIndices().size())); + } else { + assertThat(deserializedResponse.getIndices(), empty()); + } } } }