Skip to content

Commit bcbf1af

Browse files
committed
Peer recovery should flush at the end (#41660)
Flushing at the end of a peer recovery (if needed) can bring these benefits: 1. Closing an index won't end up with the red state for a recovering replica should always be ready for closing whether it performs the verifying-before-close step or not. 2. Good opportunities to compact store (i.e., flushing and merging Lucene, and trimming translog) Closes #40024 Closes #39588
1 parent 84df48c commit bcbf1af

File tree

2 files changed

+52
-0
lines changed

2 files changed

+52
-0
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.ExceptionsHelper;
2929
import org.elasticsearch.Version;
3030
import org.elasticsearch.action.ActionListener;
31+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.UUIDs;
3334
import org.elasticsearch.common.bytes.BytesReference;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.index.mapper.MapperException;
4041
import org.elasticsearch.index.seqno.ReplicationTracker;
4142
import org.elasticsearch.index.seqno.RetentionLeases;
43+
import org.elasticsearch.index.seqno.SequenceNumbers;
4244
import org.elasticsearch.index.shard.IndexShard;
4345
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
4446
import org.elasticsearch.index.shard.IndexShardState;
@@ -299,11 +301,19 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> l
299301
// Persist the global checkpoint.
300302
indexShard.sync();
301303
indexShard.persistRetentionLeases();
304+
if (hasUncommittedOperations()) {
305+
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
306+
}
302307
indexShard.finalizeRecovery();
303308
return null;
304309
});
305310
}
306311

312+
private boolean hasUncommittedOperations() throws IOException {
313+
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
314+
return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0;
315+
}
316+
307317
@Override
308318
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
309319
indexShard.activateWithPrimaryContext(primaryContext);

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
3232
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
3333
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
34+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
3435
import org.elasticsearch.action.index.IndexRequestBuilder;
3536
import org.elasticsearch.action.search.SearchResponse;
3637
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
@@ -52,9 +53,12 @@
5253
import org.elasticsearch.index.analysis.TokenFilterFactory;
5354
import org.elasticsearch.index.mapper.MapperParsingException;
5455
import org.elasticsearch.index.recovery.RecoveryStats;
56+
import org.elasticsearch.index.seqno.SequenceNumbers;
57+
import org.elasticsearch.index.shard.ShardId;
5558
import org.elasticsearch.index.store.Store;
5659
import org.elasticsearch.indices.IndicesService;
5760
import org.elasticsearch.indices.analysis.AnalysisModule;
61+
import org.elasticsearch.indices.flush.SyncedFlushUtil;
5862
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
5963
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
6064
import org.elasticsearch.plugins.AnalysisPlugin;
@@ -84,14 +88,19 @@
8488
import java.util.Collection;
8589
import java.util.List;
8690
import java.util.Map;
91+
import java.util.Set;
8792
import java.util.concurrent.CountDownLatch;
8893
import java.util.concurrent.ExecutionException;
8994
import java.util.concurrent.Semaphore;
9095
import java.util.concurrent.atomic.AtomicBoolean;
9196
import java.util.concurrent.atomic.AtomicInteger;
9297
import java.util.function.Consumer;
98+
import java.util.stream.Collectors;
99+
import java.util.stream.IntStream;
100+
import java.util.stream.Stream;
93101

94102
import static java.util.Collections.singletonMap;
103+
import static java.util.stream.Collectors.toList;
95104
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
96105
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
97106
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -910,6 +919,39 @@ public void testDoNotInfinitelyWaitForMapping() {
910919
assertHitCount(client().prepareSearch().get(), numDocs);
911920
}
912921

922+
public void testRecoveryFlushReplica() throws Exception {
923+
internalCluster().ensureAtLeastNumDataNodes(3);
924+
String indexName = "test-index";
925+
createIndex(indexName, Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", 1).build());
926+
int numDocs = randomIntBetween(0, 10);
927+
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs)
928+
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
929+
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
930+
.setSettings(Settings.builder().put("index.number_of_replicas", 1)));
931+
ensureGreen(indexName);
932+
ShardId shardId = null;
933+
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
934+
shardId = shardStats.getShardRouting().shardId();
935+
if (shardStats.getShardRouting().primary() == false) {
936+
assertThat(shardStats.getCommitStats().getNumDocs(), equalTo(numDocs));
937+
SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
938+
shardStats.getCommitStats().getUserData().entrySet());
939+
assertThat(commitInfo.localCheckpoint, equalTo(shardStats.getSeqNoStats().getLocalCheckpoint()));
940+
assertThat(commitInfo.maxSeqNo, equalTo(shardStats.getSeqNoStats().getMaxSeqNo()));
941+
}
942+
}
943+
SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
944+
assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(indexName).get().failedShards(), equalTo(0)));
945+
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
946+
.setSettings(Settings.builder().put("index.number_of_replicas", 2)));
947+
ensureGreen(indexName);
948+
// Recovery should keep syncId if no indexing activity on the primary after synced-flush.
949+
Set<String> syncIds = Stream.of(client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards())
950+
.map(shardStats -> shardStats.getCommitStats().syncId())
951+
.collect(Collectors.toSet());
952+
assertThat(syncIds, hasSize(1));
953+
}
954+
913955
public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin {
914956
final AtomicBoolean throwParsingError = new AtomicBoolean();
915957
@Override

0 commit comments

Comments
 (0)