From 07d914498c74af26fbe7b7af2884357295424371 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 18 Dec 2018 15:05:53 +0100 Subject: [PATCH 01/13] [Close Index API] Add UUID to ClusterBlock --- ...TransportVerifyShardBeforeCloseAction.java | 40 +++++-- .../cluster/block/ClusterBlock.java | 60 ++++++---- .../cluster/block/ClusterBlocks.java | 26 ++++- .../metadata/MetaDataIndexStateService.java | 71 +++++++----- ...portVerifyShardBeforeCloseActionTests.java | 11 +- .../cluster/block/ClusterBlockTests.java | 104 ++++++++++++------ .../MetaDataIndexStateServiceTests.java | 66 ++++++----- .../MetaDataIndexStateServiceUtils.java | 16 ++- .../indices/cluster/ClusterStateChanges.java | 7 +- .../indices/state/CloseIndexIT.java | 70 +++++++++++- .../DedicatedClusterSnapshotRestoreIT.java | 2 +- .../SharedClusterSnapshotRestoreIT.java | 4 +- .../hamcrest/ElasticsearchAssertions.java | 22 +++- 13 files changed, 356 insertions(+), 143 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index de5e372dd1625..309825ee7b460 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -28,9 +28,10 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; @@ -39,11 +40,13 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.Objects; + public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { public static final String NAME = CloseIndexAction.NAME + "[s]"; - public static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK; @Inject public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService, @@ -79,25 +82,25 @@ protected void acquireReplicaOperationPermit(final IndexShard replica, @Override protected PrimaryResult shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary) throws Exception { - executeShardOperation(primary); + executeShardOperation(shardRequest, primary); return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception { - executeShardOperation(replica); + executeShardOperation(shardRequest, replica); return new ReplicaResult(); } - private void executeShardOperation(final IndexShard indexShard) { + private void executeShardOperation(final ShardRequest request,final IndexShard indexShard) { final ShardId shardId = indexShard.shardId(); if (indexShard.getActiveOperationsCount() != 0) { throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing"); } final ClusterBlocks clusterBlocks = clusterService.state().blocks(); - if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) { - throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing"); + if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { + throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); @@ -111,16 +114,35 @@ private void executeShardOperation(final IndexShard indexShard) { public static class ShardRequest extends ReplicationRequest { + private ClusterBlock clusterBlock; + ShardRequest(){ } - public ShardRequest(final ShardId shardId) { + public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock) { super(shardId); + this.clusterBlock = Objects.requireNonNull(clusterBlock); } @Override public String toString() { - return "verify shard before close {" + shardId + "}"; + return "verify shard " + shardId + " before close with block " + clusterBlock; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + clusterBlock = ClusterBlock.readClusterBlock(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + clusterBlock.writeTo(out); + } + + public ClusterBlock clusterBlock() { + return clusterBlock; } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index fafd397722025..25e828dfe18f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.block; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -30,29 +31,31 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.Locale; +import java.util.Objects; public class ClusterBlock implements Streamable, ToXContentFragment { private int id; - + private String uuid; private String description; - private EnumSet levels; - private boolean retryable; - private boolean disableStatePersistence = false; - private boolean allowReleaseResources; - private RestStatus status; - ClusterBlock() { + private ClusterBlock() { + } + + public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, + boolean allowReleaseResources, RestStatus status, EnumSet levels) { + this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels); } - public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources, - RestStatus status, EnumSet levels) { + public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence, + boolean allowReleaseResources, RestStatus status, EnumSet levels) { this.id = id; + this.uuid = uuid; this.description = description; this.retryable = retryable; this.disableStatePersistence = disableStatePersistence; @@ -65,6 +68,10 @@ public int id() { return this.id; } + public String uuid() { + return uuid; + } + public String description() { return this.description; } @@ -104,6 +111,9 @@ public boolean disableStatePersistence() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Integer.toString(id)); + if (uuid != null) { + builder.field("uuid", uuid); + } builder.field("description", description); builder.field("retryable", retryable); if (disableStatePersistence) { @@ -127,6 +137,11 @@ public static ClusterBlock readClusterBlock(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + uuid = in.readOptionalString(); + } else { + uuid = null; + } description = in.readString(); final int len = in.readVInt(); ArrayList levels = new ArrayList<>(len); @@ -143,6 +158,9 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalString(uuid); + } out.writeString(description); out.writeVInt(levels.size()); for (ClusterBlockLevel level : levels) { @@ -157,7 +175,11 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(id).append(",").append(description).append(", blocks "); + sb.append(id).append(","); + if (uuid != null) { + sb.append(uuid).append(','); + } + sb.append(description).append(", blocks "); String delimiter = ""; for (ClusterBlockLevel level : levels) { sb.append(delimiter).append(level.name()); @@ -168,19 +190,19 @@ public String toString() { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ClusterBlock that = (ClusterBlock) o; - - if (id != that.id) return false; - - return true; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusterBlock that = (ClusterBlock) o; + return id == that.id && Objects.equals(uuid, that.uuid); } @Override public int hashCode() { - return id; + return Objects.hash(id, uuid); } public boolean isAllowReleaseResources() { diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 6343c2f72746d..8937ec261743c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.block; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -148,6 +147,18 @@ public boolean hasIndexBlock(String index, ClusterBlock block) { return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block); } + public boolean hasIndexBlock(String index, int blockId) { + final Set clusterBlocks = indicesBlocks.get(index); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == blockId) { + return true; + } + } + } + return false; + } + public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException { ClusterBlockException blockException = globalBlockedException(level); if (blockException != null) { @@ -335,7 +346,7 @@ public Builder blocks(ClusterBlocks blocks) { public Builder addBlocks(IndexMetaData indexMetaData) { String indexName = indexMetaData.getIndex().getName(); if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - addIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + addIndexBlock(indexName, MetaDataIndexStateService.createIndexClosedBlock()); } if (IndexMetaData.INDEX_READ_ONLY_SETTING.get(indexMetaData.getSettings())) { addIndexBlock(indexName, IndexMetaData.INDEX_READ_ONLY_BLOCK); @@ -404,6 +415,17 @@ public Builder removeIndexBlock(String index, ClusterBlock block) { return this; } + public Builder removeIndexBlock(String index, int blockId) { + if (!indices.containsKey(index)) { + return this; + } + indices.get(index).removeIf(block -> block.id() == blockId); + if (indices.get(index).isEmpty()) { + indices.remove(index); + } + return this; + } + public ClusterBlocks build() { // We copy the block sets here in case of the builder is modified after build is called ImmutableOpenMap.Builder> indicesBuilder = ImmutableOpenMap.builder(indices.size()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index cda8f9c6f0ac6..141e10bc3e98c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -48,6 +48,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.inject.Inject; @@ -83,8 +84,7 @@ public class MetaDataIndexStateService { private static final Logger logger = LogManager.getLogger(MetaDataIndexStateService.class); - public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, - false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE); + public static final int INDEX_CLOSED_BLOCK_ID = 4; private final ClusterService clusterService; private final AllocationService allocationService; @@ -123,6 +123,9 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina final TimeValue timeout = request.ackTimeout(); final TimeValue masterTimeout = request.masterNodeTimeout(); + // Creates a unique cluster block + final ClusterBlock closingBlock = createIndexClosedBlock(); + clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) { @@ -130,7 +133,7 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina @Override public ClusterState execute(final ClusterState currentState) { - return addIndexClosedBlocks(concreteIndices, currentState, blockedIndices); + return addIndexClosedBlocks(concreteIndices, closingBlock, currentState, blockedIndices); } @Override @@ -141,12 +144,12 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) - .execute(new WaitForClosedBlocksApplied(blockedIndices, timeout, - ActionListener.wrap(closedBlocksResults -> + .execute(new WaitForClosedBlocksApplied(blockedIndices, closingBlock, timeout, + ActionListener.wrap(results -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(final ClusterState currentState) throws Exception { - final ClusterState updatedState = closeRoutingTable(currentState, closedBlocksResults); + final ClusterState updatedState = closeRoutingTable(currentState, closingBlock, results); return allocationService.reroute(updatedState, "indices closed"); } @@ -158,8 +161,7 @@ public void onFailure(final String source, final Exception e) { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - boolean acknowledged = closedBlocksResults.values().stream() - .allMatch(AcknowledgedResponse::isAcknowledged); + boolean acknowledged = results.values().stream().allMatch(AcknowledgedResponse::isAcknowledged); listener.onResponse(new AcknowledgedResponse(acknowledged)); } }), @@ -185,11 +187,12 @@ public TimeValue timeout() { /** * Step 1 - Start closing indices by adding a write block * - * This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds the index block - * {@link #INDEX_CLOSED_BLOCK} to every index to close in the cluster state. After the cluster state is published, the shards should + * This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds the unique + * cluster block to every index to close in the cluster state. After the cluster state is published, the shards should * start to reject writing operations and we can proceed with step 2. */ - static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState currentState, final Set blockedIndices) { + static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlock closingBlock, + final ClusterState currentState, final Set blockedIndices) { final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final Set indicesToClose = new HashSet<>(); @@ -220,9 +223,7 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta for (IndexMetaData indexToClose : indicesToClose) { final Index index = indexToClose.getIndex(); - if (currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK) == false) { - blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); - } + blocks.addIndexBlock(index.getName(), closingBlock); if (useDirectClose) { logger.debug("closing index {} directly", index); metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); @@ -246,16 +247,19 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta class WaitForClosedBlocksApplied extends AbstractRunnable { private final Set blockedIndices; + private final ClusterBlock clusterBlock; private final @Nullable TimeValue timeout; private final ActionListener> listener; private WaitForClosedBlocksApplied(final Set blockedIndices, + final ClusterBlock clusterBlock, final @Nullable TimeValue timeout, final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices"); } this.blockedIndices = blockedIndices; + this.clusterBlock = clusterBlock; this.listener = listener; this.timeout = timeout; } @@ -271,7 +275,7 @@ protected void doRun() throws Exception { final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); for (Index blockedIndex : blockedIndices) { - waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> { + waitForShardsReadyForClosing(blockedIndex, state, response -> { results.put(blockedIndex, response); if (countDown.countDown()) { listener.onResponse(unmodifiableMap(results)); @@ -280,7 +284,7 @@ protected void doRun() throws Exception { } } - private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout, + private void waitForShardsReadyForClosing(final Index index, final ClusterState state, final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { @@ -290,6 +294,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState } final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) { + assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK_ID); logger.debug("index {} has been blocked before closing and is already closed, ignoring", index); onResponse.accept(new AcknowledgedResponse(true)); return; @@ -302,7 +307,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; final ShardId shardId = shardRoutingTable.shardId(); - sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener() { + sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); @@ -326,7 +331,7 @@ private void processIfFinished() { } } - private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout, + private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { final ShardId shardId = shardRoutingTable.shardId(); if (shardRoutingTable.primaryShard().unassigned()) { @@ -337,7 +342,7 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar return; } final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock); if (timeout != null) { shardRequest.timeout(timeout); } @@ -349,7 +354,9 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar /** * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ - static ClusterState closeRoutingTable(final ClusterState currentState, final Map results) { + static ClusterState closeRoutingTable(final ClusterState currentState, + final ClusterBlock closingBlock, + final Map results) { final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); @@ -357,20 +364,23 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final Map final Set closedIndices = new HashSet<>(); for (Map.Entry result : results.entrySet()) { final Index index = result.getKey(); + final boolean acknowledged = result.getValue().isAcknowledged(); try { final IndexMetaData indexMetaData = metadata.getSafe(index); - if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { - if (result.getValue().isAcknowledged()) { + assert currentState.blocks().hasIndexBlock(index.getName(), closingBlock); + if (acknowledged) { + if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { logger.debug("closing index {} succeed, removing index routing table", index); metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)); routingTable.remove(index.getName()); closedIndices.add(index.getName()); } else { - logger.debug("closing index {} failed, removing index block because: {}", index, result.getValue()); - blocks.removeIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); + logger.debug("index {} already closed, removing index block {}", index, closingBlock); + blocks.removeIndexBlock(index.getName(), closingBlock); } } else { - logger.debug("index {} has been closed since it was blocked before closing, ignoring", index); + logger.debug("closing index {} failed, removing index block {}", index, closingBlock); + blocks.removeIndexBlock(index.getName(), closingBlock); } } catch (final IndexNotFoundException e) { logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index); @@ -449,7 +459,7 @@ public ClusterState execute(ClusterState currentState) { } mdBuilder.put(indexMetaData, true); - blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK); + blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK_ID); } ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build(); @@ -494,4 +504,13 @@ private static int getTotalShardCount(ClusterState state, Index index) { return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas()); } + /** + * @return Generates a {@link ClusterBlock} that blocks read and write operations on closed or soon-to-be-closed indices. The + * cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID. + */ + public static ClusterBlock createIndexClosedBlock() { + return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), "index closed", false, false, false, + RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE); + } + } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index de0cc5dfd5a37..b201b79e1a152 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -23,8 +23,10 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.seqno.SeqNoStats; @@ -37,7 +39,6 @@ import org.elasticsearch.transport.TransportService; import org.junit.Before; -import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -50,6 +51,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { private IndexShard indexShard; private TransportVerifyShardBeforeCloseAction action; private ClusterService clusterService; + private ClusterBlock clusterBlock; @Override @Before @@ -64,9 +66,10 @@ public void setUp() throws Exception { final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); when(indexShard.shardId()).thenReturn(shardId); + clusterBlock = MetaDataIndexStateService.createIndexClosedBlock(); clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")) - .blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build()); + .blocks(ClusterBlocks.builder().addIndexBlock("index", clusterBlock).build()).build()); action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService, mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class), @@ -75,7 +78,7 @@ public void setUp() throws Exception { private void executeOnPrimaryOrReplica() throws Exception { final TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId()); + new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock); if (randomBoolean()) { assertNotNull(action.shardOperationOnPrimary(request, indexShard)); } else { @@ -102,7 +105,7 @@ public void testOperationFailsWithNoBlock() { IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), - equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + INDEX_CLOSED_BLOCK + " before closing")); + equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing")); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java index a84d160cf0c95..bdddc1b820174 100644 --- a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java @@ -20,36 +20,32 @@ package org.elasticsearch.cluster.block; import org.elasticsearch.Version; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.List; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; public class ClusterBlockTests extends ESTestCase { + public void testSerialization() throws Exception { - int iterations = randomIntBetween(10, 100); + int iterations = randomIntBetween(5, 20); for (int i = 0; i < iterations; i++) { - // Get a random version Version version = randomVersion(random()); - - // Get a random list of ClusterBlockLevels - EnumSet levels = EnumSet.noneOf(ClusterBlockLevel.class); - int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length); - for (int j = 0; j < nbLevels; j++) { - levels.add(randomFrom(ClusterBlockLevel.values())); - } - - ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(), - randomBoolean(), false, randomFrom(RestStatus.values()), levels); + ClusterBlock clusterBlock = randomClusterBlock(version); BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(version); @@ -59,37 +55,81 @@ public void testSerialization() throws Exception { in.setVersion(version); ClusterBlock result = ClusterBlock.readClusterBlock(in); - assertThat(result.id(), equalTo(clusterBlock.id())); - assertThat(result.status(), equalTo(clusterBlock.status())); - assertThat(result.description(), equalTo(clusterBlock.description())); - assertThat(result.retryable(), equalTo(clusterBlock.retryable())); - assertThat(result.disableStatePersistence(), equalTo(clusterBlock.disableStatePersistence())); - assertArrayEquals(result.levels().toArray(), clusterBlock.levels().toArray()); + assertClusterBlockEquals(clusterBlock, result); } } - public void testToStringDanglingComma() { - EnumSet levels = EnumSet.noneOf(ClusterBlockLevel.class); - int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length); - for (int j = 0; j < nbLevels; j++) { - levels.add(randomFrom(ClusterBlockLevel.values())); + public void testBwcSerialization() throws Exception { + for (int runs = 0; runs < randomIntBetween(5, 20); runs++) { + // Generate a random cluster block in version < 7.0.0 + final Version version = randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0)); + final ClusterBlock expected = randomClusterBlock(version); + assertNull(expected.uuid()); + + // Serialize to node in current version + final BytesStreamOutput out = new BytesStreamOutput(); + expected.writeTo(out); + + // Deserialize and check the cluster block + final ClusterBlock actual = ClusterBlock.readClusterBlock(out.bytes().streamInput()); + assertClusterBlockEquals(expected, actual); + } + + for (int runs = 0; runs < randomIntBetween(5, 20); runs++) { + // Generate a random cluster block in current version + final ClusterBlock expected = randomClusterBlock(Version.CURRENT); + + // Serialize to node in version < 7.0.0 + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0))); + expected.writeTo(out); + + // Deserialize and check the cluster block + final StreamInput in = out.bytes().streamInput(); + in.setVersion(out.getVersion()); + final ClusterBlock actual = ClusterBlock.readClusterBlock(in); + + assertThat(actual.id(), equalTo(expected.id())); + assertThat(actual.status(), equalTo(expected.status())); + assertThat(actual.description(), equalTo(expected.description())); + assertThat(actual.retryable(), equalTo(expected.retryable())); + assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence())); + assertArrayEquals(actual.levels().toArray(), expected.levels().toArray()); } - ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(), - randomBoolean(), false, randomFrom(RestStatus.values()), levels); + } + + public void testToStringDanglingComma() { + final ClusterBlock clusterBlock = randomClusterBlock(); assertThat(clusterBlock.toString(), not(endsWith(","))); } public void testGlobalBlocksCheckedIfNoIndicesSpecified() { - EnumSet levels = EnumSet.noneOf(ClusterBlockLevel.class); - int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length); - for (int j = 0; j < nbLevels; j++) { - levels.add(randomFrom(ClusterBlockLevel.values())); - } - ClusterBlock globalBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(), - randomBoolean(), false, randomFrom(RestStatus.values()), levels); + ClusterBlock globalBlock = randomClusterBlock(); ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), ImmutableOpenMap.of()); ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]); assertNotNull(exception); assertEquals(exception.blocks(), Collections.singleton(globalBlock)); } + + private ClusterBlock randomClusterBlock() { + return randomClusterBlock(randomVersion(random())); + } + + private ClusterBlock randomClusterBlock(final Version version) { + final String uuid = (version.onOrAfter(Version.V_7_0_0) && randomBoolean()) ? UUIDs.randomBase64UUID() : null; + final List levels = Arrays.asList(ClusterBlockLevel.values()); + return new ClusterBlock(randomInt(), uuid, "cluster block #" + randomInt(), randomBoolean(), randomBoolean(), randomBoolean(), + randomFrom(RestStatus.values()), EnumSet.copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels))); + } + + private void assertClusterBlockEquals(final ClusterBlock expected, final ClusterBlock actual) { + assertEquals(expected, actual); + assertThat(actual.id(), equalTo(expected.id())); + assertThat(actual.uuid(), equalTo(expected.uuid())); + assertThat(actual.status(), equalTo(expected.status())); + assertThat(actual.description(), equalTo(expected.description())); + assertThat(actual.retryable(), equalTo(expected.retryable())); + assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence())); + assertArrayEquals(actual.levels().toArray(), expected.levels().toArray()); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 6faaf8e133800..40d1a7144c001 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.createIndexClosedBlock; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.hamcrest.Matchers.containsString; @@ -74,6 +75,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { public void testCloseRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); final Map blockedIndices = new HashMap<>(); + final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build(); for (int i = 0; i < randomIntBetween(1, 25); i++) { @@ -83,12 +85,12 @@ public void testCloseRoutingTable() { state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state); nonBlockedIndices.add(state.metaData().index(indexName).getIndex()); } else { - state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state); + state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); blockedIndices.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); } } - final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices); + final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, closingBlock, blockedIndices); assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { @@ -96,7 +98,7 @@ public void testCloseRoutingTable() { } for (Map.Entry blockedIndex : blockedIndices.entrySet()) { if (blockedIndex.getValue().isAcknowledged()) { - assertIsClosed(blockedIndex.getKey().getName(), updatedState); + assertIsClosed(blockedIndex.getKey().getName(), updatedState, closingBlock); } else { assertIsOpened(blockedIndex.getKey().getName(), updatedState); } @@ -105,17 +107,19 @@ public void testCloseRoutingTable() { public void testAddIndexClosedBlocks() { final ClusterState initialState = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build(); + final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); { final Set blockedIndices = new HashSet<>(); + Index[] indices = new Index[]{new Index("_name", "_uid")}; expectThrows(IndexNotFoundException.class, () -> - MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{new Index("_name", "_uid")}, initialState, blockedIndices)); + MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, initialState, blockedIndices)); assertTrue(blockedIndices.isEmpty()); } { final Set blockedIndices = new HashSet<>(); Index[] indices = Index.EMPTY_ARRAY; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, initialState, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, initialState, blockedIndices); assertSame(initialState, updatedState); assertTrue(blockedIndices.isEmpty()); } @@ -124,7 +128,7 @@ public void testAddIndexClosedBlocks() { ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); Index[] indices = new Index[]{state.metaData().index("closed").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); assertSame(state, updatedState); assertTrue(blockedIndices.isEmpty()); } @@ -134,11 +138,11 @@ public void testAddIndexClosedBlocks() { state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state); Index[] indices = new Index[]{state.metaData().index("opened").getIndex(), state.metaData().index("closed").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); assertNotSame(state, updatedState); assertTrue(blockedIndices.contains(updatedState.metaData().index("opened").getIndex())); assertFalse(blockedIndices.contains(updatedState.metaData().index("closed").getIndex())); - assertIsBlocked("opened", updatedState, true); + assertBlocked("opened", updatedState, closingBlock); } { IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { @@ -150,7 +154,7 @@ public void testAddIndexClosedBlocks() { state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state); } Index[] indices = new Index[]{state.metaData().index("restored").getIndex()}; - MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>()); + MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, new HashSet<>()); }); assertThat(exception.getMessage(), containsString("Cannot close indices that are being restored: [[restored]]")); } @@ -164,7 +168,7 @@ public void testAddIndexClosedBlocks() { state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state); } Index[] indices = new Index[]{state.metaData().index("snapshotted").getIndex()}; - MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>()); + MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, new HashSet<>()); }); assertThat(exception.getMessage(), containsString("Cannot close indices that are being snapshotted: [[snapshotted]]")); } @@ -184,19 +188,16 @@ public void testAddIndexClosedBlocks() { Index[] indices = new Index[]{state.metaData().index("index-1").getIndex(), state.metaData().index("index-2").getIndex(), state.metaData().index("index-3").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); assertNotSame(state, updatedState); - assertTrue(blockedIndices.contains(updatedState.metaData().index("index-1").getIndex())); - assertTrue(blockedIndices.contains(updatedState.metaData().index("index-2").getIndex())); - assertTrue(blockedIndices.contains(updatedState.metaData().index("index-3").getIndex())); - if (mixedVersions) { - assertIsClosed("index-1", updatedState); - assertIsClosed("index-2", updatedState); - assertIsClosed("index-2", updatedState); - } else { - assertIsBlocked("index-1", updatedState, true); - assertIsBlocked("index-2", updatedState, true); - assertIsBlocked("index-3", updatedState, true); + + for (String index : Arrays.asList("index-1", "index-2", "index-3")) { + assertTrue(blockedIndices.contains(updatedState.metaData().index(index).getIndex())); + if (mixedVersions) { + assertIsClosed(index, updatedState, closingBlock); + } else { + assertBlocked("index-1", updatedState, closingBlock); + } } } } @@ -251,11 +252,12 @@ private static ClusterState addOpenedIndex(final String index, final int numShar } private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { - return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, createIndexClosedBlock()); } - private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { - return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state, + final ClusterBlock closingBlock) { + return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, closingBlock); } private static ClusterState addRestoredIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { @@ -329,16 +331,20 @@ private static ClusterState addIndex(final ClusterState currentState, private static void assertIsOpened(final String indexName, final ClusterState clusterState) { assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertIsBlocked(indexName, clusterState, false); + assertNotBlocked(indexName, clusterState); } - private static void assertIsClosed(final String indexName, final ClusterState clusterState) { + private static void assertIsClosed(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE)); assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertIsBlocked(indexName, clusterState, true); + assertBlocked(indexName, clusterState, closingBlock); + } + + private static void assertBlocked(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { + assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); } - private static void assertIsBlocked(final String indexName, final ClusterState clusterState, final boolean blocked) { - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(blocked)); + private static void assertNotBlocked(final String indexName, final ClusterState clusterState) { + assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java index a9ffd4c47e161..bf31a55fe460c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.index.Index; import java.util.Map; @@ -31,16 +32,19 @@ private MetaDataIndexStateServiceUtils(){ } /** - * Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], ClusterState, Set)} which is a protected method. + * Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], ClusterBlock, ClusterState, Set)} which is + * a protected method. */ - public static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState state, final Set blockedIndices) { - return MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + public static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlock closingBlock, + final ClusterState state, final Set blockedIndices) { + return MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); } /** - * Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map)} which is a protected method. + * Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, ClusterBlock, Map)} which is a protected method. */ - public static ClusterState closeRoutingTable(final ClusterState state, final Map results) { - return MetaDataIndexStateService.closeRoutingTable(state, results); + public static ClusterState closeRoutingTable(final ClusterState state, final ClusterBlock closingBlock, + final Map results) { + return MetaDataIndexStateService.closeRoutingTable(state, closingBlock, results); } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 1403543078b28..93a8698f0d254 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -52,6 +52,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; import org.elasticsearch.cluster.metadata.AliasValidator; @@ -222,10 +223,12 @@ public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) final Index[] concreteIndices = Arrays.stream(request.indices()) .map(index -> state.metaData().index(index).getIndex()).toArray(Index[]::new); + final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); + final Set blockedIndices = new HashSet<>(); - ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, state, blockedIndices); + ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, closingBlock, state, blockedIndices); - newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices.stream() + newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, closingBlock, blockedIndices.stream() .collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true)))); return allocationService.reroute(newState, "indices closed"); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index c91189972c7b2..72c630d1fd543 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -39,12 +39,12 @@ import java.util.concurrent.CountDownLatch; import java.util.stream.IntStream; +import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -238,18 +238,80 @@ public void testCloseWhileDeletingIndices() throws Exception { } } + public void testConcurrentClosesAndOpens() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client()); + waitForDocs(1, indexer); + + final CountDownLatch latch = new CountDownLatch(1); + final Runnable waitForLatch = () -> { + try { + latch.await(); + } catch (final InterruptedException e) { + throw new AssertionError(e); + } + }; + + final List threads = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(1, 3); i++) { + threads.add(new Thread(() -> { + try { + waitForLatch.run(); + client().admin().indices().prepareClose(indexName).get(); + } catch (final Exception e) { + assertException(e, indexName); + } + })); + } + for (int i = 0; i < randomIntBetween(1, 3); i++) { + threads.add(new Thread(() -> { + try { + waitForLatch.run(); + client().admin().indices().prepareOpen(indexName).get(); + } catch (final Exception e) { + assertException(e, indexName); + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + + indexer.setAssertNoFailuresOnStop(false); + indexer.stop(); + + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + if (clusterState.metaData().indices().get(indexName).getState() == IndexMetaData.State.CLOSE) { + assertIndexIsClosed(indexName); + assertAcked(client().admin().indices().prepareOpen(indexName)); + } else { + assertIndexIsOpened(indexName); + } + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexer.totalIndexedDocs()); + } + static void assertIndexIsClosed(final String indexName) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.CLOSE)); assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true)); + assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(true)); + assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); } static void assertIndexIsOpened(final String indexName) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false)); + assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); } static void assertException(final Throwable throwable, final String indexName) { @@ -257,7 +319,7 @@ static void assertException(final Throwable throwable, final String indexName) { if (t instanceof ClusterBlockException) { ClusterBlockException clusterBlockException = (ClusterBlockException) t; assertThat(clusterBlockException.blocks(), hasSize(1)); - assertThat(clusterBlockException.blocks(), hasItem(MetaDataIndexStateService.INDEX_CLOSED_BLOCK)); + assertTrue(clusterBlockException.blocks().stream().allMatch(b -> b.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)); } else if (t instanceof IndexClosedException) { IndexClosedException indexClosedException = (IndexClosedException) t; assertThat(indexClosedException.getIndex(), notNullValue()); diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 619a09cc18880..7b24ecd02b267 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -535,7 +535,7 @@ public void testRestoreIndexWithMissingShards() throws Exception { logger.info("--> start snapshot with default settings and closed index - should be blocked"); assertBlocked(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") .setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed") - .setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + .setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID); logger.info("--> start snapshot with default settings without a closed index - should fail"); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 35e813756bca8..821815d4d0eba 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1560,7 +1560,7 @@ public void testSnapshotClosedIndex() throws Exception { logger.info("--> snapshot with closed index"); assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true) - .setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + .setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID); } public void testSnapshotSingleClosedIndex() throws Exception { @@ -1578,7 +1578,7 @@ public void testSnapshotSingleClosedIndex() throws Exception { logger.info("--> snapshot"); assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") - .setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + .setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID); } public void testRenameOnRestore() throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 6005e7e6163f1..48d18e096bf17 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -129,7 +129,7 @@ public static void assertAcked(CreateIndexResponse response) { * @param builder the request builder */ public static void assertBlocked(ActionRequestBuilder builder) { - assertBlocked(builder, null); + assertBlocked(builder, (ClusterBlock) null); } /** @@ -155,9 +155,9 @@ public static void assertBlocked(BroadcastResponse replicatedBroadcastResponse) * Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}. * * @param builder the request builder - * @param expectedBlock the expected block + * @param expectedBlockId the expected block id */ - public static void assertBlocked(ActionRequestBuilder builder, ClusterBlock expectedBlock) { + public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final Integer expectedBlockId) { try { builder.get(); fail("Request executed with success but a ClusterBlockException was expected"); @@ -165,19 +165,29 @@ public static void assertBlocked(ActionRequestBuilder builder, ClusterBlock expe assertThat(e.blocks().size(), greaterThan(0)); assertThat(e.status(), equalTo(RestStatus.FORBIDDEN)); - if (expectedBlock != null) { + if (expectedBlockId != null) { boolean found = false; for (ClusterBlock clusterBlock : e.blocks()) { - if (clusterBlock.id() == expectedBlock.id()) { + if (clusterBlock.id() == expectedBlockId) { found = true; break; } } - assertThat("Request should have been blocked by [" + expectedBlock + "] instead of " + e.blocks(), found, equalTo(true)); + assertThat("Request should have been blocked by [" + expectedBlockId + "] instead of " + e.blocks(), found, equalTo(true)); } } } + /** + * Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}. + * + * @param builder the request builder + * @param expectedBlock the expected block + */ + public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final ClusterBlock expectedBlock) { + assertBlocked(builder, expectedBlock != null ? expectedBlock.id() : null); + } + public static String formatShardStatus(BroadcastResponse response) { StringBuilder msg = new StringBuilder(); msg.append(" Total shards: ").append(response.getTotalShards()) From e976493d331215c5db34d90ba7d4ddc067de8237 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Dec 2018 09:36:30 +0100 Subject: [PATCH 02/13] missing space --- .../indices/close/TransportVerifyShardBeforeCloseAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 309825ee7b460..62e61cd0e8a4d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -92,7 +92,7 @@ protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, return new ReplicaResult(); } - private void executeShardOperation(final ShardRequest request,final IndexShard indexShard) { + private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { final ShardId shardId = indexShard.shardId(); if (indexShard.getActiveOperationsCount() != 0) { throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing"); From 62000b2f3b4be14234220c5dce9944884bfa7f3b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Dec 2018 12:11:02 +0100 Subject: [PATCH 03/13] hasIndexBlockWithId --- .../cluster/block/ClusterBlocks.java | 11 +++--- .../metadata/MetaDataIndexStateService.java | 4 +- .../cluster/block/ClusterBlockTests.java | 39 +++++++++++++++++++ .../MetaDataIndexStateServiceTests.java | 2 +- .../indices/state/CloseIndexIT.java | 4 +- 5 files changed, 50 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 8937ec261743c..77a09f0fba174 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -147,7 +147,7 @@ public boolean hasIndexBlock(String index, ClusterBlock block) { return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block); } - public boolean hasIndexBlock(String index, int blockId) { + public boolean hasIndexBlockWithId(String index, int blockId) { final Set clusterBlocks = indicesBlocks.get(index); if (clusterBlocks != null) { for (ClusterBlock clusterBlock : clusterBlocks) { @@ -415,12 +415,13 @@ public Builder removeIndexBlock(String index, ClusterBlock block) { return this; } - public Builder removeIndexBlock(String index, int blockId) { - if (!indices.containsKey(index)) { + public Builder removeIndexBlockWithId(String index, int blockId) { + final Set indexBlocks = indices.get(index); + if (indexBlocks == null) { return this; } - indices.get(index).removeIf(block -> block.id() == blockId); - if (indices.get(index).isEmpty()) { + indexBlocks.removeIf(block -> block.id() == blockId); + if (indexBlocks.isEmpty()) { indices.remove(index); } return this; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 141e10bc3e98c..65ccb36a8e965 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -294,7 +294,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState } final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) { - assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK_ID); + assert state.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); logger.debug("index {} has been blocked before closing and is already closed, ignoring", index); onResponse.accept(new AcknowledgedResponse(true)); return; @@ -459,7 +459,7 @@ public ClusterState execute(ClusterState currentState) { } mdBuilder.put(indexMetaData, true); - blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK_ID); + blocksBuilder.removeIndexBlockWithId(indexName, INDEX_CLOSED_BLOCK_ID); } ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java index bdddc1b820174..d6b8f536860d4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java @@ -38,6 +38,8 @@ import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class ClusterBlockTests extends ESTestCase { @@ -111,6 +113,43 @@ public void testGlobalBlocksCheckedIfNoIndicesSpecified() { assertEquals(exception.blocks(), Collections.singleton(globalBlock)); } + public void testRemoveIndexBlockWithId() { + final ClusterBlocks.Builder builder = ClusterBlocks.builder(); + builder.addIndexBlock("index-1", + new ClusterBlock(1, "uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL))); + builder.addIndexBlock("index-1", + new ClusterBlock(2, "uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL))); + builder.addIndexBlock("index-1", + new ClusterBlock(3, "uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL))); + builder.addIndexBlock("index-1", + new ClusterBlock(3, "other uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL))); + + builder.addIndexBlock("index-2", + new ClusterBlock(3, "uuid3", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL))); + + ClusterBlocks clusterBlocks = builder.build(); + assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(4)); + assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1)); + + builder.removeIndexBlockWithId("index-1", 3); + clusterBlocks = builder.build(); + + assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true)); + assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(true)); + + builder.removeIndexBlockWithId("index-2", 3); + clusterBlocks = builder.build(); + + assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true)); + assertThat(clusterBlocks.indices().get("index-2"), nullValue()); + assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(false)); + } + private ClusterBlock randomClusterBlock() { return randomClusterBlock(randomVersion(random())); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 40d1a7144c001..0ac28dca7dc3a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -345,6 +345,6 @@ private static void assertBlocked(final String indexName, final ClusterState clu } private static void assertNotBlocked(final String indexName, final ClusterState clusterState) { - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); + assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 72c630d1fd543..84092c254e96e 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -301,7 +301,7 @@ static void assertIndexIsClosed(final String indexName) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.CLOSE)); assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(true)); + assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(true)); assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); @@ -311,7 +311,7 @@ static void assertIndexIsOpened(final String indexName) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); + assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); } static void assertException(final Throwable throwable, final String indexName) { From 79e2e99d2b6a8b409e0b09da662304df16e28854 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Dec 2018 15:51:45 +0100 Subject: [PATCH 04/13] Keep last cluster block only --- .../metadata/MetaDataIndexStateService.java | 17 ++++++--- .../MetaDataIndexStateServiceTests.java | 36 +++++++++++++++++-- .../indices/state/CloseIndexIT.java | 2 +- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 65ccb36a8e965..99f6076c8de8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -223,6 +223,9 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlo for (IndexMetaData indexToClose : indicesToClose) { final Index index = indexToClose.getIndex(); + if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) { + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); + } blocks.addIndexBlock(index.getName(), closingBlock); if (useDirectClose) { logger.debug("closing index {} directly", index); @@ -367,16 +370,20 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final boolean acknowledged = result.getValue().isAcknowledged(); try { final IndexMetaData indexMetaData = metadata.getSafe(index); - assert currentState.blocks().hasIndexBlock(index.getName(), closingBlock); if (acknowledged) { - if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { - logger.debug("closing index {} succeed, removing index routing table", index); + if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { + logger.debug("closing index {} succeed but block has been removed by a concurrent open or close request", index); + assert indexMetaData.getState() == IndexMetaData.State.OPEN + || currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); + + } else if (indexMetaData.getState() == IndexMetaData.State.OPEN) { + logger.debug("closing index {} succeed", index); metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)); routingTable.remove(index.getName()); closedIndices.add(index.getName()); } else { - logger.debug("index {} already closed, removing index block {}", index, closingBlock); - blocks.removeIndexBlock(index.getName(), closingBlock); + logger.debug("closing index {} succeed but index is already closed", index); + assert false : "Cluster block should have been removed"; } } else { logger.debug("closing index {} failed, removing index block {}", index, closingBlock); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 0ac28dca7dc3a..cba779a62c2a6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -56,9 +56,11 @@ import java.util.Set; import java.util.stream.Collectors; +import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID; import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.createIndexClosedBlock; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; @@ -202,6 +204,26 @@ public void testAddIndexClosedBlocks() { } } + public void testLastIndexClosedBlockIsApplied() { + ClusterState state = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build(); + state = addOpenedIndex("index", randomIntBetween(1, 3), randomIntBetween(0, 3), state); + + ClusterState previousState = null; + ClusterBlock closingBlock = null; + for (int i = 0; i < randomIntBetween(2, 10); i++) { + previousState = state; + closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); + + Set blockedIndices = new HashSet<>(); + Index[] indices = new Index[]{previousState.metaData().index("index").getIndex()}; + state = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, previousState, blockedIndices); + + assertNotSame(previousState, state); + assertTrue(blockedIndices.contains(state.metaData().index("index").getIndex())); + } + assertBlocked("index", state, closingBlock); + } + public void testValidateShardLimit() { int nodesInCluster = randomIntBetween(2,100); ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); @@ -340,11 +362,19 @@ private static void assertIsClosed(final String indexName, final ClusterState cl assertBlocked(indexName, clusterState, closingBlock); } - private static void assertBlocked(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { - assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); + private static void assertBlocked(final String indexName, final ClusterState clusterState, final ClusterBlock... closingBlocks) { + assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, INDEX_CLOSED_BLOCK_ID), is(true)); + if (closingBlocks != null) { + for (ClusterBlock closingBlock : closingBlocks) { + assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); + } + assertThat("Index " + indexName + " must have only 1 block with [id=" + INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == INDEX_CLOSED_BLOCK_ID).count(), equalTo((long) closingBlocks.length)); + } } private static void assertNotBlocked(final String indexName, final ClusterState clusterState) { - assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); + assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, INDEX_CLOSED_BLOCK_ID), is(false)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 84092c254e96e..5a42cac6b240c 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -146,7 +146,7 @@ public void testConcurrentClose() throws InterruptedException { } catch (InterruptedException e) { throw new AssertionError(e); } - assertAcked(client().admin().indices().prepareClose(indexName)); + client().admin().indices().prepareClose(indexName).get(); }); threads[i].start(); } From 48c9269cfb8ccc9c643bb3b3508e35386ed6fa3c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Dec 2018 15:56:23 +0100 Subject: [PATCH 05/13] Fix log message --- .../cluster/metadata/MetaDataIndexStateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 99f6076c8de8d..ba2899fa0c3d5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -372,7 +372,7 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final IndexMetaData indexMetaData = metadata.getSafe(index); if (acknowledged) { if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { - logger.debug("closing index {} succeed but block has been removed by a concurrent open or close request", index); + logger.debug("closing index {} succeed but block has been removed by a concurrent close request", index); assert indexMetaData.getState() == IndexMetaData.State.OPEN || currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); From ec7cc6fd6043074f9163997e44eda1254e077f6b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 20 Dec 2018 14:19:57 +0100 Subject: [PATCH 06/13] Revert "Keep last cluster block only" This reverts commit 79e2e99d --- .../MetaDataIndexStateServiceTests.java | 36 ++----------------- .../indices/state/CloseIndexIT.java | 2 +- 2 files changed, 4 insertions(+), 34 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index cba779a62c2a6..0ac28dca7dc3a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -56,11 +56,9 @@ import java.util.Set; import java.util.stream.Collectors; -import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; -import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID; import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.createIndexClosedBlock; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; @@ -204,26 +202,6 @@ public void testAddIndexClosedBlocks() { } } - public void testLastIndexClosedBlockIsApplied() { - ClusterState state = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build(); - state = addOpenedIndex("index", randomIntBetween(1, 3), randomIntBetween(0, 3), state); - - ClusterState previousState = null; - ClusterBlock closingBlock = null; - for (int i = 0; i < randomIntBetween(2, 10); i++) { - previousState = state; - closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); - - Set blockedIndices = new HashSet<>(); - Index[] indices = new Index[]{previousState.metaData().index("index").getIndex()}; - state = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, previousState, blockedIndices); - - assertNotSame(previousState, state); - assertTrue(blockedIndices.contains(state.metaData().index("index").getIndex())); - } - assertBlocked("index", state, closingBlock); - } - public void testValidateShardLimit() { int nodesInCluster = randomIntBetween(2,100); ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); @@ -362,19 +340,11 @@ private static void assertIsClosed(final String indexName, final ClusterState cl assertBlocked(indexName, clusterState, closingBlock); } - private static void assertBlocked(final String indexName, final ClusterState clusterState, final ClusterBlock... closingBlocks) { - assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, INDEX_CLOSED_BLOCK_ID), is(true)); - if (closingBlocks != null) { - for (ClusterBlock closingBlock : closingBlocks) { - assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); - } - assertThat("Index " + indexName + " must have only 1 block with [id=" + INDEX_CLOSED_BLOCK_ID + "]", - clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() - .filter(clusterBlock -> clusterBlock.id() == INDEX_CLOSED_BLOCK_ID).count(), equalTo((long) closingBlocks.length)); - } + private static void assertBlocked(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { + assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); } private static void assertNotBlocked(final String indexName, final ClusterState clusterState) { - assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, INDEX_CLOSED_BLOCK_ID), is(false)); + assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 5a42cac6b240c..84092c254e96e 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -146,7 +146,7 @@ public void testConcurrentClose() throws InterruptedException { } catch (InterruptedException e) { throw new AssertionError(e); } - client().admin().indices().prepareClose(indexName).get(); + assertAcked(client().admin().indices().prepareClose(indexName)); }); threads[i].start(); } From 7caf533fddcd997daf7fd73265d1d609a19cc870 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 21 Dec 2018 12:01:50 +0100 Subject: [PATCH 07/13] Reuse blocks --- .../cluster/block/ClusterBlocks.java | 2 +- .../metadata/MetaDataIndexStateService.java | 230 ++++++++++-------- .../elasticsearch/transport/TcpTransport.java | 2 +- .../MetaDataIndexStateServiceTests.java | 121 +++++---- .../MetaDataIndexStateServiceUtils.java | 17 +- .../indices/cluster/ClusterStateChanges.java | 11 +- .../indices/state/CloseIndexIT.java | 43 ++-- .../indices/state/OpenCloseIndexIT.java | 21 +- .../indices/state/ReopenWhileClosingIT.java | 167 +++++++++++++ 9 files changed, 418 insertions(+), 196 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 77a09f0fba174..3ab5d752b9077 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -346,7 +346,7 @@ public Builder blocks(ClusterBlocks blocks) { public Builder addBlocks(IndexMetaData indexMetaData) { String indexName = indexMetaData.getIndex().getName(); if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - addIndexBlock(indexName, MetaDataIndexStateService.createIndexClosedBlock()); + addIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); } if (IndexMetaData.INDEX_READ_ONLY_SETTING.get(indexMetaData.getSettings())) { addIndexBlock(indexName, IndexMetaData.INDEX_READ_ONLY_BLOCK); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index ba2899fa0c3d5..e76345054d345 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -68,6 +68,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -85,6 +86,8 @@ public class MetaDataIndexStateService { private static final Logger logger = LogManager.getLogger(MetaDataIndexStateService.class); public static final int INDEX_CLOSED_BLOCK_ID = 4; + public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, + false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE); private final ClusterService clusterService; private final AllocationService allocationService; @@ -123,17 +126,14 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina final TimeValue timeout = request.ackTimeout(); final TimeValue masterTimeout = request.masterNodeTimeout(); - // Creates a unique cluster block - final ClusterBlock closingBlock = createIndexClosedBlock(); - clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) { - private final Set blockedIndices = new HashSet<>(); + private final Map blockedIndices = new HashMap<>(); @Override public ClusterState execute(final ClusterState currentState) { - return addIndexClosedBlocks(concreteIndices, closingBlock, currentState, blockedIndices); + return addIndexClosedBlocks(concreteIndices, blockedIndices, currentState); } @Override @@ -144,12 +144,12 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) - .execute(new WaitForClosedBlocksApplied(blockedIndices, closingBlock, timeout, + .execute(new WaitForClosedBlocksApplied(blockedIndices, timeout, ActionListener.wrap(results -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(final ClusterState currentState) throws Exception { - final ClusterState updatedState = closeRoutingTable(currentState, closingBlock, results); + final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); return allocationService.reroute(updatedState, "indices closed"); } @@ -187,12 +187,12 @@ public TimeValue timeout() { /** * Step 1 - Start closing indices by adding a write block * - * This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds the unique - * cluster block to every index to close in the cluster state. After the cluster state is published, the shards should - * start to reject writing operations and we can proceed with step 2. + * This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds a unique cluster + * block (or reuses an existing one) to every index to close in the cluster state. After the cluster state is published, the shards + * should start to reject writing operations and we can proceed with step 2. */ - static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlock closingBlock, - final ClusterState currentState, final Set blockedIndices) { + static ClusterState addIndexClosedBlocks(final Index[] indices, final Map blockedIndices, + final ClusterState currentState) { final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final Set indicesToClose = new HashSet<>(); @@ -202,6 +202,7 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlo indicesToClose.add(indexMetaData); } else { logger.debug("index {} is already closed, ignoring", index); + assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); } } @@ -223,20 +224,35 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlo for (IndexMetaData indexToClose : indicesToClose) { final Index index = indexToClose.getIndex(); - if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) { - blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); + + ClusterBlock indexBlock = null; + final Set clusterBlocks = currentState.blocks().indices().get(index.getName()); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == INDEX_CLOSED_BLOCK_ID) { + // Reuse the existing index closed block + indexBlock = clusterBlock; + break; + } + } } - blocks.addIndexBlock(index.getName(), closingBlock); if (useDirectClose) { logger.debug("closing index {} directly", index); - metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); + metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); // increment version? + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); routingTable.remove(index.getName()); + indexBlock = INDEX_CLOSED_BLOCK; + } + if (indexBlock == null) { + // Create a new index closed block + indexBlock = createIndexClosedBlock(); } - blockedIndices.add(index); + blocks.addIndexBlock(index.getName(), indexBlock); + blockedIndices.put(index, indexBlock); } logger.info(() -> new ParameterizedMessage("closing indices {}", - blockedIndices.stream().map(Object::toString).collect(Collectors.joining(",")))); + blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.joining(",")))); return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(); } @@ -249,20 +265,17 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlo */ class WaitForClosedBlocksApplied extends AbstractRunnable { - private final Set blockedIndices; - private final ClusterBlock clusterBlock; + private final Map blockedIndices; private final @Nullable TimeValue timeout; private final ActionListener> listener; - private WaitForClosedBlocksApplied(final Set blockedIndices, - final ClusterBlock clusterBlock, + private WaitForClosedBlocksApplied(final Map blockedIndices, final @Nullable TimeValue timeout, final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { - throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices"); + throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null"); } this.blockedIndices = blockedIndices; - this.clusterBlock = clusterBlock; this.listener = listener; this.timeout = timeout; } @@ -277,18 +290,18 @@ protected void doRun() throws Exception { final Map results = ConcurrentCollections.newConcurrentMap(); final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); - for (Index blockedIndex : blockedIndices) { - waitForShardsReadyForClosing(blockedIndex, state, response -> { - results.put(blockedIndex, response); + blockedIndices.forEach((index, block) -> { + waitForShardsReadyForClosing(index, block, state, response -> { + results.put(index, response); if (countDown.countDown()) { listener.onResponse(unmodifiableMap(results)); } }); - } + }); } - private void waitForShardsReadyForClosing(final Index index, final ClusterState state, - final Consumer onResponse) { + private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock, + final ClusterState state, final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index); @@ -297,7 +310,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState } final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) { - assert state.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); + assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); logger.debug("index {} has been blocked before closing and is already closed, ignoring", index); onResponse.accept(new AcknowledgedResponse(true)); return; @@ -310,7 +323,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; final ShardId shardId = shardRoutingTable.shardId(); - sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener() { + sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); @@ -335,6 +348,7 @@ private void processIfFinished() { } private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, + final ClusterBlock closingBlock, final ActionListener listener) { final ShardId shardId = shardRoutingTable.shardId(); if (shardRoutingTable.primaryShard().unassigned()) { @@ -345,7 +359,7 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar return; } final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock); if (timeout != null) { shardRequest.timeout(timeout); } @@ -358,7 +372,7 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ static ClusterState closeRoutingTable(final ClusterState currentState, - final ClusterBlock closingBlock, + final Map blockedIndices, final Map results) { final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); @@ -369,26 +383,28 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final Index index = result.getKey(); final boolean acknowledged = result.getValue().isAcknowledged(); try { + if (acknowledged == false) { + logger.debug("closing index {} failed", index); + continue; + } final IndexMetaData indexMetaData = metadata.getSafe(index); - if (acknowledged) { - if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { - logger.debug("closing index {} succeed but block has been removed by a concurrent close request", index); - assert indexMetaData.getState() == IndexMetaData.State.OPEN - || currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); - - } else if (indexMetaData.getState() == IndexMetaData.State.OPEN) { - logger.debug("closing index {} succeed", index); - metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)); - routingTable.remove(index.getName()); - closedIndices.add(index.getName()); - } else { - logger.debug("closing index {} succeed but index is already closed", index); - assert false : "Cluster block should have been removed"; - } - } else { - logger.debug("closing index {} failed, removing index block {}", index, closingBlock); - blocks.removeIndexBlock(index.getName(), closingBlock); + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + logger.debug("closing index {} succeed but index is already closed", index); + assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); + continue; + } + final ClusterBlock closingBlock = blockedIndices.get(index); + if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { + logger.debug("closing index {} succeed but block has been removed in the mean time", index); + continue; } + + logger.debug("closing index {} succeed", index); + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); + metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)); + routingTable.remove(index.getName()); + closedIndices.add(index.getName()); + } catch (final IndexNotFoundException e) { logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index); } @@ -424,64 +440,73 @@ private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request, final String indicesAsString = Arrays.toString(request.indices()); clusterService.submitStateUpdateTask("open-indices " + indicesAsString, - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } - @Override - public ClusterState execute(ClusterState currentState) { - List indicesToOpen = new ArrayList<>(); - for (Index index : request.indices()) { - final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index); - if (indexMetaData.getState() != IndexMetaData.State.OPEN) { - indicesToOpen.add(indexMetaData); - } + @Override + public ClusterState execute(final ClusterState currentState) { + final ClusterState updatedState = openIndices(request.indices(), currentState); + //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask + return allocationService.reroute(updatedState, "indices opened [" + indicesAsString + "]"); } + } + ); + } - validateShardLimit(currentState, request.indices()); + ClusterState openIndices(final Index[] indices, final ClusterState currentState) { + final List indicesToOpen = new ArrayList<>(); + for (Index index : indices) { + final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index); + if (indexMetaData.getState() != IndexMetaData.State.OPEN) { + indicesToOpen.add(indexMetaData); + } else if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) { + indicesToOpen.add(indexMetaData); + } + } - if (indicesToOpen.isEmpty()) { - return currentState; - } + validateShardLimit(currentState, indices); + if (indicesToOpen.isEmpty()) { + return currentState; + } - logger.info("opening indices [{}]", indicesAsString); - - MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder() - .blocks(currentState.blocks()); - final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion() - .minimumIndexCompatibilityVersion(); - for (IndexMetaData closedMetaData : indicesToOpen) { - final String indexName = closedMetaData.getIndex().getName(); - IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build(); - // The index might be closed because we couldn't import it due to old incompatible version - // We need to check that this index can be upgraded to the current version - indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion); - try { - indicesService.verifyIndexMetadata(indexMetaData, indexMetaData); - } catch (Exception e) { - throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e); - } + logger.info(() -> new ParameterizedMessage("opening indices [{}]", + String.join(",", indicesToOpen.stream().map(i -> (CharSequence) i.getIndex().toString())::iterator))); - mdBuilder.put(indexMetaData, true); - blocksBuilder.removeIndexBlockWithId(indexName, INDEX_CLOSED_BLOCK_ID); + final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion(); + + for (IndexMetaData indexMetaData : indicesToOpen) { + final Index index = indexMetaData.getIndex(); + if (indexMetaData.getState() != IndexMetaData.State.OPEN) { + IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build(); + // The index might be closed because we couldn't import it due to old incompatible version + // We need to check that this index can be upgraded to the current version + updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion); + try { + indicesService.verifyIndexMetadata(updatedIndexMetaData, updatedIndexMetaData); + } catch (Exception e) { + throw new ElasticsearchException("Failed to verify index " + index, e); } + metadata.put(updatedIndexMetaData, true); + } - ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build(); + // Always removes index closed blocks (note: this can fail on-going close index actions) + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); + } - RoutingTable.Builder rtBuilder = RoutingTable.builder(updatedState.routingTable()); - for (IndexMetaData index : indicesToOpen) { - rtBuilder.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(index.getIndex())); - } + ClusterState updatedState = ClusterState.builder(currentState).metaData(metadata).blocks(blocks).build(); - //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask - return allocationService.reroute( - ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(), - "indices opened [" + indicesAsString + "]"); + final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + for (IndexMetaData previousIndexMetaData : indicesToOpen) { + if (previousIndexMetaData.getState() != IndexMetaData.State.OPEN) { + routingTable.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(previousIndexMetaData.getIndex())); } - }); + } + return ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); } /** @@ -512,11 +537,14 @@ private static int getTotalShardCount(ClusterState state, Index index) { } /** - * @return Generates a {@link ClusterBlock} that blocks read and write operations on closed or soon-to-be-closed indices. The + * @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The * cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID. */ public static ClusterBlock createIndexClosedBlock() { - return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), "index closed", false, false, false, + final String description = "Index is blocked due to on-going index closing operation. Note that the closing process can take " + + "time and writes operations are blocked in the meantime. Execute an open index request to unblock the index and allow writes " + + "operation again. Execute a new close index request will try to close the index again."; + return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), description , false, false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE); } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 8edf97c929550..f1f26831ac62b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -653,7 +653,7 @@ public void onFailure(Exception e) { } } } else { - logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); + logger.trace(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); // close the channel, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 0ac28dca7dc3a..6931922b15839 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -56,10 +56,14 @@ import java.util.Set; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; -import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.createIndexClosedBlock; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.hamcrest.Matchers.containsString; @@ -74,75 +78,83 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { public void testCloseRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); - final Map blockedIndices = new HashMap<>(); - final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); + final Map blockedIndices = new HashMap<>(); + final Map results = new HashMap<>(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build(); for (int i = 0; i < randomIntBetween(1, 25); i++) { - final String indexName = randomAlphaOfLengthBetween(5, 15); + final String indexName = "index-" + i; if (randomBoolean()) { state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state); nonBlockedIndices.add(state.metaData().index(indexName).getIndex()); } else { + final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); - blockedIndices.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); + blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); + results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); } } - final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, closingBlock, blockedIndices); + final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { assertIsOpened(nonBlockedIndex.getName(), updatedState); + assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false)); } - for (Map.Entry blockedIndex : blockedIndices.entrySet()) { - if (blockedIndex.getValue().isAcknowledged()) { - assertIsClosed(blockedIndex.getKey().getName(), updatedState, closingBlock); + for (Index blockedIndex : blockedIndices.keySet()) { + if (results.get(blockedIndex).isAcknowledged()) { + assertIsClosed(blockedIndex.getName(), updatedState); } else { - assertIsOpened(blockedIndex.getKey().getName(), updatedState); + assertIsOpened(blockedIndex.getName(), updatedState); + assertThat(updatedState.blocks().hasIndexBlockWithId(blockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(true)); } } } public void testAddIndexClosedBlocks() { final ClusterState initialState = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build(); - final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); Index[] indices = new Index[]{new Index("_name", "_uid")}; expectThrows(IndexNotFoundException.class, () -> - MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, initialState, blockedIndices)); + MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState)); assertTrue(blockedIndices.isEmpty()); } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); Index[] indices = Index.EMPTY_ARRAY; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, initialState, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState); assertSame(initialState, updatedState); assertTrue(blockedIndices.isEmpty()); } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); Index[] indices = new Index[]{state.metaData().index("closed").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); assertSame(state, updatedState); assertTrue(blockedIndices.isEmpty()); + } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state); Index[] indices = new Index[]{state.metaData().index("opened").getIndex(), state.metaData().index("closed").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); assertNotSame(state, updatedState); - assertTrue(blockedIndices.contains(updatedState.metaData().index("opened").getIndex())); - assertFalse(blockedIndices.contains(updatedState.metaData().index("closed").getIndex())); - assertBlocked("opened", updatedState, closingBlock); + + Index opened = updatedState.metaData().index("opened").getIndex(); + assertTrue(blockedIndices.containsKey(opened)); + assertHasBlock("opened", updatedState, blockedIndices.get(opened)); + + Index closed = updatedState.metaData().index("closed").getIndex(); + assertFalse(blockedIndices.containsKey(closed)); } { IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { @@ -154,7 +166,7 @@ public void testAddIndexClosedBlocks() { state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state); } Index[] indices = new Index[]{state.metaData().index("restored").getIndex()}; - MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, new HashSet<>()); + MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state); }); assertThat(exception.getMessage(), containsString("Cannot close indices that are being restored: [[restored]]")); } @@ -168,12 +180,12 @@ public void testAddIndexClosedBlocks() { state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state); } Index[] indices = new Index[]{state.metaData().index("snapshotted").getIndex()}; - MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, new HashSet<>()); + MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state); }); assertThat(exception.getMessage(), containsString("Cannot close indices that are being snapshotted: [[snapshotted]]")); } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); ClusterState state = addOpenedIndex("index-1", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); state = addOpenedIndex("index-2", randomIntBetween(1, 3), randomIntBetween(0, 3), state); state = addOpenedIndex("index-3", randomIntBetween(1, 3), randomIntBetween(0, 3), state); @@ -181,27 +193,51 @@ public void testAddIndexClosedBlocks() { if (mixedVersions) { state = ClusterState.builder(state) .nodes(DiscoveryNodes.builder(state.nodes()) - .add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), Collections.emptyMap(), + .add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), emptyMap(), new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_6_0_0))) .build(); } - Index[] indices = new Index[]{state.metaData().index("index-1").getIndex(), - state.metaData().index("index-2").getIndex(), state.metaData().index("index-3").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); + Index index1 = state.metaData().index("index-1").getIndex(); + Index index2 = state.metaData().index("index-2").getIndex(); + Index index3 = state.metaData().index("index-3").getIndex(); + Index[] indices = new Index[]{index1, index2, index3}; + + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); assertNotSame(state, updatedState); - for (String index : Arrays.asList("index-1", "index-2", "index-3")) { - assertTrue(blockedIndices.contains(updatedState.metaData().index(index).getIndex())); + for (Index index : indices) { + assertTrue(blockedIndices.containsKey(index)); if (mixedVersions) { - assertIsClosed(index, updatedState, closingBlock); + assertIsClosed(index.getName(), updatedState); } else { - assertBlocked("index-1", updatedState, closingBlock); + assertHasBlock(index.getName(), updatedState, blockedIndices.get(index)); } } } } + public void testAddIndexClosedBlocksReusesBlocks() { + ClusterState state = ClusterState.builder(new ClusterName("testAddIndexClosedBlocksReuseBlocks")).build(); + state = addOpenedIndex("test", randomIntBetween(1, 3), randomIntBetween(0, 3), state); + + Index test = state.metaData().index("test").getIndex(); + Index[] indices = new Index[]{test}; + + final Map blockedIndices = new HashMap<>(); + state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); + + assertTrue(blockedIndices.containsKey(test)); + assertHasBlock(test.getName(), state, blockedIndices.get(test)); + + final Map blockedIndices2 = new HashMap<>(); + state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices2, state); + + assertTrue(blockedIndices2.containsKey(test)); + assertHasBlock(test.getName(), state, blockedIndices2.get(test)); + assertEquals(blockedIndices.get(test), blockedIndices2.get(test)); + } + public void testValidateShardLimit() { int nodesInCluster = randomIntBetween(2,100); ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); @@ -252,7 +288,7 @@ private static ClusterState addOpenedIndex(final String index, final int numShar } private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { - return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, createIndexClosedBlock()); + return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, INDEX_CLOSED_BLOCK); } private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state, @@ -331,20 +367,21 @@ private static ClusterState addIndex(final ClusterState currentState, private static void assertIsOpened(final String indexName, final ClusterState clusterState) { assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertNotBlocked(indexName, clusterState); } - private static void assertIsClosed(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { + private static void assertIsClosed(final String indexName, final ClusterState clusterState) { assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE)); assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertBlocked(indexName, clusterState, closingBlock); + assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true)); + assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); } - private static void assertBlocked(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { + private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); - } - - private static void assertNotBlocked(final String indexName, final ClusterState clusterState) { - assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); + assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java index bf31a55fe460c..5ee6a7c60da3d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java @@ -24,7 +24,6 @@ import org.elasticsearch.index.Index; import java.util.Map; -import java.util.Set; public class MetaDataIndexStateServiceUtils { @@ -32,19 +31,19 @@ private MetaDataIndexStateServiceUtils(){ } /** - * Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], ClusterBlock, ClusterState, Set)} which is - * a protected method. + * Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], Map, ClusterState)} which is a protected method. */ - public static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterBlock closingBlock, - final ClusterState state, final Set blockedIndices) { - return MetaDataIndexStateService.addIndexClosedBlocks(indices, closingBlock, state, blockedIndices); + public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map blockedIndices, + final ClusterState state) { + return MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); } /** - * Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, ClusterBlock, Map)} which is a protected method. + * Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map, Map)} which is a protected method. */ - public static ClusterState closeRoutingTable(final ClusterState state, final ClusterBlock closingBlock, + public static ClusterState closeRoutingTable(final ClusterState state, + final Map blockedIndices, final Map results) { - return MetaDataIndexStateService.closeRoutingTable(state, closingBlock, results); + return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 93a8698f0d254..387ba1c3d9653 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -95,9 +95,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -223,12 +224,10 @@ public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) final Index[] concreteIndices = Arrays.stream(request.indices()) .map(index -> state.metaData().index(index).getIndex()).toArray(Index[]::new); - final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); + final Map blockedIndices = new HashMap<>(); + ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state); - final Set blockedIndices = new HashSet<>(); - ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, closingBlock, state, blockedIndices); - - newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, closingBlock, blockedIndices.stream() + newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream() .collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true)))); return allocationService.reroute(newState, "indices closed"); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 84092c254e96e..f6ec07c1af73a 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -135,6 +135,7 @@ public void testConcurrentClose() throws InterruptedException { final int nbDocs = randomIntBetween(10, 50); indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureYellowAndNoInitializingShards(indexName); final CountDownLatch startClosing = new CountDownLatch(1); final Thread[] threads = new Thread[randomIntBetween(2, 5)]; @@ -146,7 +147,11 @@ public void testConcurrentClose() throws InterruptedException { } catch (InterruptedException e) { throw new AssertionError(e); } - assertAcked(client().admin().indices().prepareClose(indexName)); + try { + assertAcked(client().admin().indices().prepareClose(indexName)); + } catch (final Exception e) { + assertException(e, indexName); + } }); threads[i].start(); } @@ -261,7 +266,7 @@ public void testConcurrentClosesAndOpens() throws Exception { waitForLatch.run(); client().admin().indices().prepareClose(indexName).get(); } catch (final Exception e) { - assertException(e, indexName); + throw new AssertionError(e); } })); } @@ -269,9 +274,9 @@ public void testConcurrentClosesAndOpens() throws Exception { threads.add(new Thread(() -> { try { waitForLatch.run(); - client().admin().indices().prepareOpen(indexName).get(); + assertAcked(client().admin().indices().prepareOpen(indexName).get()); } catch (final Exception e) { - assertException(e, indexName); + throw new AssertionError(e); } })); } @@ -291,27 +296,31 @@ public void testConcurrentClosesAndOpens() throws Exception { if (clusterState.metaData().indices().get(indexName).getState() == IndexMetaData.State.CLOSE) { assertIndexIsClosed(indexName); assertAcked(client().admin().indices().prepareOpen(indexName)); - } else { - assertIndexIsOpened(indexName); } + refresh(indexName); + assertIndexIsOpened(indexName); assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexer.totalIndexedDocs()); } - static void assertIndexIsClosed(final String indexName) { + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.CLOSE)); - assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(true)); - assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", - clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() - .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); + for (String index : indices) { + assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.CLOSE)); + assertThat(clusterState.routingTable().index(index), nullValue()); + assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true)); + assertThat("Index " + index + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(index, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); + } } - static void assertIndexIsOpened(final String indexName) { + static void assertIndexIsOpened(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); - assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertThat(clusterState.blocks().hasIndexBlockWithId(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID), is(false)); + for (String index : indices) { + assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN)); + assertThat(clusterState.routingTable().index(index), notNullValue()); + assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false)); + } } static void assertException(final Throwable throwable, final String indexName) { diff --git a/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java index ddbbd0ea73a8f..e9e9108f5e8f1 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -46,6 +45,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -53,7 +54,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; public class OpenCloseIndexIT extends ESIntegTestCase { public void testSimpleCloseOpen() { @@ -258,23 +258,6 @@ public void testOpenWaitingForActiveShardsFailed() throws Exception { ensureGreen("test"); } - private void assertIndexIsOpened(String... indices) { - checkIndexState(IndexMetaData.State.OPEN, indices); - } - - private void assertIndexIsClosed(String... indices) { - checkIndexState(IndexMetaData.State.CLOSE, indices); - } - - private void checkIndexState(IndexMetaData.State expectedState, String... indices) { - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet(); - for (String index : indices) { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index); - assertThat(indexMetaData, notNullValue()); - assertThat(indexMetaData.getState(), equalTo(expectedState)); - } - } - public void testOpenCloseWithDocs() throws IOException, ExecutionException, InterruptedException { String mapping = Strings.toString(XContentFactory.jsonBuilder(). startObject(). diff --git a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java new file mode 100644 index 0000000000000..12d304c3bde78 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java @@ -0,0 +1,167 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.state; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Glob; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2) +public class ReopenWhileClosingIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return singletonList(MockTransportService.TestPlugin.class); + } + + public void testReopenDuringClose() throws Exception { + final String indexName = "test"; + createIndexWithDocs(indexName); + + ensureClusterSizeConsistency(); + + final CountDownLatch block = new CountDownLatch(1); + final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown); + + ActionFuture closeIndexResponse = client().admin().indices().prepareClose(indexName).execute(); + assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); + assertIndexIsBlocked(indexName); + assertFalse(closeIndexResponse.isDone()); + + assertAcked(client().admin().indices().prepareOpen(indexName)); + + releaseBlock.close(); + closeIndexResponse.get(); + assertIndexIsOpened(indexName); + } + + public void testReopenDuringCloseOnMultipleIndices() throws Exception { + final List indices = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(2, 10); i++) { + indices.add("index-" + i); + createIndexWithDocs(indices.get(i)); + } + + ensureClusterSizeConsistency(); + + final CountDownLatch block = new CountDownLatch(1); + final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown); + + ActionFuture closeIndexResponse = client().admin().indices().prepareClose("index-*").execute(); + assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); + assertFalse(closeIndexResponse.isDone()); + indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked); + + final List reopenedIndices = randomSubsetOf(indices); + assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY))); + + releaseBlock.close(); + closeIndexResponse.get(); + + indices.forEach(index -> { + if (reopenedIndices.contains(index)) { + assertIndexIsOpened(index); + } else { + assertIndexIsClosed(index); + } + }); + } + + private void createIndexWithDocs(final String indexName) { + createIndex(indexName); + final int nbDocs = scaledRandomIntBetween(1, 100); + for (int i = 0; i < nbDocs; i++) { + index(indexName, "_doc", String.valueOf(i), "num", i); + } + assertIndexIsOpened(indexName); + } + + /** + * Intercepts and blocks the {@link TransportVerifyShardBeforeCloseAction} executed for the given index pattern. + */ + private Releasable interceptVerifyShardBeforeCloseActions(final String indexPattern, final Runnable onIntercept) { + final MockTransportService mockTransportService = (MockTransportService) internalCluster() + .getInstance(TransportService.class, internalCluster().getMasterName()); + + final CountDownLatch release = new CountDownLatch(1); + for (DiscoveryNode node : internalCluster().clusterService().state().getNodes()) { + mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()), + (connection, requestId, action, request, options) -> { + if (action.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) { + if (request instanceof TransportVerifyShardBeforeCloseAction.ShardRequest) { + final String index = ((TransportVerifyShardBeforeCloseAction.ShardRequest) request).shardId().getIndexName(); + if (Glob.globMatch(indexPattern, index)) { + logger.info("request {} intercepted for index {}", requestId, index); + onIntercept.run(); + try { + release.await(); + logger.info("request {} released for index {}", requestId, index); + } catch (final InterruptedException e) { + throw new AssertionError(e); + } + } + } + + } + connection.sendRequest(requestId, action, request, options); + }); + } + final RunOnce releaseOnce = new RunOnce(release::countDown); + return releaseOnce::run; + } + + private static void assertIndexIsBlocked(final String... indices) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN)); + assertThat(clusterState.routingTable().index(index), notNullValue()); + assertThat("Index " + index + " must have only 1 block with [id=" + INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(index, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); + } + } +} From a2af914322501de18a30624df90029759a6c1c69 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 21 Dec 2018 12:03:29 +0100 Subject: [PATCH 08/13] Revert TcpTransport.java --- .../src/main/java/org/elasticsearch/transport/TcpTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index f1f26831ac62b..8edf97c929550 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -653,7 +653,7 @@ public void onFailure(Exception e) { } } } else { - logger.trace(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); + logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); // close the channel, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); } From a1c1856ad155e1ad6c5abcbf736517137d9b1294 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 21 Dec 2018 13:41:41 +0100 Subject: [PATCH 09/13] Fix TransportVerifyShardBeforeCloseActionTests --- .../close/TransportVerifyShardBeforeCloseActionTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index cfb9a4dd67f61..3b887372b6929 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -105,8 +105,8 @@ public void setUp() throws Exception { clusterService = createClusterService(threadPool); clusterBlock = MetaDataIndexStateService.createIndexClosedBlock(); - when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")) - .blocks(ClusterBlocks.builder().addIndexBlock("index", clusterBlock).build()).build()); + setState(clusterService, new ClusterState.Builder(clusterService.state()) + .blocks(ClusterBlocks.builder().blocks(clusterService.state().blocks()).addIndexBlock("index", clusterBlock).build()).build()); transport = new CapturingTransport(); TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool, From faab39a629269a62553b0ff87bd66dee32117efb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 21 Dec 2018 15:52:41 +0100 Subject: [PATCH 10/13] Apply more feedback --- .../cluster/block/ClusterBlock.java | 3 +- .../cluster/block/ClusterBlocks.java | 14 ++++++ .../metadata/MetaDataIndexStateService.java | 49 +++++++++++++------ ...portVerifyShardBeforeCloseActionTests.java | 2 +- .../cluster/block/ClusterBlockTests.java | 30 +++++++++--- .../MetaDataIndexStateServiceTests.java | 2 +- .../indices/state/CloseIndexIT.java | 2 +- .../indices/state/ReopenWhileClosingIT.java | 8 +-- 8 files changed, 80 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index 25e828dfe18f8..5713462b9212f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.block; import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -36,7 +37,7 @@ public class ClusterBlock implements Streamable, ToXContentFragment { private int id; - private String uuid; + private @Nullable String uuid; private String description; private EnumSet levels; private boolean retryable; diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 3ab5d752b9077..e487e03044f8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -159,6 +160,19 @@ public boolean hasIndexBlockWithId(String index, int blockId) { return false; } + @Nullable + public ClusterBlock getIndexBlockWithId(final String index, final int blockId) { + final Set clusterBlocks = indicesBlocks.get(index); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == blockId) { + return clusterBlock; + } + } + } + return null; + } + public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException { ClusterBlockException blockException = globalBlockedException(level); if (blockException != null) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index e76345054d345..808704d5a1297 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -48,6 +48,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -68,6 +69,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -147,9 +149,30 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta .execute(new WaitForClosedBlocksApplied(blockedIndices, timeout, ActionListener.wrap(results -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { + + boolean acknowledged = true; + @Override public ClusterState execute(final ClusterState currentState) throws Exception { final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); + // Combine the results of the verify shards before close actions with the cluster state changes + // to determine if the current close action effectively closed all indices. + for (Map.Entry result : results.entrySet()) { + IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey()); + if (updatedMetaData != null) { + if (result.getValue().isAcknowledged()) { + if (updatedMetaData.getState() == IndexMetaData.State.CLOSE) { + IndexMetaData previousMetaData = currentState.metaData().index(result.getKey()); + if (previousMetaData != null) { + acknowledged = (previousMetaData.getState() == IndexMetaData.State.OPEN); + } + } + } else { + acknowledged = false; + break; + } + } + } return allocationService.reroute(updatedState, "indices closed"); } @@ -161,7 +184,6 @@ public void onFailure(final String source, final Exception e) { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - boolean acknowledged = results.values().stream().allMatch(AcknowledgedResponse::isAcknowledged); listener.onResponse(new AcknowledgedResponse(acknowledged)); } }), @@ -242,10 +264,10 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map levels = Arrays.asList(ClusterBlockLevel.values()); return new ClusterBlock(randomInt(), uuid, "cluster block #" + randomInt(), randomBoolean(), randomBoolean(), randomBoolean(), - randomFrom(RestStatus.values()), EnumSet.copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels))); + randomFrom(RestStatus.values()), copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels))); } private void assertClusterBlockEquals(final ClusterBlock expected, final ClusterBlock actual) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 6931922b15839..c30925514bb93 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -89,7 +89,7 @@ public void testCloseRoutingTable() { state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state); nonBlockedIndices.add(state.metaData().index(indexName).getIndex()); } else { - final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock(); + final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock(); state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index f6ec07c1af73a..a0304c96430f0 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -148,7 +148,7 @@ public void testConcurrentClose() throws InterruptedException { throw new AssertionError(e); } try { - assertAcked(client().admin().indices().prepareClose(indexName)); + client().admin().indices().prepareClose(indexName).get(); } catch (final Exception e) { assertException(e, indexName); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java index 12d304c3bde78..7f45d74313c83 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java @@ -62,7 +62,7 @@ public void testReopenDuringClose() throws Exception { final String indexName = "test"; createIndexWithDocs(indexName); - ensureClusterSizeConsistency(); + ensureYellowAndNoInitializingShards(indexName); final CountDownLatch block = new CountDownLatch(1); final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown); @@ -75,7 +75,7 @@ public void testReopenDuringClose() throws Exception { assertAcked(client().admin().indices().prepareOpen(indexName)); releaseBlock.close(); - closeIndexResponse.get(); + assertFalse(closeIndexResponse.get().isAcknowledged()); assertIndexIsOpened(indexName); } @@ -86,7 +86,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception { createIndexWithDocs(indices.get(i)); } - ensureClusterSizeConsistency(); + ensureYellowAndNoInitializingShards(indices.toArray(Strings.EMPTY_ARRAY)); final CountDownLatch block = new CountDownLatch(1); final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown); @@ -100,7 +100,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception { assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY))); releaseBlock.close(); - closeIndexResponse.get(); + assertFalse(closeIndexResponse.get().isAcknowledged()); indices.forEach(index -> { if (reopenedIndices.contains(index)) { From 349f25456854ad43e3073bb6f6678c1ad290bd87 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Jan 2019 11:08:32 +0100 Subject: [PATCH 11/13] Apply feedback --- .../metadata/MetaDataIndexStateService.java | 25 ++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 14c8ed72ee2b7..aa4434a0a74c6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -152,22 +152,11 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta @Override public ClusterState execute(final ClusterState currentState) throws Exception { final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); - // Combine the results of the verify shards before close actions with the cluster state changes - // to determine if the current close action effectively closed all indices. for (Map.Entry result : results.entrySet()) { IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey()); - if (updatedMetaData != null) { - if (result.getValue().isAcknowledged()) { - if (updatedMetaData.getState() == IndexMetaData.State.CLOSE) { - IndexMetaData previousMetaData = currentState.metaData().index(result.getKey()); - if (previousMetaData != null) { - acknowledged = (previousMetaData.getState() == IndexMetaData.State.OPEN); - } - } - } else { - acknowledged = false; - break; - } + if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) { + acknowledged = false; + break; } } return allocationService.reroute(updatedState, "indices closed"); @@ -261,9 +250,11 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map Date: Mon, 7 Jan 2019 12:34:36 +0100 Subject: [PATCH 12/13] Fix ReopenWhileClosingIT (OpenIndexRequest requires a non empty list of indices to reopen) --- .../org/elasticsearch/indices/state/ReopenWhileClosingIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java index 7f45d74313c83..901c4f327af48 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java @@ -96,7 +96,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception { assertFalse(closeIndexResponse.isDone()); indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked); - final List reopenedIndices = randomSubsetOf(indices); + final List reopenedIndices = randomSubsetOf(randomIntBetween(1, indices.size()), indices); assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY))); releaseBlock.close(); From 692ea01ce8610f240d223b1ead6384f2f40871de Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Jan 2019 13:29:49 +0100 Subject: [PATCH 13/13] Unmute WaitForRefreshAndCloseIT --- .../org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java b/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java index 2292def6d4a3b..52b1a8c52b58e 100644 --- a/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java +++ b/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.test.rest; import org.apache.http.util.EntityUtils; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Request; @@ -41,7 +40,6 @@ /** * Tests that wait for refresh is fired if the index is closed. */ -@LuceneTestCase.AwaitsFix(bugUrl = "to be created") public class WaitForRefreshAndCloseIT extends ESRestTestCase { @Before public void setupIndex() throws IOException {