
Commit 06a3785

Rewrite reuse peer recovery test
1 parent 2e67a0b commit 06a3785


1 file changed: +65 −76 lines


core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java

Lines changed: 65 additions & 76 deletions
@@ -23,13 +23,15 @@
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -45,17 +47,23 @@
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.InternalTestCluster.RestartCallback;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.elasticsearch.test.store.MockFSIndexStore;

 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.IntStream;

 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -384,105 +392,86 @@ public void testLatestVersionLoaded() throws Exception {
         assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue());
     }

+    @TestLogging("org.elasticsearch.indices.recovery:TRACE")
     public void testReusePeerRecovery() throws Exception {
-        final Settings settings = Settings.builder()
-            .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
-            .put("gateway.recover_after_nodes", 4)
-            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
-            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
-            .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
-
-        internalCluster().startNodes(4, settings);
-        // prevent any rebalance actions during the peer recovery
-        // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
-        // we reuse the files on disk after full restarts for replicas.
-        assertAcked(prepareCreate("test").setSettings(Settings.builder()
-            .put(indexSettings())
-            .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
-        ensureGreen();
+        internalCluster().startMasterOnlyNode();
+        final String primaryNode = internalCluster().startDataOnlyNode(nodeSettings(0));
+
+        // create the index with our mapping
+        client(primaryNode)
+            .admin()
+            .indices()
+            .prepareCreate("test")
+            .setSettings(Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 1))
+            .get();
+
         logger.info("--> indexing docs");
-        for (int i = 0; i < 1000; i++) {
-            client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
-            if ((i % 200) == 0) {
-                client().admin().indices().prepareFlush().execute().actionGet();
-            }
+        for (int i = 0; i < randomIntBetween(1, 1024); i++) {
+            client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
         }
-        if (randomBoolean()) {
-            client().admin().indices().prepareFlush().execute().actionGet();
-        }
-        logger.info("Running Cluster Health");
+
+        client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
+
+        // start the replica node; we do this after indexing so a file-based recovery is triggered to ensure the files are identical
+        final String replicaNode = internalCluster().startDataOnlyNode(nodeSettings(1));
         ensureGreen();
-        client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
-        client().admin().indices().prepareFlush().setForce(true).get();
-
-        boolean useSyncIds = randomBoolean();
-        if (useSyncIds == false) {
-            logger.info("--> disabling allocation while the cluster is shut down");
-
-            // Disable allocations while we are closing nodes
-            client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder()
-                    .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
-                .get();
-            logger.info("--> full cluster restart");
-            internalCluster().fullRestart();
-
-            logger.info("--> waiting for cluster to return to green after first shutdown");
-            ensureGreen();
-        } else {
-            logger.info("--> trying to sync flush");
-            assertEquals(client().admin().indices().prepareSyncedFlush("test").get().failedShards(), 0);
-            assertSyncIdsNotNull();
+
+        final RecoveryResponse initialRecoveryReponse = client().admin().indices().prepareRecoveries("test").get();
+        final Set<String> files = new HashSet<>();
+        for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) {
+            if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
+                for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
+                    files.add(file.name());
+                }
+                break;
+            }
         }

-        logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time");
-        // Disable allocations while we are closing nodes
-        client().admin().cluster().prepareUpdateSettings()
-            .setTransientSettings(Settings.builder()
-                .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
-            .get();
+        logger.info("--> restart replica node");

-        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+        internalCluster().restartNode(replicaNode, new RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                // index some more documents; we expect to reuse the files that already exist on the replica
+                for (int i = 0; i < randomIntBetween(1, 1024); i++) {
+                    client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
+                }

-        logger.info("--> full cluster restart");
-        internalCluster().fullRestart();
+                // prevent a sequence-number-based recovery from being possible
+                client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
+                return super.onNodeStopped(nodeName);
+            }
+        });

-        logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second ");
         ensureGreen();
-        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);

-        if (useSyncIds) {
-            assertSyncIdsNotNull();
-        }
-        RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
-        for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
+        final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
+        for (final RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
             long recovered = 0;
-            for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
-                if (file.name().startsWith("segments")) {
+            int filesRecovered = 0;
+            for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
+                if (files.contains(file.name()) == false) {
                     recovered += file.length();
+                    filesRecovered++;
                 }
             }
-            if (!recoveryState.getPrimary() && (useSyncIds == false)) {
+            if (recoveryState.getPrimary()) {
+                assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
+                assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
+                assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
+                assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
+            } else {
                 logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
                     recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
                     recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
                 assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
                 assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
                 // we have to recover the segments file since we commit the translog ID on engine startup
                 assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
-                assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(1));
-                assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - 1));
+                assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(filesRecovered));
+                assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
                 assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
-            } else {
-                if (useSyncIds && !recoveryState.getPrimary()) {
-                    logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
-                        recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
-                        recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
-                }
-                assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
-                assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
-                assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
-                assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
+                assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
             }
         }
     }
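
The assertion phase above captures the set of files the replica received during its initial file-based recovery and then checks that only files outside that set were recovered after the restart. As a minimal sketch (not part of this commit), that capture step could be factored into a helper on the same test class; every API call used here (prepareRecoveries, shardRecoveryStates, getTargetNode, fileDetails) already appears in the diff above, while the helper name fileNamesRecoveredTo is hypothetical.

    // Sketch only: collect the names of all files listed in the recovery state of shards
    // whose recovery target is the given node; mirrors the inline loop in the test above.
    private Set<String> fileNamesRecoveredTo(final String index, final String targetNodeName) {
        final RecoveryResponse response = client().admin().indices().prepareRecoveries(index).get();
        final Set<String> names = new HashSet<>();
        for (final RecoveryState recoveryState : response.shardRecoveryStates().get(index)) {
            if (recoveryState.getTargetNode().getName().equals(targetNodeName)) {
                for (final RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
                    names.add(file.name());
                }
            }
        }
        return names;
    }

With such a helper, the test could call fileNamesRecoveredTo("test", replicaNode) once before the restart and reuse the returned set in the post-restart assertions.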
