From f513d0f533b150f3922aaca29cef78a976d1aabc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 13:49:43 +0100 Subject: [PATCH 01/14] Fix concurrent ending step 1 --- .../snapshots/SnapshotsService.java | 31 ++++++++++++++++--- .../SharedClusterSnapshotRestoreIT.java | 11 ++----- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index df3d0f16f9188..fa39aecdeb160 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -121,6 +121,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); + // Set of snapshots that are currently being + private final Set endingSnapshots = new HashSet<>(); + @Inject public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, ThreadPool threadPool) { @@ -382,7 +385,15 @@ protected void doRun() { metaData = builder.build(); } - repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); + synchronized (endingSnapshots) { + final SnapshotId snapshotId = snapshot.snapshot().getSnapshotId(); + if (endingSnapshots.contains(snapshotId)) { + // Snapshot was already aborted concurrently, we're done here. + userCreateSnapshotListener.onResponse(snapshot.snapshot()); + return; + } + repository.initializeSnapshot(snapshotId, snapshot.indices(), metaData); + } snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); @@ -408,7 +419,7 @@ public ClusterState execute(ClusterState currentState) { } if (entry.state() != State.ABORTED) { - // Replace the snapshot that was just intialized + // Replace the snapshot that was just initialized ImmutableOpenMap shards = shards(currentState, entry.indices()); if (!partial) { @@ -443,8 +454,6 @@ public ClusterState execute(ClusterState currentState) { } } else { assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; - failure = "snapshot was aborted during initialization"; - endSnapshot = entry; entries.add(endSnapshot); } } @@ -995,6 +1004,11 @@ void endSnapshot(final SnapshotsInProgress.Entry entry) { * @param failure failure reason or null if snapshot was successful */ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) { + synchronized (endingSnapshots) { + if (endingSnapshots.add(entry.snapshot().getSnapshotId()) == false) { + return; + } + } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @Override protected void doRun() { @@ -1075,6 +1089,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e); + endedSnapshot(snapshot.getSnapshotId()); if (listener != null) { listener.onFailure(e); } @@ -1082,6 +1097,7 @@ public void onFailure(String source, Exception e) { @Override public void onNoLongerMaster(String source) { + endedSnapshot(snapshot.getSnapshotId()); if (listener != null) { listener.onNoLongerMaster(); } @@ -1101,6 +1117,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.warn("Failed to notify listeners", e); } } + endedSnapshot(snapshot.getSnapshotId()); if (listener != null) { listener.onResponse(snapshotInfo); } @@ -1108,6 +1125,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } + private void endedSnapshot(SnapshotId snapshotId) { + synchronized (endingSnapshots) { + endingSnapshots.remove(snapshotId); + } + } + /** * Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting. * If the snapshot is still running cancels the snapshot first and then deletes it from the repository. diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 78892516c4ade..5ca7cbc6aef50 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -3683,14 +3683,9 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { // The deletion must set the snapshot in the ABORTED state assertBusy(() -> { - try { - SnapshotsStatusResponse status = - client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); - assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED)); - } catch (Exception e) { - // Force assertBusy to retry on every exception - throw new AssertionError(e); - } + SnapshotsStatusResponse status = + client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); + assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED)); }); // Now unblock the repository From c3f0cf9b2dd972109d58aa66a1aec1cf41951fd4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 14:58:14 +0100 Subject: [PATCH 02/14] reenable test --- .../elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index b1570f5806689..5ca7cbc6aef50 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -3637,7 +3637,6 @@ public void testParallelRestoreOperationsFromSingleSnapshot() throws Exception { } @TestLogging("org.elasticsearch.snapshots:TRACE") - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38226") public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { final Client client = client(); From dcdb6f029d8bd582f949ce4ef721f204c247a7af Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 15:12:46 +0100 Subject: [PATCH 03/14] nicer --- .../snapshots/SnapshotsService.java | 25 ++----------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index fa39aecdeb160..5aac089e2ab40 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -400,14 +400,10 @@ protected void doRun() { if (snapshot.indices().isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { - SnapshotsInProgress.Entry endSnapshot; - String failure; - @Override public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); @@ -428,8 +424,7 @@ public ClusterState execute(ClusterState currentState) { Set missing = indicesWithMissingShards.v1(); Set closed = indicesWithMissingShards.v2(); if (missing.isEmpty() == false || closed.isEmpty() == false) { - endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); - entries.add(endSnapshot); + entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards)); final StringBuilder failureMessage = new StringBuilder(); if (missing.isEmpty() == false) { @@ -443,18 +438,14 @@ public ClusterState execute(ClusterState currentState) { failureMessage.append("Indices are closed "); failureMessage.append(closed); } - failure = failureMessage.toString(); continue; } } SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); entries.add(updatedSnapshot); - if (completed(shards.values())) { - endSnapshot = updatedSnapshot; - } } else { assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; - entries.add(endSnapshot); + entries.add(entry); } } return ClusterState.builder(currentState) @@ -486,14 +477,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // completion listener in this method. For the snapshot completion to work properly, the snapshot // should still exist when listener is registered. userCreateSnapshotListener.onResponse(snapshot.snapshot()); - - // Now that snapshot completion listener is registered we can end the snapshot if needed - // We should end snapshot only if 1) we didn't accept it for processing (which happens when there - // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should - // go ahead and continue working on this snapshot rather then end here. - if (endSnapshot != null) { - endSnapshot(endSnapshot, failure); - } } }); } @@ -789,7 +772,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { ImmutableOpenMap shardsMap = shards.build(); if (!snapshot.state().completed() && completed(shardsMap.values())) { updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); - endSnapshot(updatedSnapshot); } else { updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); } @@ -849,7 +831,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { changed = true; if (!snapshot.state().completed() && completed(shards.values())) { updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); - endSnapshot(updatedSnapshot); } else { updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); } @@ -1236,7 +1217,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { assert shards.isEmpty(); // No shards in this snapshot, we delete it right away since the SnapshotShardsService // has no work to do. - endSnapshot(snapshotEntry); } else if (state == State.STARTED) { // snapshot is started - mark every non completed shard as aborted final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); @@ -1269,7 +1249,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { // where we force to finish the snapshot logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); shards = snapshotEntry.shards(); - endSnapshot(snapshotEntry); } } SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards); From cb6bf42b766440a0ca7ea520ec993b1c00cd24f8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 17:16:00 +0100 Subject: [PATCH 04/14] worsk --- .../snapshots/SnapshotShardsService.java | 2 - .../snapshots/SnapshotsService.java | 385 ++++++++---------- 2 files changed, 173 insertions(+), 214 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 93b078977357a..fbb0a876e8f29 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -591,8 +591,6 @@ private class SnapshotStateExecutor implements ClusterStateTaskExecutor>> snapshotCompletionListeners = new ConcurrentHashMap<>(); // Set of snapshots that are currently being - private final Set endingSnapshots = new HashSet<>(); + private final Set endingSnapshots = Collections.newSetFromMap(new ConcurrentHashMap<>()); @Inject public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, @@ -210,7 +212,7 @@ public List snapshots(final String repositoryName, } final ArrayList snapshotList = new ArrayList<>(snapshotSet); CollectionUtil.timSort(snapshotList); - return Collections.unmodifiableList(snapshotList); + return unmodifiableList(snapshotList); } /** @@ -226,7 +228,7 @@ public List currentSnapshots(final String repositoryName) { snapshotList.add(inProgressSnapshot(entry)); } CollectionUtil.timSort(snapshotList); - return Collections.unmodifiableList(snapshotList); + return unmodifiableList(snapshotList); } /** @@ -385,15 +387,13 @@ protected void doRun() { metaData = builder.build(); } - synchronized (endingSnapshots) { - final SnapshotId snapshotId = snapshot.snapshot().getSnapshotId(); - if (endingSnapshots.contains(snapshotId)) { - // Snapshot was already aborted concurrently, we're done here. - userCreateSnapshotListener.onResponse(snapshot.snapshot()); - return; - } - repository.initializeSnapshot(snapshotId, snapshot.indices(), metaData); + if (endingSnapshots.contains(snapshot.snapshot())) { + // Snapshot was already aborted concurrently, we're done here. + userCreateSnapshotListener.onResponse(snapshot.snapshot()); + return; } + + repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); @@ -414,10 +414,12 @@ public ClusterState execute(ClusterState currentState) { continue; } - if (entry.state() != State.ABORTED) { + if (entry.state() == State.ABORTED) { + entries.add(entry); + } else { // Replace the snapshot that was just initialized - ImmutableOpenMap shards = - shards(currentState, entry.indices()); + ImmutableOpenMap shards = + shards(currentState, entry.indices()); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); @@ -441,15 +443,11 @@ public ClusterState execute(ClusterState currentState) { continue; } } - SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); - entries.add(updatedSnapshot); - } else { - assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; - entries.add(entry); + entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); } } return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))) .build(); } @@ -544,7 +542,7 @@ private void cleanupAfterError(Exception exception) { } - private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { + private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), entry.startTime(), entry.includeGlobalState()); @@ -602,7 +600,7 @@ public List currentSnapshots(final String repository, builder.add(entry); } } - return Collections.unmodifiableList(builder); + return unmodifiableList(builder); } /** @@ -658,7 +656,7 @@ public Map snapshotShards(final String reposi return unmodifiableMap(shardStatus); } - private SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { + private static SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { for (SnapshotShardFailure shardFailure : shardFailures) { if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) { return shardFailure; @@ -672,14 +670,24 @@ public void applyClusterState(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { // We don't remove old master when master flips anymore. So, we need to check for change in master - if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) { - processSnapshotsOnRemovedNodes(event); + final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; + final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null) { + if (removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes(), newMaster)) { + processSnapshotsOnRemovedNodes(newMaster); + } + if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { + processStartedShards(); + } + // Removes finished snapshots from the cluster state, that the previous master failed to end properly before dying. + snapshotsInProgress.entries().stream() + .filter( + entry -> entry.state().completed() || (newMaster || entry.state() != State.INIT) && entry.shards().isEmpty()) + .forEach(this::endSnapshot); } - if (event.routingTableChanged()) { - processStartedShards(event); + if (newMaster) { + finalizeSnapshotDeletionFromPreviousMaster(event); } - removeFinishedSnapshotFromClusterState(event); - finalizeSnapshotDeletionFromPreviousMaster(event); } } catch (Exception e) { logger.warn("Failed to update snapshot state ", e); @@ -698,164 +706,135 @@ public void applyClusterState(ClusterChangedEvent event) { * snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists. */ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) { - if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { - SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; - SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); - } + SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; + SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); } } /** - * Removes a finished snapshot from the cluster state. This can happen if the previous - * master node processed a cluster state update that marked the snapshot as finished, - * but the previous master node died before removing the snapshot in progress from the - * cluster state. It is then the responsibility of the new master node to end the - * snapshot and remove it from the cluster state. + * Cleans up shard snapshots that were running on removed nodes + * @param newMaster true if this master was not master in the previous state */ - private void removeFinishedSnapshotFromClusterState(ClusterChangedEvent event) { - if (event.localNodeMaster() && !event.previousState().nodes().isLocalNodeElectedMaster()) { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress != null && !snapshotsInProgress.entries().isEmpty()) { - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - if (entry.state().completed()) { - endSnapshot(entry); + private void processSnapshotsOnRemovedNodes(boolean newMaster) { + clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + DiscoveryNodes nodes = currentState.nodes(); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots == null) { + return currentState; + } + boolean changed = false; + ArrayList entries = new ArrayList<>(); + for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { + SnapshotsInProgress.Entry updatedSnapshot = snapshot; + if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean snapshotChanged = false; + for (ObjectObjectCursor shardEntry : snapshot.shards()) { + ShardSnapshotStatus shardStatus = shardEntry.value; + if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { + if (nodes.nodeExists(shardStatus.nodeId())) { + shards.put(shardEntry.key, shardEntry.value); + } else { + // TODO: Restart snapshot on another node? + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on closed node [{}]", + shardEntry.key, shardStatus.nodeId()); + shards.put(shardEntry.key, + new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); + } + } + } + if (snapshotChanged) { + changed = true; + ImmutableOpenMap shardsMap = shards.build(); + if (!snapshot.state().completed() && completed(shardsMap.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); + } else { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); + } + } + entries.add(updatedSnapshot); + } else if (snapshot.state() == State.INIT && newMaster) { + changed = true; + // Mark the snapshot as aborted as it failed to start from the previous master + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); + entries.add(updatedSnapshot); + + // Clean up the snapshot that failed to start from the old master + deleteSnapshot(snapshot.snapshot(), new ActionListener() { + @Override + public void onResponse(Void aVoid) { + logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); + } + }, updatedSnapshot.getRepositoryStateId(), false); } } + if (changed) { + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); + } + return currentState; } - } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to update snapshot state after node removal"); + } + }); } - /** - * Cleans up shard snapshots that were running on removed nodes - * - * @param event cluster changed event - */ - private void processSnapshotsOnRemovedNodes(ClusterChangedEvent event) { - if (removedNodesCleanupNeeded(event)) { - // Check if we just became the master - final boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); - clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - DiscoveryNodes nodes = currentState.nodes(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null) { - return currentState; - } + private void processStartedShards() { + clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + RoutingTable routingTable = currentState.routingTable(); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; - boolean snapshotChanged = false; - if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - for (ObjectObjectCursor shardEntry : snapshot.shards()) { - ShardSnapshotStatus shardStatus = shardEntry.value; - if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { - if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardEntry.key, shardEntry.value); - } else { - // TODO: Restart snapshot on another node? - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on closed node [{}]", - shardEntry.key, shardStatus.nodeId()); - shards.put(shardEntry.key, - new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); - } - } - } - if (snapshotChanged) { + if (snapshot.state() == State.STARTED) { + ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), + routingTable); + if (shards != null) { changed = true; - ImmutableOpenMap shardsMap = shards.build(); - if (!snapshot.state().completed() && completed(shardsMap.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); + if (!snapshot.state().completed() && completed(shards.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); } } entries.add(updatedSnapshot); - } else if (snapshot.state() == State.INIT && newMaster) { - changed = true; - // Mark the snapshot as aborted as it failed to start from the previous master - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); - entries.add(updatedSnapshot); - - // Clean up the snapshot that failed to start from the old master - deleteSnapshot(snapshot.snapshot(), new ActionListener() { - @Override - public void onResponse(Void aVoid) { - logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - - @Override - public void onFailure(Exception e) { - logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - }, updatedSnapshot.getRepositoryStateId(), false); } } if (changed) { - snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); } - return currentState; - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("failed to update snapshot state after node removal"); - } - }); - } - } - - private void processStartedShards(ClusterChangedEvent event) { - if (waitingShardsStartedOrUnassigned(event)) { - clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - RoutingTable routingTable = currentState.routingTable(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - boolean changed = false; - ArrayList entries = new ArrayList<>(); - for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { - SnapshotsInProgress.Entry updatedSnapshot = snapshot; - if (snapshot.state() == State.STARTED) { - ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), - routingTable); - if (shards != null) { - changed = true; - if (!snapshot.state().completed() && completed(shards.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); - } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); - } - } - entries.add(updatedSnapshot); - } - } - if (changed) { - snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); - } - } - return currentState; } + return currentState; + } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> - new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); - } - }); - } + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> + new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); + } + }); } - private ImmutableOpenMap processWaitingShards( + private static ImmutableOpenMap processWaitingShards( ImmutableOpenMap snapshotShards, RoutingTable routingTable) { boolean snapshotChanged = false; ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); @@ -895,19 +874,16 @@ private ImmutableOpenMap processWaitingShards( } } - private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) { - SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); - if (curr != null) { - for (SnapshotsInProgress.Entry entry : curr.entries()) { - if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) { - for (ObjectCursor index : entry.waitingIndices().keys()) { - if (event.indexRoutingTableChanged(index.value)) { - IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value); - for (ShardId shardId : entry.waitingIndices().get(index.value)) { - ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); - if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { - return true; - } + private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + if (entry.state() == State.STARTED) { + for (ObjectCursor index : entry.waitingIndices().keys()) { + if (event.indexRoutingTableChanged(index.value)) { + IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value); + for (ShardId shardId : entry.waitingIndices().get(index.value)) { + ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); + if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { + return true; } } } @@ -917,28 +893,18 @@ private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) { return false; } - private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress == null) { - return false; + private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes, + boolean newMaster) { + // We just replaced old master and snapshots in intermediate states needs to be cleaned + if (newMaster && snapshotsInProgress.entries().stream().anyMatch( + snapshot -> snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) { + return true; } - // Check if we just became the master - boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); - for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { - if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) { - // We just replaced old master and snapshots in intermediate states needs to be cleaned - return true; - } - for (DiscoveryNode node : event.nodesDelta().removedNodes()) { - for (ObjectCursor shardStatus : snapshot.shards().values()) { - if (!shardStatus.value.state().completed() && node.getId().equals(shardStatus.value.nodeId())) { - // At least one shard was running on the removed node - we need to fail it - return true; - } - } - } - } - return false; + // If at least one shard was running on a removed node - we need to fail it + return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot -> + StreamSupport.stream(((Iterable) () -> snapshot.shards().valuesIt()).spliterator(), false) + .filter(s -> s.state().completed() == false).map(ShardSnapshotStatus::nodeId)) + .anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains); } /** @@ -971,7 +937,7 @@ private Tuple, Set> indicesWithMissingShards( * * @param entry snapshot */ - void endSnapshot(final SnapshotsInProgress.Entry entry) { + private void endSnapshot(final SnapshotsInProgress.Entry entry) { endSnapshot(entry, null); } @@ -986,7 +952,7 @@ void endSnapshot(final SnapshotsInProgress.Entry entry) { */ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) { synchronized (endingSnapshots) { - if (endingSnapshots.add(entry.snapshot().getSnapshotId()) == false) { + if (endingSnapshots.add(entry.snapshot()) == false) { return; } } @@ -1010,7 +976,7 @@ protected void doRun() { entry.startTime(), failure, entry.shards().size(), - Collections.unmodifiableList(shardFailures), + unmodifiableList(shardFailures), entry.getRepositoryStateId(), entry.includeGlobalState()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); @@ -1028,7 +994,7 @@ public void onFailure(final Exception e) { /** * Removes record of running snapshot from cluster state - * @param snapshot snapshot + * @param snapshot snapshot * @param snapshotInfo snapshot info if snapshot was successful * @param e exception if snapshot failed */ @@ -1038,11 +1004,11 @@ private void removeSnapshotFromClusterState(final Snapshot snapshot, final Snaps /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete - * @param snapshot snapshot + * @param snapshot snapshot * @param failure exception if snapshot failed * @param listener listener to notify when snapshot information is removed from the cluster state */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure, + private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, final Exception failure, @Nullable CleanupAfterErrorListener listener) { clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @@ -1060,8 +1026,8 @@ public ClusterState execute(ClusterState currentState) { } } if (changed) { - snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); } } return currentState; @@ -1070,7 +1036,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e); - endedSnapshot(snapshot.getSnapshotId()); + endingSnapshots.remove(snapshot); if (listener != null) { listener.onFailure(e); } @@ -1078,7 +1044,7 @@ public void onFailure(String source, Exception e) { @Override public void onNoLongerMaster(String source) { - endedSnapshot(snapshot.getSnapshotId()); + endingSnapshots.remove(snapshot); if (listener != null) { listener.onNoLongerMaster(); } @@ -1098,7 +1064,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.warn("Failed to notify listeners", e); } } - endedSnapshot(snapshot.getSnapshotId()); + endingSnapshots.remove(snapshot); if (listener != null) { listener.onResponse(snapshotInfo); } @@ -1106,12 +1072,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private void endedSnapshot(SnapshotId snapshotId) { - synchronized (endingSnapshots) { - endingSnapshots.remove(snapshotId); - } - } - /** * Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting. * If the snapshot is still running cancels the snapshot first and then deletes it from the repository. @@ -1402,7 +1362,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @param indices list of indices to be snapshotted * @return list of shard to be included into current snapshot */ - private ImmutableOpenMap shards(ClusterState clusterState, List indices) { + private static ImmutableOpenMap shards(ClusterState clusterState, + List indices) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); for (IndexId index : indices) { From f0be60c14abcc64862117e2149434d7992a66949 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 18:28:45 +0100 Subject: [PATCH 05/14] worsk --- .../snapshots/DedicatedClusterSnapshotRestoreIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index e3254d785f5fc..b118d3a3d4933 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -988,7 +988,6 @@ public void testMasterShutdownDuringFailedSnapshot() throws Exception { * can be restored when the node the shrunken index was created on is no longer part of * the cluster. */ - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38226") public void testRestoreShrinkIndex() throws Exception { logger.info("--> starting a master node and a data node"); internalCluster().startMasterOnlyNode(); From 70f41d2cbe913b0daf25e6f9a0e62ce0d7e3ef94 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 20:59:29 +0100 Subject: [PATCH 06/14] bck --- .../cluster/SnapshotsInProgress.java | 36 +++++++- .../blobstore/BlobStoreRepository.java | 3 - .../snapshots/SnapshotsService.java | 88 ++++++++++--------- .../discovery/SnapshotDisruptionIT.java | 3 - 4 files changed, 78 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 565c5134d1b38..e8efbba591b57 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -24,6 +24,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -87,9 +88,11 @@ public static class Entry { private final ImmutableOpenMap> waitingIndices; private final long startTime; private final long repositoryStateId; + @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards) { + long startTime, long repositoryStateId, ImmutableOpenMap shards, + String failure) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -104,15 +107,26 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta this.waitingIndices = findWaitingIndices(shards); } this.repositoryStateId = repositoryStateId; + this.failure = failure; + } + + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + long startTime, long repositoryStateId, ImmutableOpenMap shards) { + this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards); + entry.repositoryStateId, shards, null); + } + + public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, + entry.repositoryStateId, shards, failure); } public Entry(Entry entry, ImmutableOpenMap shards) { - this(entry, entry.state, shards); + this(entry, entry.state, shards, entry.failure); } public Snapshot snapshot() { @@ -151,6 +165,10 @@ public long getRepositoryStateId() { return repositoryStateId; } + public String failure() { + return failure; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -427,6 +445,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException { } } long repositoryStateId = in.readLong(); + final String failure; + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + failure = in.readOptionalString(); + } else { + failure = null; + } entries[i] = new Entry(snapshot, includeGlobalState, partial, @@ -434,7 +458,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException { Collections.unmodifiableList(indexBuilder), startTime, repositoryStateId, - builder.build()); + builder.build(), + failure); } this.entries = Arrays.asList(entries); } @@ -463,6 +488,9 @@ public void writeTo(StreamOutput out) throws IOException { } } out.writeLong(entry.repositoryStateId); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalString(entry.failure); + } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c8cdf0d4e0308..65ed3ce03cb4a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -516,9 +516,6 @@ private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotIn } } - /** - * {@inheritDoc} - */ @Override public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, final List indices, diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index be96f927db6cc..1540724c0664e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -123,7 +123,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); - // Set of snapshots that are currently being + // Set of snapshots that are currently being initialized by this node + private final Set initializingSnapshots = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + // Set of snapshots that are currently being ended by this node private final Set endingSnapshots = Collections.newSetFromMap(new ConcurrentHashMap<>()); @Inject @@ -284,7 +287,10 @@ public ClusterState execute(ClusterState currentState) { snapshotIndices, System.currentTimeMillis(), repositoryData.getGenId(), + null, null); + final boolean added = initializingSnapshots.add(newSnapshot.snapshot()); + assert added; snapshots = new SnapshotsInProgress(newSnapshot); } else { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); @@ -295,6 +301,9 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + if (newSnapshot != null) { + initializingSnapshots.remove(newSnapshot.snapshot()); + } newSnapshot = null; listener.onFailure(e); } @@ -302,7 +311,21 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { if (newSnapshot != null) { - beginSnapshot(newState, newSnapshot, request.partial(), listener); + final Snapshot current = newSnapshot.snapshot(); + assert initializingSnapshots.contains(current); + beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener() { + @Override + public void onResponse(final Snapshot snapshot) { + initializingSnapshots.remove(snapshot); + listener.onResponse(snapshot); + } + + @Override + public void onFailure(final Exception e) { + initializingSnapshots.remove(current); + listener.onFailure(e); + } + }); } } @@ -375,6 +398,7 @@ private void beginSnapshot(final ClusterState clusterState, @Override protected void doRun() { + assert initializingSnapshots.contains(snapshot.snapshot()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); MetaData metaData = clusterState.metaData(); @@ -399,6 +423,7 @@ protected void doRun() { logger.info("snapshot [{}] started", snapshot.snapshot()); if (snapshot.indices().isEmpty()) { // No indices in this snapshot - we are done + endSnapshot(snapshot); userCreateSnapshotListener.onResponse(snapshot.snapshot()); return; } @@ -426,8 +451,6 @@ public ClusterState execute(ClusterState currentState) { Set missing = indicesWithMissingShards.v1(); Set closed = indicesWithMissingShards.v2(); if (missing.isEmpty() == false || closed.isEmpty() == false) { - entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards)); - final StringBuilder failureMessage = new StringBuilder(); if (missing.isEmpty() == false) { failureMessage.append("Indices don't have primary shards "); @@ -440,6 +463,7 @@ public ClusterState execute(ClusterState currentState) { failureMessage.append("Indices are closed "); failureMessage.append(closed); } + entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); continue; } } @@ -670,22 +694,25 @@ public void applyClusterState(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { // We don't remove old master when master flips anymore. So, we need to check for change in master - final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); if (snapshotsInProgress != null) { - if (removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes(), newMaster)) { - processSnapshotsOnRemovedNodes(newMaster); + if (removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes())) { + processSnapshotsOnRemovedNodes(); } if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { processStartedShards(); } - // Removes finished snapshots from the cluster state, that the previous master failed to end properly before dying. - snapshotsInProgress.entries().stream() - .filter( - entry -> entry.state().completed() || (newMaster || entry.state() != State.INIT) && entry.shards().isEmpty()) - .forEach(this::endSnapshot); + // Cleanup all snapshots that have no more work left: + // 1. Completed snapshots + // 2. Snapshots in state INIT that the previous master failed to start + // 3. Snapshots in any other state that have all their shard tasks completed + snapshotsInProgress.entries().stream().filter( + entry -> entry.state().completed() + || entry.state() == State.INIT && initializingSnapshots.contains(entry.snapshot()) == false + || entry.state() != State.INIT && completed(entry.shards().values()) + ).forEach(this::endSnapshot); } - if (newMaster) { + if (event.previousState().nodes().isLocalNodeElectedMaster() == false) { finalizeSnapshotDeletionFromPreviousMaster(event); } } @@ -716,9 +743,8 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent even /** * Cleans up shard snapshots that were running on removed nodes - * @param newMaster true if this master was not master in the previous state */ - private void processSnapshotsOnRemovedNodes(boolean newMaster) { + private void processSnapshotsOnRemovedNodes() { clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -759,7 +785,7 @@ public ClusterState execute(ClusterState currentState) { } } entries.add(updatedSnapshot); - } else if (snapshot.state() == State.INIT && newMaster) { + } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { changed = true; // Mark the snapshot as aborted as it failed to start from the previous master updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); @@ -893,13 +919,7 @@ private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snap return false; } - private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes, - boolean newMaster) { - // We just replaced old master and snapshots in intermediate states needs to be cleaned - if (newMaster && snapshotsInProgress.entries().stream().anyMatch( - snapshot -> snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) { - return true; - } + private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { // If at least one shard was running on a removed node - we need to fail it return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot -> StreamSupport.stream(((Iterable) () -> snapshot.shards().valuesIt()).spliterator(), false) @@ -938,29 +958,15 @@ private Tuple, Set> indicesWithMissingShards( * @param entry snapshot */ private void endSnapshot(final SnapshotsInProgress.Entry entry) { - endSnapshot(entry, null); - } - - - /** - * Finalizes the shard in repository and then removes it from cluster state - *

