Skip to content

Commit 26c32d7

Browse files
committed
Fix snapshot getting stuck in INIT state (#27214)
If the master disconnects from the cluster after initiating snapshot, but just before the snapshot switches from INIT to STARTED state, the snapshot can get indefinitely stuck in the INIT state. This error is specific to v5.x+ and was triggered by keeping the master node that stepped down in the node list, the cleanup logic in snapshot/restore assumed that if master steps down it is always removed from the node list. This commit changes the logic to trigger cleanup even if no nodes left the cluster. Closes #27180
1 parent 153b93f commit 26c32d7

File tree

3 files changed

+165
-6
lines changed

3 files changed

+165
-6
lines changed

core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ public void move(String source, String target) throws IOException {
140140
Path targetPath = path.resolve(target);
141141
// If the target file exists then Files.move() behaviour is implementation specific
142142
// the existing file might be replaced or this method fails by throwing an IOException.
143-
assert !Files.exists(targetPath);
143+
if (Files.exists(targetPath)) {
144+
throw new FileAlreadyExistsException("blob [" + targetPath + "] already exists, cannot overwrite");
145+
}
144146
Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
145147
IOUtils.fsync(path, true);
146148
}

core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,15 @@ public void onFailure(String source, Exception e) {
425425
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
426426
}
427427

428+
@Override
429+
public void onNoLongerMaster(String source) {
430+
// We are no longer a master - we shouldn't try to do any cleanup
431+
// The new master will take care of it
432+
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
433+
userCreateSnapshotListener.onFailure(
434+
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
435+
}
436+
428437
@Override
429438
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
430439
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
@@ -473,6 +482,10 @@ public void onFailure(Exception e) {
473482
cleanupAfterError(e);
474483
}
475484

485+
public void onNoLongerMaster(String source) {
486+
userCreateSnapshotListener.onFailure(e);
487+
}
488+
476489
private void cleanupAfterError(Exception exception) {
477490
if(snapshotCreated) {
478491
try {
@@ -628,7 +641,8 @@ private SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFa
628641
public void applyClusterState(ClusterChangedEvent event) {
629642
try {
630643
if (event.localNodeMaster()) {
631-
if (event.nodesRemoved()) {
644+
// We don't remove old master when master flips anymore. So, we need to check for change in master
645+
if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) {
632646
processSnapshotsOnRemovedNodes(event);
633647
}
634648
if (event.routingTableChanged()) {
@@ -981,7 +995,7 @@ private void removeSnapshotFromClusterState(final Snapshot snapshot, final Snaps
981995
* @param listener listener to notify when snapshot information is removed from the cluster state
982996
*/
983997
private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure,
984-
@Nullable ActionListener<SnapshotInfo> listener) {
998+
@Nullable CleanupAfterErrorListener listener) {
985999
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
9861000

9871001
@Override
@@ -1013,6 +1027,13 @@ public void onFailure(String source, Exception e) {
10131027
}
10141028
}
10151029

1030+
@Override
1031+
public void onNoLongerMaster(String source) {
1032+
if (listener != null) {
1033+
listener.onNoLongerMaster(source);
1034+
}
1035+
}
1036+
10161037
@Override
10171038
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
10181039
for (SnapshotCompletionListener listener : snapshotCompletionListeners) {
@@ -1183,9 +1204,16 @@ public void onSnapshotCompletion(Snapshot completedSnapshot, SnapshotInfo snapsh
11831204
if (completedSnapshot.equals(snapshot)) {
11841205
logger.debug("deleted snapshot completed - deleting files");
11851206
removeListener(this);
1186-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() ->
1187-
deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(),
1188-
listener, true)
1207+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
1208+
try {
1209+
deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(),
1210+
listener, true);
1211+
1212+
} catch (Exception ex) {
1213+
logger.warn((Supplier<?>) () ->
1214+
new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
1215+
}
1216+
}
11891217
);
11901218
}
11911219
}

core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,19 @@
2323
import org.apache.logging.log4j.util.Supplier;
2424
import org.apache.lucene.index.CorruptIndexException;
2525
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.action.ActionFuture;
2627
import org.elasticsearch.action.DocWriteResponse;
28+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
29+
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
2730
import org.elasticsearch.action.get.GetResponse;
2831
import org.elasticsearch.action.index.IndexRequestBuilder;
2932
import org.elasticsearch.action.index.IndexResponse;
3033
import org.elasticsearch.client.Client;
34+
import org.elasticsearch.cluster.ClusterChangedEvent;
3135
import org.elasticsearch.cluster.ClusterState;
36+
import org.elasticsearch.cluster.ClusterStateListener;
3237
import org.elasticsearch.cluster.ClusterStateUpdateTask;
38+
import org.elasticsearch.cluster.SnapshotsInProgress;
3339
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3440
import org.elasticsearch.cluster.block.ClusterBlock;
3541
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -45,6 +51,7 @@
4551
import org.elasticsearch.common.Strings;
4652
import org.elasticsearch.common.collect.Tuple;
4753
import org.elasticsearch.common.settings.Settings;
54+
import org.elasticsearch.common.unit.ByteSizeUnit;
4855
import org.elasticsearch.common.unit.TimeValue;
4956
import org.elasticsearch.common.xcontent.XContentType;
5057
import org.elasticsearch.discovery.zen.ElectMasterService;
@@ -58,6 +65,9 @@
5865
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
5966
import org.elasticsearch.monitor.jvm.HotThreads;
6067
import org.elasticsearch.plugins.Plugin;
68+
import org.elasticsearch.snapshots.SnapshotInfo;
69+
import org.elasticsearch.snapshots.SnapshotMissingException;
70+
import org.elasticsearch.snapshots.SnapshotState;
6171
import org.elasticsearch.test.ESIntegTestCase;
6272
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
6373
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -113,6 +123,7 @@
113123
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
114124
import static org.hamcrest.Matchers.equalTo;
115125
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
126+
import static org.hamcrest.Matchers.instanceOf;
116127
import static org.hamcrest.Matchers.is;
117128
import static org.hamcrest.Matchers.not;
118129
import static org.hamcrest.Matchers.nullValue;
@@ -1256,6 +1267,124 @@ public void testElectMasterWithLatestVersion() throws Exception {
12561267

12571268
}
12581269

1270+
public void testDisruptionOnSnapshotInitialization() throws Exception {
1271+
final Settings settings = Settings.builder()
1272+
.put(DEFAULT_SETTINGS)
1273+
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
1274+
.build();
1275+
final String idxName = "test";
1276+
configureCluster(settings, 4, null, 2);
1277+
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
1278+
final String dataNode = internalCluster().startDataOnlyNode();
1279+
ensureStableCluster(4);
1280+
1281+
createRandomIndex(idxName);
1282+
1283+
logger.info("--> creating repository");
1284+
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
1285+
.setType("fs").setSettings(Settings.builder()
1286+
.put("location", randomRepoPath())
1287+
.put("compress", randomBoolean())
1288+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
1289+
1290+
// Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization
1291+
// by the current master and the former master. It is not causing any issues in real life scenario, but
1292+
// might make this test fail. We are going to complete initialization of the snapshot to prevent these failures.
1293+
logger.info("--> initializing the repository");
1294+
assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
1295+
.setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state());
1296+
1297+
final String masterNode1 = internalCluster().getMasterName();
1298+
Set<String> otherNodes = new HashSet<>();
1299+
otherNodes.addAll(allMasterEligibleNodes);
1300+
otherNodes.remove(masterNode1);
1301+
otherNodes.add(dataNode);
1302+
1303+
NetworkDisruption networkDisruption =
1304+
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
1305+
new NetworkDisruption.NetworkUnresponsive());
1306+
internalCluster().setDisruptionScheme(networkDisruption);
1307+
1308+
ClusterService clusterService = internalCluster().clusterService(masterNode1);
1309+
CountDownLatch disruptionStarted = new CountDownLatch(1);
1310+
clusterService.addListener(new ClusterStateListener() {
1311+
@Override
1312+
public void clusterChanged(ClusterChangedEvent event) {
1313+
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
1314+
if (snapshots != null && snapshots.entries().size() > 0) {
1315+
if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) {
1316+
// The snapshot started, we can start disruption so the INIT state will arrive to another master node
1317+
logger.info("--> starting disruption");
1318+
networkDisruption.startDisrupting();
1319+
clusterService.removeListener(this);
1320+
disruptionStarted.countDown();
1321+
}
1322+
}
1323+
}
1324+
});
1325+
1326+
logger.info("--> starting snapshot");
1327+
ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
1328+
.prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute();
1329+
1330+
logger.info("--> waiting for disruption to start");
1331+
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
1332+
1333+
logger.info("--> wait until the snapshot is done");
1334+
assertBusy(() -> {
1335+
SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(true).get().getState()
1336+
.custom(SnapshotsInProgress.TYPE);
1337+
if (snapshots != null && snapshots.entries().size() > 0) {
1338+
logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
1339+
fail("Snapshot is still running");
1340+
} else {
1341+
logger.info("Snapshot is no longer in the cluster state");
1342+
}
1343+
}, 1, TimeUnit.MINUTES);
1344+
1345+
logger.info("--> verify that snapshot was successful or no longer exist");
1346+
assertBusy(() -> {
1347+
try {
1348+
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo")
1349+
.setSnapshots("test-snap-2").get();
1350+
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
1351+
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
1352+
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
1353+
assertEquals(0, snapshotInfo.failedShards());
1354+
logger.info("--> done verifying");
1355+
} catch (SnapshotMissingException exception) {
1356+
logger.info("--> snapshot doesn't exist");
1357+
}
1358+
}, 1, TimeUnit.MINUTES);
1359+
1360+
logger.info("--> stopping disrupting");
1361+
networkDisruption.stopDisrupting();
1362+
ensureStableCluster(4, masterNode1);
1363+
logger.info("--> done");
1364+
1365+
try {
1366+
future.get();
1367+
} catch (Exception ex) {
1368+
logger.info("--> got exception from hanged master", ex);
1369+
Throwable cause = ex.getCause();
1370+
assertThat(cause, instanceOf(MasterNotDiscoveredException.class));
1371+
cause = cause.getCause();
1372+
assertThat(cause, instanceOf(Discovery.FailedToCommitClusterStateException.class));
1373+
}
1374+
}
1375+
1376+
private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException {
1377+
assertAcked(prepareCreate(idxName, 0, Settings.builder().put("number_of_shards", between(1, 20))
1378+
.put("number_of_replicas", 0)));
1379+
logger.info("--> indexing some data");
1380+
final int numdocs = randomIntBetween(10, 100);
1381+
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
1382+
for (int i = 0; i < builders.length; i++) {
1383+
builders[i] = client().prepareIndex(idxName, "type1", Integer.toString(i)).setSource("field1", "bar " + i);
1384+
}
1385+
indexRandom(true, builders);
1386+
}
1387+
12591388
protected NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
12601389
final NetworkLinkDisruptionType disruptionType;
12611390
if (randomBoolean()) {

0 commit comments

Comments
 (0)