From b6ea6dffda07fcf05e75e2ceb0ee6c1a722fcad5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 6 May 2019 19:48:24 -0400 Subject: [PATCH 1/4] Closed indices should not sync global checkpoint --- .../indices/cluster/IndicesClusterStateService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 821e095fc20b0..df630c33aed24 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -600,7 +600,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR } try { - final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id()); + final IndexMetaData indexMetaData = state.metaData().getIndexSafe(shardRouting.index()); + final long primaryTerm = indexMetaData.primaryTerm(shardRouting.id()); logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard( @@ -610,7 +611,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR new RecoveryListener(shardRouting, primaryTerm), repositoriesService, failedShardHandler, - globalCheckpointSyncer, + indexMetaData.getState() == IndexMetaData.State.CLOSE ? shardId -> {} : globalCheckpointSyncer, retentionLeaseSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); From 0e2417730ad091024800a4e25b19fa3c5a7761a3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 7 May 2019 04:16:03 -0400 Subject: [PATCH 2/4] add allow closed indices --- .../TransportReplicationAction.java | 10 ++++- .../seqno/GlobalCheckpointSyncAction.java | 21 +++++++++ .../cluster/IndicesClusterStateService.java | 5 +-- .../indices/state/CloseIndexIT.java | 43 +++++++++++++++++++ 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6edaa95033997..dfdc76141697e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -225,6 +225,13 @@ protected boolean resolveIndex() { return true; } + /** + * Returns true if this action allows closed indices; defaults to false. + */ + protected boolean allowClosedIndices() { + return false; + } + protected TransportRequestOptions transportOptions(Settings settings) { return TransportRequestOptions.EMPTY; } @@ -659,7 +666,8 @@ protected void doRun() { retry(new IndexNotFoundException(concreteIndex)); return; } - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + + if (allowClosedIndices() == false && indexMetaData.getState() == IndexMetaData.State.CLOSE) { throw new IndexClosedException(indexMetaData.getIndex()); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index d67cbc833d666..bd6f4383338a5 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -123,6 +124,26 @@ private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { } } + @Override + protected ClusterBlockLevel globalBlockLevel() { + return null; // internal action - never block it + } + + @Override + public ClusterBlockLevel indexBlockLevel() { + return null; // internal action - never block it + } + + @Override + protected boolean resolveIndex() { + return false; // single index - don't resolve + } + + @Override + protected boolean allowClosedIndices() { + return true; // it's okay to sync global checkpoints on closed indices + } + public static final class Request extends ReplicationRequest { private Request(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index df630c33aed24..821e095fc20b0 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -600,8 +600,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR } try { - final IndexMetaData indexMetaData = state.metaData().getIndexSafe(shardRouting.index()); - final long primaryTerm = indexMetaData.primaryTerm(shardRouting.id()); + final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id()); logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard( @@ -611,7 +610,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR new RecoveryListener(shardRouting, primaryTerm), repositoriesService, failedShardHandler, - indexMetaData.getState() == IndexMetaData.State.CLOSE ? shardId -> {} : globalCheckpointSyncer, + globalCheckpointSyncer, retentionLeaseSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 740034f12ecc5..098c9b9555174 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -36,18 +36,25 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toList; @@ -75,6 +82,11 @@ public Settings indexSettings() { return super.indexSettings(); } + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); + } + public void testCloseMissingIndex() { IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareClose("test").get()); assertThat(e.getMessage(), is("no such index [test]")); @@ -421,6 +433,37 @@ public Settings onNodeStopped(String nodeName) throws Exception { } } + public void testSyncGlobalCheckpointClosedIndices() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + final String indexName = "resync_closed_indices"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + ShardRouting primary = clusterService().state().routingTable().index(indexName).shard(0).primaryShard(); + AtomicBoolean requestSent = new AtomicBoolean(); + for (DiscoveryNode node : clusterService().state().nodes()) { + MockTransportService transportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.startsWith(GlobalCheckpointSyncAction.ACTION_NAME)) { + requestSent.set(true); + } + connection.sendRequest(requestId, action, request, options); + }); + } + internalCluster().restartNode(clusterService().state().nodes().get(primary.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback()); + ensureGreen(indexName); + assertTrue(requestSent.get()); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { From 543222791bea918203cbf2b857c52a27c53df27d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 7 May 2019 04:29:16 -0400 Subject: [PATCH 3/4] ignore index level blocks only --- .../index/seqno/GlobalCheckpointSyncAction.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index bd6f4383338a5..5bce408ba5cdd 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -124,11 +124,6 @@ private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { } } - @Override - protected ClusterBlockLevel globalBlockLevel() { - return null; // internal action - never block it - } - @Override public ClusterBlockLevel indexBlockLevel() { return null; // internal action - never block it From 94bff3856ea0c23721f253f46148f79ddd69f337 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 May 2019 11:32:54 -0400 Subject: [PATCH 4/4] skip global checkpoint sync for closed indices --- .../TransportReplicationAction.java | 10 +---- .../seqno/GlobalCheckpointSyncAction.java | 16 ------- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/shard/IndexShardTests.java | 27 ++++++++++++ .../indices/state/CloseIndexIT.java | 42 ++++++------------- 5 files changed, 43 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index dfdc76141697e..6edaa95033997 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -225,13 +225,6 @@ protected boolean resolveIndex() { return true; } - /** - * Returns true if this action allows closed indices; defaults to false. - */ - protected boolean allowClosedIndices() { - return false; - } - protected TransportRequestOptions transportOptions(Settings settings) { return TransportRequestOptions.EMPTY; } @@ -666,8 +659,7 @@ protected void doRun() { retry(new IndexNotFoundException(concreteIndex)); return; } - - if (allowClosedIndices() == false && indexMetaData.getState() == IndexMetaData.State.CLOSE) { + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { throw new IndexClosedException(indexMetaData.getIndex()); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 5bce408ba5cdd..d67cbc833d666 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -124,21 +123,6 @@ private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { } } - @Override - public ClusterBlockLevel indexBlockLevel() { - return null; // internal action - never block it - } - - @Override - protected boolean resolveIndex() { - return false; // single index - don't resolve - } - - @Override - protected boolean allowClosedIndices() { - return true; // it's okay to sync global checkpoints on closed indices - } - public static final class Request extends ReplicationRequest { private Request(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 11e4fb81d9fbe..e4f4fada38370 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2137,8 +2137,8 @@ public void maybeSyncGlobalCheckpoint(final String reason) { StreamSupport .stream(globalCheckpoints.values().spliterator(), false) .anyMatch(v -> v.value < globalCheckpoint); - // only sync if there is a shard lagging the primary - if (syncNeeded) { + // only sync if index is not closed and there is a shard lagging the primary + if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) { logger.trace("syncing global checkpoint for [{}]", reason); globalCheckpointSyncer.run(); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0be7b4433fac3..4cfa26e779f71 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1099,6 +1099,33 @@ public void testGlobalCheckpointSync() throws IOException { closeShards(replicaShard, primaryShard); } + public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception { + ShardId shardId = new ShardId("index", "_na_", 0); + IndexMetaData.Builder indexMetadata = IndexMetaData.builder("index") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)) + .state(IndexMetaData.State.CLOSE).primaryTerm(0, 1); + ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(8), true, + ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); + AtomicBoolean synced = new AtomicBoolean(); + IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), + () -> synced.set(true), RetentionLeaseSyncer.EMPTY); + recoverShardFromStore(primaryShard); + IndexShard replicaShard = newShard(shardId, false); + recoverReplica(replicaShard, primaryShard, true); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + indexDoc(primaryShard, "_doc", Integer.toString(i)); + } + assertThat(primaryShard.getLocalCheckpoint(), equalTo(numDocs - 1L)); + primaryShard.updateLocalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), primaryShard.getLocalCheckpoint()); + long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, primaryShard.getLocalCheckpoint()); + primaryShard.updateGlobalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), globalCheckpointOnReplica); + primaryShard.maybeSyncGlobalCheckpoint("test"); + assertFalse("closed indices should skip global checkpoint sync", synced.get()); + closeShards(primaryShard, replicaShard); + } + public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 098c9b9555174..6f666483b18d0 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -36,25 +36,20 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportService; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toList; @@ -82,11 +77,6 @@ public Settings indexSettings() { return super.indexSettings(); } - @Override - protected Collection> nodePlugins() { - return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); - } - public void testCloseMissingIndex() { IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareClose("test").get()); assertThat(e.getMessage(), is("no such index [test]")); @@ -433,9 +423,9 @@ public Settings onNodeStopped(String nodeName) throws Exception { } } - public void testSyncGlobalCheckpointClosedIndices() throws Exception { + public void testResyncPropagatePrimaryTerm() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); - final String indexName = "resync_closed_indices"; + final String indexName = "closed_indices_promotion"; createIndex(indexName, Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) @@ -446,22 +436,16 @@ public void testSyncGlobalCheckpointClosedIndices() throws Exception { assertAcked(client().admin().indices().prepareClose(indexName)); assertIndexIsClosed(indexName); ensureGreen(indexName); - ShardRouting primary = clusterService().state().routingTable().index(indexName).shard(0).primaryShard(); - AtomicBoolean requestSent = new AtomicBoolean(); - for (DiscoveryNode node : clusterService().state().nodes()) { - MockTransportService transportService = - (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); - transportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.startsWith(GlobalCheckpointSyncAction.ACTION_NAME)) { - requestSent.set(true); - } - connection.sendRequest(requestId, action, request, options); - }); - } - internalCluster().restartNode(clusterService().state().nodes().get(primary.currentNodeId()).getName(), - new InternalTestCluster.RestartCallback()); + String nodeWithPrimary = clusterService().state().nodes().get(clusterService().state() + .routingTable().index(indexName).shard(0).primaryShard().currentNodeId()).getName(); + internalCluster().restartNode(nodeWithPrimary, new InternalTestCluster.RestartCallback()); ensureGreen(indexName); - assertTrue(requestSent.get()); + long primaryTerm = clusterService().state().metaData().index(indexName).primaryTerm(0); + for (String nodeName : internalCluster().nodesInclude(indexName)) { + IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeName) + .indexService(resolveIndex(indexName)).getShard(0); + assertThat(shard.routingEntry().toString(), shard.getOperationPrimaryTerm(), equalTo(primaryTerm)); + } } static void assertIndexIsClosed(final String... indices) {