diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 2c3d178db882c..9ae7d065dd949 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -108,13 +108,7 @@ private void executeShardOperation(final ShardRequest request, final IndexShard if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - - final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - if (indexShard.getGlobalCheckpoint() != maxSeqNo) { - throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint() - + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); - } - + indexShard.verifyShardBeforeIndexClosing(); indexShard.flush(new FlushRequest().force(true)); logger.trace("{} shard is ready for closing", shardId); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e450e93e9d397..b79bb079d9427 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -265,6 +265,20 @@ protected final DocsStats docsStats(IndexReader indexReader) { return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); } + /** + * Performs the pre-closing checks on the {@link Engine}. 
+ * + * @throws IllegalStateException if the sanity checks failed + */ + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); + if (globalCheckpoint != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + globalCheckpoint + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + } + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 5c09708b62cae..a33f6f8fe27e0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -142,6 +142,16 @@ protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final } } + @Override + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + // the value of the global checkpoint is verified when the read-only engine is opened, + // and it is not expected to change during the lifecycle of the engine. We could also + // check this value before closing the read-only engine but if something went wrong + // and the global checkpoint is not in-sync with the max. sequence number anymore, + // checking the value here again would prevent the read-only engine from being closed and + // reopened as an internal engine, which would be the path to fix the issue. 
+ } + protected final DirectoryReader wrapReader(DirectoryReader reader, Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 22976af581be6..1ea894e7aed74 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3092,4 +3092,13 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } + + /** + * Performs the pre-closing checks on the {@link IndexShard}. + * + * @throws IllegalStateException if the sanity checks failed + */ + public void verifyShardBeforeIndexClosing() throws IllegalStateException { + getEngine().verifyEngineBeforeIndexClosing(); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 1b192edfda6e6..687b01680704e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -40,8 +40,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import 
org.elasticsearch.index.shard.ShardId; @@ -73,6 +71,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -100,8 +99,6 @@ public void setUp() throws Exception { indexShard = mock(IndexShard.class); when(indexShard.getActiveOperationsCount()).thenReturn(0); - when(indexShard.getGlobalCheckpoint()).thenReturn(0L); - when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L)); final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); when(indexShard.shardId()).thenReturn(shardId); @@ -174,17 +171,16 @@ public void testOperationFailsWithNoBlock() { verify(indexShard, times(0)).flush(any(FlushRequest.class)); } - public void testOperationFailsWithGlobalCheckpointNotCaughtUp() { - final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE); - final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo); - final long globalCheckpoint = randomValueOtherThan(maxSeqNo, - () -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint)); - when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint)); - when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); + public void testVerifyShardBeforeIndexClosing() throws Exception { + executeOnPrimaryOrReplica(); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); + verify(indexShard, times(1)).flush(any(FlushRequest.class)); + } - IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); - assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number [" - + maxSeqNo + "] on index shard " + 
indexShard.shardId())); + public void testVerifyShardBeforeIndexClosingFailed() { + doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing(); + expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index b345afe9b8f89..87bf9b4c3de04 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -189,4 +189,25 @@ public void testReadOnly() throws IOException { } } } + + /** + * Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails + * whatever the value of the global checkpoint to check is. + */ + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + store.createEmpty(Version.CURRENT.luceneVersion); + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { + globalCheckpoint.set(randomNonNegativeLong()); + try { + readOnlyEngine.verifyEngineBeforeIndexClosing(); + } catch (final IllegalStateException e) { + fail("Read-only engine pre-closing verifications failed"); + } + } + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 23157c177816f..c779b491d581e 100644 --- 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -195,4 +195,12 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { public long getNumberOfOptimizedIndexing() { return numOfOptimizedIndexing.count(); } + + @Override + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + // the value of the global checkpoint is not verified when the following engine is closed, + // allowing it to be closed even in the case where not all operations have been fetched and + // processed from the leader and the operations history has gaps. This way the following + // engine can be closed and reopened in order to bootstrap the follower index again. + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java new file mode 100644 index 0000000000000..0551d30c2e73a --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class CloseFollowerIndexIT extends CcrIntegTestCase { + + public void testCloseAndReopenFollowerIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + PutFollowAction.Request followRequest = new PutFollowAction.Request(); + followRequest.setRemoteCluster("leader_cluster"); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowerIndex("index2"); + followRequest.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10)); + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10)); + 
followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1)); + followRequest.getParameters().setMaxOutstandingReadRequests(128); + followRequest.waitForActiveShards(ActiveShardCount.DEFAULT); + + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + ensureFollowerGreen("index2"); + + AtomicBoolean isRunning = new AtomicBoolean(true); + int numThreads = 4; + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> { + while (isRunning.get()) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + }); + threads[i].start(); + } + + atLeastDocsIndexed(followerClient(), "index2", 32); + AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get(); + assertThat(response.isAcknowledged(), is(true)); + + ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); + List blocks = new ArrayList<>(clusterState.getBlocks().indices().get("index2")); + assertThat(blocks.size(), equalTo(1)); + assertThat(blocks.get(0).id(), equalTo(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)); + + isRunning.set(false); + for (Thread thread : threads) { + thread.join(); + } + assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get()); + + refresh(leaderClient(), "index1"); + SearchRequest leaderSearchRequest = new SearchRequest("index1"); + leaderSearchRequest.source().trackTotalHits(true); + long leaderIndexDocs = leaderClient().search(leaderSearchRequest).actionGet().getHits().getTotalHits().value; + assertBusy(() -> { + refresh(followerClient(), "index2"); + SearchRequest followerSearchRequest = new SearchRequest("index2"); + followerSearchRequest.source().trackTotalHits(true); + long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value; + assertThat(followerIndexDocs, 
equalTo(leaderIndexDocs)); + }); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index df406a4c09a68..67d31ff39007f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -640,4 +640,23 @@ public void testProcessOnceOnPrimary() throws Exception { } } } + + /** + * Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails + * whatever the value of the global checkpoint to check is. + */ + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runIndexTest( + seqNo, + Engine.Operation.Origin.PRIMARY, + (followingEngine, index) -> { + globalCheckpoint.set(randomNonNegativeLong()); + try { + followingEngine.verifyEngineBeforeIndexClosing(); + } catch (final IllegalStateException e) { + fail("Following engine pre-closing verifications failed"); + } + }); + } }