Skip to content

Commit 433a506

Browse files
SNAPSHOT: Improve Resilience SnapshotShardService (#36113)
* Resolve the index in the snapshotting thread * Added test for routing table - snapshot state mismatch
1 parent 9c1c46a commit 433a506

File tree

3 files changed

+137
-2
lines changed

3 files changed

+137
-2
lines changed

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,15 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
323323

324324
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
325325
final ShardId shardId = shardEntry.getKey();
326-
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
327326
final IndexId indexId = indicesMap.get(shardId.getIndexName());
328-
assert indexId != null;
329327
executor.execute(new AbstractRunnable() {
330328

331329
final SetOnce<Exception> failure = new SetOnce<>();
332330

333331
@Override
334332
public void doRun() {
333+
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
334+
assert indexId != null;
335335
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
336336
}
337337

server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
import org.elasticsearch.test.ESIntegTestCase.Scope;
7979
import org.elasticsearch.test.InternalTestCluster;
8080
import org.elasticsearch.test.TestCustomMetaData;
81+
import org.elasticsearch.test.disruption.BusyMasterServiceDisruption;
82+
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
8183
import org.elasticsearch.test.rest.FakeRestRequest;
8284

8385
import java.io.IOException;
@@ -1151,6 +1153,50 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
11511153
assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
11521154
}
11531155

1156+
public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
1157+
logger.info("--> starting a master node and two data nodes");
1158+
internalCluster().startMasterOnlyNode();
1159+
internalCluster().startDataOnlyNodes(2);
1160+
logger.info("--> creating repository");
1161+
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
1162+
.setType("mock").setSettings(Settings.builder()
1163+
.put("location", randomRepoPath())
1164+
.put("compress", randomBoolean())
1165+
.put("max_snapshot_bytes_per_sec", "1000b")
1166+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
1167+
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
1168+
.put("number_of_shards", 5).put("number_of_replicas", 0)));
1169+
ensureGreen();
1170+
logger.info("--> indexing some data");
1171+
final int numdocs = randomIntBetween(50, 100);
1172+
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
1173+
for (int i = 0; i < builders.length; i++) {
1174+
builders[i] = client().prepareIndex("test-idx", "type1",
1175+
Integer.toString(i)).setSource("field1", "bar " + i);
1176+
}
1177+
indexRandom(true, builders);
1178+
flushAndRefresh();
1179+
final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
1180+
logger.info("--> snapshot");
1181+
client(internalCluster().getMasterName()).admin().cluster()
1182+
.prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
1183+
ServiceDisruptionScheme disruption = new BusyMasterServiceDisruption(random(), Priority.HIGH);
1184+
setDisruptionScheme(disruption);
1185+
disruption.startDisrupting();
1186+
logger.info("--> restarting data node, which should cause primary shards to be failed");
1187+
internalCluster().restartNode(dataNode, InternalTestCluster.EMPTY_CALLBACK);
1188+
unblockNode("test-repo", dataNode);
1189+
disruption.stopDisrupting();
1190+
// check that snapshot completes
1191+
assertBusy(() -> {
1192+
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
1193+
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
1194+
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
1195+
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
1196+
assertTrue(snapshotInfo.state().toString(), snapshotInfo.state().completed());
1197+
}, 30, TimeUnit.SECONDS);
1198+
}
1199+
11541200
private long calculateTotalFilesSize(List<Path> files) {
11551201
return files.stream().mapToLong(f -> {
11561202
try {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.test.disruption;
20+
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
23+
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.Priority;
25+
import org.elasticsearch.common.unit.TimeValue;
26+
import org.elasticsearch.test.InternalTestCluster;
27+
import java.util.Random;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
public class BusyMasterServiceDisruption extends SingleNodeDisruption {
31+
private final AtomicBoolean active = new AtomicBoolean();
32+
private final Priority priority;
33+
34+
public BusyMasterServiceDisruption(Random random, Priority priority) {
35+
super(random);
36+
this.priority = priority;
37+
}
38+
39+
@Override
40+
public void startDisrupting() {
41+
disruptedNode = cluster.getMasterName();
42+
final String disruptionNodeCopy = disruptedNode;
43+
if (disruptionNodeCopy == null) {
44+
return;
45+
}
46+
ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptionNodeCopy);
47+
if (clusterService == null) {
48+
return;
49+
}
50+
logger.info("making master service busy on node [{}] at priority [{}]", disruptionNodeCopy, priority);
51+
active.set(true);
52+
submitTask(clusterService);
53+
}
54+
55+
private void submitTask(ClusterService clusterService) {
56+
clusterService.getMasterService().submitStateUpdateTask(
57+
"service_disruption_block",
58+
new ClusterStateUpdateTask(priority) {
59+
@Override
60+
public ClusterState execute(ClusterState currentState) {
61+
if (active.get()) {
62+
submitTask(clusterService);
63+
}
64+
return currentState;
65+
}
66+
67+
@Override
68+
public void onFailure(String source, Exception e) {
69+
logger.error("unexpected error during disruption", e);
70+
}
71+
}
72+
);
73+
}
74+
75+
@Override
76+
public void stopDisrupting() {
77+
active.set(false);
78+
}
79+
80+
@Override
81+
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
82+
removeFromCluster(cluster);
83+
}
84+
85+
@Override
86+
public TimeValue expectedTimeToHeal() {
87+
return TimeValue.timeValueMinutes(0);
88+
}
89+
}

0 commit comments

Comments
 (0)