diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
index b595551f76187..aefe00d3dfdca 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
@@ -121,22 +121,24 @@ protected void masterOperation(final SnapshotsStatusRequest request,
                 new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
                     .snapshots(snapshots).timeout(request.masterNodeTimeout());
             transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
-                        @Override
-                        public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
-                            try {
-                                List<SnapshotsInProgress.Entry> currentSnapshots =
-                                    snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
-                                listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
-                            } catch (Exception e) {
-                                listener.onFailure(e);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
+                @Override
+                public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
+                    threadPool.generic().execute(() -> {
+                        try {
+                            List<SnapshotsInProgress.Entry> currentSnapshots =
+                                snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
+                            listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
+                        } catch (Exception e) {
                             listener.onFailure(e);
                         }
                     });
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+            });
         } else {
             // We don't have any in-progress shards, just return current stats
             listener.onResponse(buildResponse(request, currentSnapshots, null));
diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
index 73be2ea006656..ba88f5da2d949 100644
--- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
+++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState.Custom;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -84,7 +85,7 @@ public String toString() {
         return builder.append("]").toString();
     }
 
-    public static class Entry {
+    public static class Entry implements ToXContent {
         private final State state;
         private final Snapshot snapshot;
         private final boolean includeGlobalState;
@@ -211,7 +212,50 @@ public int hashCode() {
 
         @Override
         public String toString() {
-            return snapshot.toString();
+            return Strings.toString(this);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(REPOSITORY, snapshot.getRepository());
+            builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
+            builder.field(UUID, snapshot.getSnapshotId().getUUID());
+            builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState());
+            builder.field(PARTIAL, partial);
+            builder.field(STATE, state);
+            builder.startArray(INDICES);
+            {
+                for (IndexId index : indices) {
+                    index.toXContent(builder, params);
+                }
+            }
+            builder.endArray();
+            builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime));
+            builder.field(REPOSITORY_STATE_ID, repositoryStateId);
+            builder.startArray(SHARDS);
+            {
+                for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
+                    ShardId shardId = shardEntry.key;
+                    ShardSnapshotStatus status = shardEntry.value;
+                    builder.startObject();
+                    {
+                        builder.field(INDEX, shardId.getIndex());
+                        builder.field(SHARD, shardId.getId());
+                        builder.field(STATE, status.state());
+                        builder.field(NODE, status.nodeId());
+                    }
+                    builder.endObject();
+                }
+            }
+            builder.endArray();
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean isFragment() {
+            return false;
         }
 
         // package private for testing
@@ -527,7 +571,7 @@ public void writeTo(StreamOutput out) throws IOException {
     public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startArray(SNAPSHOTS);
         for (Entry entry : entries) {
-            toXContent(entry, builder, params);
+            entry.toXContent(builder, params);
         }
         builder.endArray();
         return builder;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 998ab2a38639b..950adde6c7fa9 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -765,18 +765,20 @@ public ClusterState execute(ClusterState currentState) {
                         ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
                         boolean snapshotChanged = false;
                         for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
-                            ShardSnapshotStatus shardStatus = shardEntry.value;
+                            final ShardSnapshotStatus shardStatus = shardEntry.value;
+                            final ShardId shardId = shardEntry.key;
                             if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
                                 if (nodes.nodeExists(shardStatus.nodeId())) {
-                                    shards.put(shardEntry.key, shardEntry.value);
+                                    shards.put(shardId, shardStatus);
                                 } else {
                                     // TODO: Restart snapshot on another node?
                                     snapshotChanged = true;
                                     logger.warn("failing snapshot of shard [{}] on closed node [{}]",
-                                        shardEntry.key, shardStatus.nodeId());
-                                    shards.put(shardEntry.key,
-                                        new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
+                                        shardId, shardStatus.nodeId());
+                                    shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
                                 }
+                            } else {
+                                shards.put(shardId, shardStatus);
                             }
                         }
                         if (snapshotChanged) {
@@ -808,6 +810,8 @@ public void onFailure(Exception e) {
                         }
                     }, updatedSnapshot.getRepositoryStateId(), false);
                 }
+                assert updatedSnapshot.shards().size() == snapshot.shards().size()
+                    : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
             }
             if (changed) {
                 return ClusterState.builder(currentState)
diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index e48874a3a9620..c445848d5b710 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -1229,6 +1229,55 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
         }, 60L, TimeUnit.SECONDS);
     }
 
+    public void testDataNodeRestartAfterShardSnapshotFailure() throws Exception {
+        logger.info("--> starting a master node and two data nodes");
+        internalCluster().startMasterOnlyNode();
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        logger.info("--> creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+            .setType("mock").setSettings(Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", randomBoolean())
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+        assertAcked(prepareCreate("test-idx", 0, Settings.builder()
+            .put("number_of_shards", 2).put("number_of_replicas", 0)));
+        ensureGreen();
+        logger.info("--> indexing some data");
+        final int numdocs = randomIntBetween(50, 100);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test-idx", "type1",
+                Integer.toString(i)).setSource("field1", "bar " + i);
+        }
+        indexRandom(true, builders);
+        flushAndRefresh();
+        blockAllDataNodes("test-repo");
+        logger.info("--> snapshot");
+        client(internalCluster().getMasterName()).admin().cluster()
+            .prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+        logger.info("--> restarting first data node, which should cause the primary shard on it to be failed");
+        internalCluster().restartNode(dataNodes.get(0), InternalTestCluster.EMPTY_CALLBACK);
+
+        logger.info("--> wait for shard snapshot of first primary to show as failed");
+        assertBusy(() -> assertThat(
+            client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
+                .get(0).getShardsStats().getFailedShards(), is(1)), 60L, TimeUnit.SECONDS);
+
+        logger.info("--> restarting second data node, which should cause the primary shard on it to be failed");
+        internalCluster().restartNode(dataNodes.get(1), InternalTestCluster.EMPTY_CALLBACK);
+
+        // check that snapshot completes with both failed shards being accounted for in the snapshot result
+        assertBusy(() -> {
+            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
+                .prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
+            assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
+            SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+            assertTrue(snapshotInfo.state().toString(), snapshotInfo.state().completed());
+            assertThat(snapshotInfo.totalShards(), is(2));
+            assertThat(snapshotInfo.shardFailures(), hasSize(2));
+        }, 60L, TimeUnit.SECONDS);
+    }
+
     private long calculateTotalFilesSize(List<Path> files) {
         return files.stream().mapToLong(f -> {
             try {
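
Note on the `TransportSnapshotsStatusAction` hunk: the node-level status responses arrive on a transport thread, and recomputing `currentSnapshots` plus building the response is too heavy for that context, so the handler now forks to the GENERIC thread pool. A minimal sketch of the same pattern, assuming only the `ActionListener` and `ThreadPool` APIs visible in the hunk (the `ForkingListener` helper itself is hypothetical, not part of the patch):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.threadpool.ThreadPool;

/**
 * Wraps a listener so that the (potentially expensive) onResponse path runs
 * on the GENERIC pool instead of the transport thread that delivered the
 * response. Illustrative helper mirroring the pattern in the hunk above.
 */
final class ForkingListener<T> implements ActionListener<T> {

    private final ThreadPool threadPool;
    private final ActionListener<T> delegate;

    ForkingListener(ThreadPool threadPool, ActionListener<T> delegate) {
        this.threadPool = threadPool;
        this.delegate = delegate;
    }

    @Override
    public void onResponse(T response) {
        // Fork before doing any real work; a failure inside the forked task
        // must still be delivered to the delegate.
        threadPool.generic().execute(() -> {
            try {
                delegate.onResponse(response);
            } catch (Exception e) {
                delegate.onFailure(e);
            }
        });
    }

    @Override
    public void onFailure(Exception e) {
        // Failure handling only forwards the exception, so no fork is needed.
        delegate.onFailure(e);
    }
}
```

As in the patch, `onFailure` stays on the calling thread; only the response path, which does real work, is moved off the transport thread.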
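The `SnapshotsInProgress` hunk follows the usual ToXContent/toString pairing: `Entry` renders itself as a complete JSON object, and `toString()` delegates to `Strings.toString(this)`, which is what makes the new assertion message in `SnapshotsService` carry full shard detail. A self-contained sketch of that pairing, using a hypothetical class (not part of the patch); `ToXContentObject` is the non-fragment variant of `ToXContent`, equivalent to implementing `ToXContent` and returning `false` from `isFragment()` as `Entry` does:

```java
import java.io.IOException;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

/** Illustrative only: the toString()-via-Strings.toString(...) pattern. */
final class ShardStateSketch implements ToXContentObject {

    private final String node;
    private final String state;

    ShardStateSketch(String node, String state) {
        this.node = node;
        this.state = state;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field("node", node);
        builder.field("state", state);
        return builder.endObject();
    }

    @Override
    public String toString() {
        // Renders the object as JSON, e.g. {"node":"n1","state":"FAILED"},
        // so log lines and assertion messages show structured detail instead
        // of a bare class name or snapshot name.
        return Strings.toString(this);
    }
}
```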