From b9308b3d9acccce5f2db7e4beb0820220d4cd493 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Feb 2019 10:46:02 +0100 Subject: [PATCH 1/3] Specialize pre-closing checks for engine implementations --- ...TransportVerifyShardBeforeCloseAction.java | 8 +- .../elasticsearch/index/engine/Engine.java | 7 ++ .../index/engine/InternalEngine.java | 9 ++ .../index/engine/ReadOnlyEngine.java | 10 +++ .../elasticsearch/index/shard/IndexShard.java | 9 ++ ...portVerifyShardBeforeCloseActionTests.java | 24 +++-- .../index/engine/ReadOnlyEngineTests.java | 21 +++++ .../ccr/index/engine/FollowingEngine.java | 8 ++ .../xpack/ccr/CloseFollowerIndexIT.java | 90 +++++++++++++++++++ .../index/engine/FollowingEngineTests.java | 19 ++++ 10 files changed, 184 insertions(+), 21 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 2c3d178db882c..ad17ae11f0eb2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -108,13 +108,7 @@ private void executeShardOperation(final ShardRequest request, final IndexShard if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - - final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - if (indexShard.getGlobalCheckpoint() != maxSeqNo) { - throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint() - + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); - } - + indexShard.checkIndexBeforeClose(); indexShard.flush(new FlushRequest().force(true)); logger.trace("{} shard is ready for closing", shardId); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e450e93e9d397..4993219a713ae 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -265,6 +265,13 @@ protected final DocsStats docsStats(IndexReader indexReader) { return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); } + /** + * Check the consistency of the given global checkpoint with the engine before closing it + * + * @param globalCheckpoint the value of the global checkpoint + */ + public abstract void checkGlobalCheckpointBeforeClose(long globalCheckpoint); + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b2143dcc0407f..27eb418e64fd9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2460,6 +2460,15 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) { return localCheckpointTracker.getStats(globalCheckpoint); } + @Override + public void checkGlobalCheckpointBeforeClose(final long globalCheckpoint) { + final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); + if (globalCheckpoint != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + globalCheckpoint + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + } + /** * Returns the number of times a version was looked up either from the index. * Note this is only available if assertions are enabled diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 5c09708b62cae..ee17a6be06d18 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -142,6 +142,16 @@ protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final } } + @Override + public void checkGlobalCheckpointBeforeClose(final long globalCheckpoint) { + // the value of the global checkpoint is verified when the read-only engine is opened, + // and it is not expected to change during the lifecycle of the engine. We could also + // check this value before closing the read-only engine but if something went wrong + // and the global checkpoint is not in-sync with the max. sequence number anymore, + // checking the value here again would prevent the read-only engine to be closed and + // reopened as an internal engine, which would be the path to fix the issue. + } + protected final DirectoryReader wrapReader(DirectoryReader reader, Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 22976af581be6..5fa32b56c877f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3092,4 +3092,13 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } + + /** + * Performs the pre-closing checks on the {@link IndexShard}. + * + * @throws IllegalStateException if the sanity checks failed + */ + public void checkIndexBeforeClose() throws IllegalStateException { + getEngine().checkGlobalCheckpointBeforeClose(replicationTracker.getGlobalCheckpoint()); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 1b192edfda6e6..a21238085241e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -40,8 +40,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -73,6 +71,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -100,8 +99,6 @@ public void setUp() throws Exception { indexShard = mock(IndexShard.class); when(indexShard.getActiveOperationsCount()).thenReturn(0); - when(indexShard.getGlobalCheckpoint()).thenReturn(0L); - when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L)); final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); when(indexShard.shardId()).thenReturn(shardId); @@ -174,17 +171,16 @@ public void testOperationFailsWithNoBlock() { verify(indexShard, times(0)).flush(any(FlushRequest.class)); } - public void testOperationFailsWithGlobalCheckpointNotCaughtUp() { - final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE); - final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo); - final long globalCheckpoint = randomValueOtherThan(maxSeqNo, - () -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint)); - when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint)); - when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); + public void testCheckIndexBeforeClose() throws Exception { + executeOnPrimaryOrReplica(); + verify(indexShard, times(1)).checkIndexBeforeClose(); + verify(indexShard, times(1)).flush(any(FlushRequest.class)); + } - IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); - assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number [" - + maxSeqNo + "] on index shard " + indexShard.shardId())); + public void testCheckIndexBeforeCloseFailed() { + doThrow(new IllegalStateException("test")).when(indexShard).checkIndexBeforeClose(); + expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); + verify(indexShard, times(1)).checkIndexBeforeClose(); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index b345afe9b8f89..532f213057326 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -189,4 +189,25 @@ public void testReadOnly() throws IOException { } } } + + /** + * Test that {@link ReadOnlyEngine#checkGlobalCheckpointBeforeClose(long)} never fails + * whatever the value of the global checkpoint to check is. + */ + public void testCheckGlobalCheckpointBeforeCloseIsNoOp() throws IOException { + IOUtils.close(engine, store); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, + () -> SequenceNumbers.NO_OPS_PERFORMED); + store.createEmpty(Version.CURRENT.luceneVersion); + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { + final long globalCheckpoint = randomNonNegativeLong(); + try { + readOnlyEngine.checkGlobalCheckpointBeforeClose(globalCheckpoint); + } catch (final IllegalStateException e) { + fail("Read-only engine failed when checking the global checkpoint value [" + globalCheckpoint + "] before close"); + } + } + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 23157c177816f..79a3d1140cf11 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -195,4 +195,12 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { public long getNumberOfOptimizedIndexing() { return numOfOptimizedIndexing.count(); } + + @Override + public void checkGlobalCheckpointBeforeClose(final long globalCheckpoint) { + // the value of the global checkpoint is not verified when the following engine is closed, + // allowing it to be closed even in the case where all operations have not been fetched and + // processed from the leader and the operations history has gaps. This way the following + // engine can be closed and reopened in order to bootstrap the follower index again. + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java new file mode 100644 index 0000000000000..fb4eee80ae753 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class CloseFollowerIndexIT extends CcrIntegTestCase { + + public void testCloseAndReopenFollowerIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + PutFollowAction.Request followRequest = new PutFollowAction.Request(); + followRequest.setRemoteCluster("leader_cluster"); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowerIndex("index2"); + followRequest.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10)); + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10)); + followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1)); + followRequest.getParameters().setMaxOutstandingReadRequests(128); + followRequest.waitForActiveShards(ActiveShardCount.DEFAULT); + + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + ensureFollowerGreen("index2"); + + AtomicBoolean isRunning = new AtomicBoolean(true); + int numThreads = 4; + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> { + while (isRunning.get()) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + }); + threads[i].start(); + } + + atLeastDocsIndexed(followerClient(), "index2", 32); + AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get(); + assertThat(response.isAcknowledged(), is(true)); + + ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); + List blocks = new ArrayList<>(clusterState.getBlocks().indices().get("index2")); + assertThat(blocks.size(), equalTo(1)); + assertThat(blocks.get(0).id(), equalTo(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)); + + isRunning.set(false); + for (Thread thread : threads) { + thread.join(); + } + assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get()); + + refresh(leaderClient(), "index1"); + SearchRequest leaderSearchRequest = new SearchRequest("index1"); + leaderSearchRequest.source().trackTotalHits(true); + long leaderIndexDocs = leaderClient().search(leaderSearchRequest).actionGet().getHits().getTotalHits().value; + assertBusy(() -> { + SearchRequest followerSearchRequest = new SearchRequest("index2"); + followerSearchRequest.source().trackTotalHits(true); + long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value; + assertThat(followerIndexDocs, equalTo(leaderIndexDocs)); + }); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index df406a4c09a68..9249b6518e82d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -640,4 +640,23 @@ public void testProcessOnceOnPrimary() throws Exception { } } } + + /** + * Test that {@link FollowingEngine#checkGlobalCheckpointBeforeClose(long)} never fails + * whatever the value of the global checkpoint to check is. + */ + public void testCheckGlobalCheckpointBeforeCloseIsNoOp() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runIndexTest( + seqNo, + Engine.Operation.Origin.PRIMARY, + (followingEngine, index) -> { + final long globalCheckpoint = randomNonNegativeLong(); + try { + followingEngine.checkGlobalCheckpointBeforeClose(globalCheckpoint); + } catch (final IllegalStateException e) { + fail("Following engine failed when checking the global checkpoint value [" + globalCheckpoint + "] before close"); + } + }); + } } From 680af384dd39d42e523e15dfe110750b4d363e65 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Feb 2019 11:36:15 +0100 Subject: [PATCH 2/3] Apply feedback --- .../TransportVerifyShardBeforeCloseAction.java | 2 +- .../org/elasticsearch/index/engine/Engine.java | 13 ++++++++++--- .../elasticsearch/index/engine/InternalEngine.java | 9 --------- .../elasticsearch/index/engine/ReadOnlyEngine.java | 2 +- .../org/elasticsearch/index/shard/IndexShard.java | 4 ++-- ...TransportVerifyShardBeforeCloseActionTests.java | 10 +++++----- .../index/engine/ReadOnlyEngineTests.java | 14 +++++++------- .../xpack/ccr/index/engine/FollowingEngine.java | 2 +- .../ccr/index/engine/FollowingEngineTests.java | 10 +++++----- 9 files changed, 32 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index ad17ae11f0eb2..9ae7d065dd949 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -108,7 +108,7 @@ private void executeShardOperation(final ShardRequest request, final IndexShard if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - indexShard.checkIndexBeforeClose(); + indexShard.verifyShardBeforeIndexClosing(); indexShard.flush(new FlushRequest().force(true)); logger.trace("{} shard is ready for closing", shardId); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 4993219a713ae..b79bb079d9427 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -266,11 +266,18 @@ protected final DocsStats docsStats(IndexReader indexReader) { } /** - * Check the consistency of the given global checkpoint with the engine before closing it + * Performs the pre-closing checks on the {@link Engine}. * - * @param globalCheckpoint the value of the global checkpoint + * @throws IllegalStateException if the sanity checks failed */ - public abstract void checkGlobalCheckpointBeforeClose(long globalCheckpoint); + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); + if (globalCheckpoint != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + globalCheckpoint + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + } /** * A throttling class that can be activated, causing the diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 27eb418e64fd9..b2143dcc0407f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2460,15 +2460,6 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) { return localCheckpointTracker.getStats(globalCheckpoint); } - @Override - public void checkGlobalCheckpointBeforeClose(final long globalCheckpoint) { - final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); - if (globalCheckpoint != maxSeqNo) { - throw new IllegalStateException("Global checkpoint [" + globalCheckpoint - + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); - } - } - /** * Returns the number of times a version was looked up either from the index. * Note this is only available if assertions are enabled diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index ee17a6be06d18..a33f6f8fe27e0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -143,7 +143,7 @@ protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final } @Override - public void checkGlobalCheckpointBeforeClose(final long globalCheckpoint) { + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is verified when the read-only engine is opened, // and it is not expected to change during the lifecycle of the engine. We could also // check this value before closing the read-only engine but if something went wrong diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5fa32b56c877f..1ea894e7aed74 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3098,7 +3098,7 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { * * @throws IllegalStateException if the sanity checks failed */ - public void checkIndexBeforeClose() throws IllegalStateException { - getEngine().checkGlobalCheckpointBeforeClose(replicationTracker.getGlobalCheckpoint()); + public void verifyShardBeforeIndexClosing() throws IllegalStateException { + getEngine().verifyEngineBeforeIndexClosing(); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index a21238085241e..687b01680704e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -171,16 +171,16 @@ public void testOperationFailsWithNoBlock() { verify(indexShard, times(0)).flush(any(FlushRequest.class)); } - public void testCheckIndexBeforeClose() throws Exception { + public void testVerifyShardBeforeIndexClosing() throws Exception { executeOnPrimaryOrReplica(); - verify(indexShard, times(1)).checkIndexBeforeClose(); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); } - public void testCheckIndexBeforeCloseFailed() { - doThrow(new IllegalStateException("test")).when(indexShard).checkIndexBeforeClose(); + public void testVerifyShardBeforeIndexClosingFailed() { + doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing(); expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); - verify(indexShard, times(1)).checkIndexBeforeClose(); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 532f213057326..87bf9b4c3de04 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -191,21 +191,21 @@ public void testReadOnly() throws IOException { } /** - * Test that {@link ReadOnlyEngine#checkGlobalCheckpointBeforeClose(long)} never fails + * Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails * whatever the value of the global checkpoint to check is. */ - public void testCheckGlobalCheckpointBeforeCloseIsNoOp() throws IOException { + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, - () -> SequenceNumbers.NO_OPS_PERFORMED); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); store.createEmpty(Version.CURRENT.luceneVersion); try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { - final long globalCheckpoint = randomNonNegativeLong(); + globalCheckpoint.set(randomNonNegativeLong()); try { - readOnlyEngine.checkGlobalCheckpointBeforeClose(globalCheckpoint); + readOnlyEngine.verifyEngineBeforeIndexClosing(); } catch (final IllegalStateException e) { - fail("Read-only engine failed when checking the global checkpoint value [" + globalCheckpoint + "] before close"); + fail("Read-only engine pre-closing verifications failed"); } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 79a3d1140cf11..c779b491d581e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -197,7 +197,7 @@ public long getNumberOfOptimizedIndexing() { } @Override - public void checkGlobalCheckpointBeforeClose(final long globalCheckpoint) { + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is not verified when the following engine is closed, // allowing it to be closed even in the case where all operations have not been fetched and // processed from the leader and the operations history has gaps. This way the following diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 9249b6518e82d..67d31ff39007f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -642,20 +642,20 @@ public void testProcessOnceOnPrimary() throws Exception { } /** - * Test that {@link FollowingEngine#checkGlobalCheckpointBeforeClose(long)} never fails + * Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails * whatever the value of the global checkpoint to check is. */ - public void testCheckGlobalCheckpointBeforeCloseIsNoOp() throws IOException { + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); runIndexTest( seqNo, Engine.Operation.Origin.PRIMARY, (followingEngine, index) -> { - final long globalCheckpoint = randomNonNegativeLong(); + globalCheckpoint.set(randomNonNegativeLong()); try { - followingEngine.checkGlobalCheckpointBeforeClose(globalCheckpoint); + followingEngine.verifyEngineBeforeIndexClosing(); } catch (final IllegalStateException e) { - fail("Following engine failed when checking the global checkpoint value [" + globalCheckpoint + "] before close"); + fail("Following engine pre-closing verifications failed"); } }); } From b684a74e79521cf35dfdd92f51a117a4f7f616d0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Feb 2019 12:00:58 +0100 Subject: [PATCH 3/3] Refresh index2 --- .../java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java index fb4eee80ae753..0551d30c2e73a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -81,6 +81,7 @@ public void testCloseAndReopenFollowerIndex() throws Exception { leaderSearchRequest.source().trackTotalHits(true); long leaderIndexDocs = leaderClient().search(leaderSearchRequest).actionGet().getHits().getTotalHits().value; assertBusy(() -> { + refresh(followerClient(), "index2"); SearchRequest followerSearchRequest = new SearchRequest("index2"); followerSearchRequest.source().trackTotalHits(true); long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value;