- * This is non-blocking method that runs on a thread from SNAPSHOT thread pool - * - * @param entry snapshot - * @param failure failure reason or null if snapshot was successful - */ - private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) { - synchronized (endingSnapshots) { - if (endingSnapshots.add(entry.snapshot()) == false) { - return; - } + if (endingSnapshots.add(entry.snapshot()) == false) { + return; } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @Override protected void doRun() { final Snapshot snapshot = entry.snapshot(); final Repository repository = repositoriesService.repository(snapshot.getRepository()); + final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); ArrayList shardFailures = new ArrayList<>(); for (ObjectObjectCursor shardStatus : entry.shards()) { @@ -1175,8 +1181,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { // snapshot is still initializing, mark it as aborted shards = snapshotEntry.shards(); assert shards.isEmpty(); - // No shards in this snapshot, we delete it right away since the SnapshotShardsService - // has no work to do. } else if (state == State.STARTED) { // snapshot is started - mark every non completed shard as aborted final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); diff --git a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index d732f51bd542a..16f2441ba6028 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -156,9 +156,6 @@ public void clusterChanged(ClusterChangedEvent event) { logger.info("--> got exception from race in master operation retries"); } else { logger.info("--> got exception from hanged master", ex); - assertThat(cause, instanceOf(MasterNotDiscoveredException.class)); - cause = cause.getCause(); - assertThat(cause, instanceOf(FailedToCommitClusterStateException.class)); } } From a65dfed71053a4491566b01359f7f0a86490ce0e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 21:05:30 +0100 Subject: [PATCH 07/14] add asserts --- .../java/org/elasticsearch/cluster/SnapshotsInProgress.java | 2 ++ .../java/org/elasticsearch/discovery/SnapshotDisruptionIT.java | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index e8efbba591b57..126ea82a82041 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -93,6 +93,8 @@ public static class Entry { public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, String failure) { + assert state != State.MISSING && state != State.WAITING : state; + assert state != State.INIT || shards == null || shards.isEmpty(); this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; diff --git a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 16f2441ba6028..8548b332c1ad1 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; -import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -52,7 +51,6 @@ import org.elasticsearch.test.transport.MockTransportService; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.instanceOf; /** * Tests snapshot operations during disruptions. From 1207b60fbc942ec8408e06db4fb7aa7c60c99314 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 21:10:18 +0100 Subject: [PATCH 08/14] add asserts --- .../java/org/elasticsearch/cluster/SnapshotsInProgress.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 126ea82a82041..e8efbba591b57 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -93,8 +93,6 @@ public static class Entry { public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, String failure) { - assert state != State.MISSING && state != State.WAITING : state; - assert state != State.INIT || shards == null || shards.isEmpty(); this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; From e9c4a96c5ce71cb9e68d596cd3282142bc0c58d2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 21:12:50 +0100 Subject: [PATCH 09/14] remove pointless assert --- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1540724c0664e..170597e30ecde 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -287,10 +287,8 @@ public ClusterState execute(ClusterState currentState) { snapshotIndices, System.currentTimeMillis(), repositoryData.getGenId(), - null, null); - final boolean added = initializingSnapshots.add(newSnapshot.snapshot()); - assert added; + initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); } else { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); From d70a0265e61b09f553c3ed4357e3e729a3d289c4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Feb 2019 21:24:38 +0100 Subject: [PATCH 10/14] cleaner --- .../main/java/org/elasticsearch/snapshots/SnapshotsService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 170597e30ecde..913dde18eef44 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -421,8 +421,8 @@ protected void doRun() { logger.info("snapshot [{}] started", snapshot.snapshot()); if (snapshot.indices().isEmpty()) { // No indices in this snapshot - we are done - endSnapshot(snapshot); userCreateSnapshotListener.onResponse(snapshot.snapshot()); + endSnapshot(snapshot); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { From 8a891be4fdde24be0b1dc1f6b800e01aa42062e1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Feb 2019 08:13:38 +0100 Subject: [PATCH 11/14] revert noisy change --- .../repositories/blobstore/BlobStoreRepository.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 65ed3ce03cb4a..c8cdf0d4e0308 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -516,6 +516,9 @@ private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotIn } } + /** + * {@inheritDoc} + */ @Override public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, final List indices, From 3de4346b8ad748d4d209e2304c8779d2558f14a4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Feb 2019 08:24:00 +0100 Subject: [PATCH 12/14] some cleanups --- .../org/elasticsearch/cluster/SnapshotsInProgress.java | 2 +- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index e8efbba591b57..c839acf45fc80 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -117,7 +117,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, null); + entry.repositoryStateId, shards, entry.failure); } public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 913dde18eef44..a1c1539281a8f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1175,10 +1175,12 @@ public ClusterState execute(ClusterState currentState) throws Exception { final ImmutableOpenMap shards; final State state = snapshotEntry.state(); + final String failure; if (state == State.INIT) { // snapshot is still initializing, mark it as aborted shards = snapshotEntry.shards(); assert shards.isEmpty(); + failure = "Snapshot was aborted during initialization"; } else if (state == State.STARTED) { // snapshot is started - mark every non completed shard as aborted final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); @@ -1190,7 +1192,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { shardsBuilder.put(shardEntry.key, status); } shards = shardsBuilder.build(); - + failure = "Snapshot was aborted by deletion"; } else { boolean hasUncompletedShards = false; // Cleanup in case a node gone missing and snapshot wasn't updated for some reason @@ -1212,8 +1214,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); shards = snapshotEntry.shards(); } + failure = snapshotEntry.failure(); } - SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards); + SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot)); } return clusterStateBuilder.build(); From c042d0743021b87bce470eb4d608fe11f7ae73c6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Feb 2019 08:41:55 +0100 Subject: [PATCH 13/14] remove redundant guard --- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a1c1539281a8f..3ebbc92e04cea 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -409,12 +409,6 @@ protected void doRun() { metaData = builder.build(); } - if (endingSnapshots.contains(snapshot.snapshot())) { - // Snapshot was already aborted concurrently, we're done here. - userCreateSnapshotListener.onResponse(snapshot.snapshot()); - return; - } - repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); snapshotCreated = true; From f8d3582544a7d97c3ac9fc96a10b4aeb500dbc42 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Feb 2019 14:55:08 +0100 Subject: [PATCH 14/14] cheaper set --- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 3ebbc92e04cea..17306e1585ff8 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -124,10 +124,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); // Set of snapshots that are currently being initialized by this node - private final Set initializingSnapshots = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); // Set of snapshots that are currently being ended by this node - private final Set endingSnapshots = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); @Inject public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,