@@ -372,26 +372,32 @@ private void beginSnapshot(final ClusterState clusterState,
372372 return ;
373373 }
374374 clusterService .submitStateUpdateTask ("update_snapshot [" + snapshot .snapshot () + "]" , new ClusterStateUpdateTask () {
375- boolean accepted = false ;
376- SnapshotsInProgress .Entry updatedSnapshot ;
375+
376+ SnapshotsInProgress .Entry endSnapshot ;
377377 String failure = null ;
378378
379379 @ Override
380380 public ClusterState execute (ClusterState currentState ) {
381381 SnapshotsInProgress snapshots = currentState .custom (SnapshotsInProgress .TYPE );
382382 List <SnapshotsInProgress .Entry > entries = new ArrayList <>();
383383 for (SnapshotsInProgress .Entry entry : snapshots .entries ()) {
384- if (entry .snapshot ().equals (snapshot .snapshot ()) && entry .state () != State .ABORTED ) {
385- // Replace the snapshot that was just created
384+ if (entry .snapshot ().equals (snapshot .snapshot ()) == false ) {
385+ entries .add (entry );
386+ continue ;
387+ }
388+
389+ if (entry .state () != State .ABORTED ) {
390+ // Replace the snapshot that was just initialized
386391 ImmutableOpenMap <ShardId , SnapshotsInProgress .ShardSnapshotStatus > shards = shards (currentState , entry .indices ());
387392 if (!partial ) {
388393 Tuple <Set <String >, Set <String >> indicesWithMissingShards = indicesWithMissingShards (shards , currentState .metaData ());
389394 Set <String > missing = indicesWithMissingShards .v1 ();
390395 Set <String > closed = indicesWithMissingShards .v2 ();
391396 if (missing .isEmpty () == false || closed .isEmpty () == false ) {
392- StringBuilder failureMessage = new StringBuilder ();
393- updatedSnapshot = new SnapshotsInProgress .Entry (entry , State .FAILED , shards );
394- entries .add (updatedSnapshot );
397+ endSnapshot = new SnapshotsInProgress .Entry (entry , State .FAILED , shards );
398+ entries .add (endSnapshot );
399+
400+ final StringBuilder failureMessage = new StringBuilder ();
395401 if (missing .isEmpty () == false ) {
396402 failureMessage .append ("Indices don't have primary shards " );
397403 failureMessage .append (missing );
@@ -407,13 +413,16 @@ public ClusterState execute(ClusterState currentState) {
407413 continue ;
408414 }
409415 }
410- updatedSnapshot = new SnapshotsInProgress .Entry (entry , State .STARTED , shards );
416+ SnapshotsInProgress . Entry updatedSnapshot = new SnapshotsInProgress .Entry (entry , State .STARTED , shards );
411417 entries .add (updatedSnapshot );
412- if (! completed (shards .values ())) {
413- accepted = true ;
418+ if (completed (shards .values ())) {
419+ endSnapshot = updatedSnapshot ;
414420 }
415421 } else {
416- entries .add (entry );
422+ assert entry .state () == State .ABORTED : "expecting snapshot to be aborted during initialization" ;
423+ failure = "snapshot was aborted during initialization" ;
424+ endSnapshot = entry ;
425+ entries .add (endSnapshot );
417426 }
418427 }
419428 return ClusterState .builder (currentState )
@@ -448,8 +457,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
448457 // We should end snapshot only if 1) we didn't accept it for processing (which happens when there
449458 // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
450459 // go ahead and continue working on this snapshot rather than end here.
451- if (! accepted && updatedSnapshot != null ) {
452- endSnapshot (updatedSnapshot , failure );
460+ if (endSnapshot != null ) {
461+ endSnapshot (endSnapshot , failure );
453462 }
454463 }
455464 });
@@ -749,6 +758,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
749758 }
750759 entries .add (updatedSnapshot );
751760 } else if (snapshot .state () == State .INIT && newMaster ) {
761+ changed = true ;
762+ // Mark the snapshot as aborted as it failed to start from the previous master
763+ updatedSnapshot = new SnapshotsInProgress .Entry (snapshot , State .ABORTED , snapshot .shards ());
764+ entries .add (updatedSnapshot );
765+
752766 // Clean up the snapshot that failed to start from the old master
753767 deleteSnapshot (snapshot .snapshot (), new DeleteSnapshotListener () {
754768 @ Override
@@ -934,7 +948,7 @@ private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMa
934948 *
935949 * @param entry snapshot
936950 */
937- void endSnapshot (SnapshotsInProgress .Entry entry ) {
951+ void endSnapshot (final SnapshotsInProgress .Entry entry ) {
938952 endSnapshot (entry , null );
939953 }
940954
@@ -1142,24 +1156,26 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11421156 } else {
11431157 // This snapshot is currently running - stopping shards first
11441158 waitForSnapshot = true ;
1145- ImmutableOpenMap <ShardId , ShardSnapshotStatus > shards ;
1146- if (snapshotEntry .state () == State .STARTED && snapshotEntry .shards () != null ) {
1147- // snapshot is currently running - stop started shards
1148- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder = ImmutableOpenMap .builder ();
1159+
1160+ final ImmutableOpenMap <ShardId , ShardSnapshotStatus > shards ;
1161+
1162+ final State state = snapshotEntry .state ();
1163+ if (state == State .INIT ) {
1164+ // snapshot is still initializing, mark it as aborted
1165+ shards = snapshotEntry .shards ();
1166+
1167+ } else if (state == State .STARTED ) {
1168+ // snapshot is started - mark every non completed shard as aborted
1169+ final ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder = ImmutableOpenMap .builder ();
11491170 for (ObjectObjectCursor <ShardId , ShardSnapshotStatus > shardEntry : snapshotEntry .shards ()) {
11501171 ShardSnapshotStatus status = shardEntry .value ;
1151- if (!status .state ().completed ()) {
1152- shardsBuilder .put (shardEntry .key , new ShardSnapshotStatus (status .nodeId (), State .ABORTED ,
1153- "aborted by snapshot deletion" ));
1154- } else {
1155- shardsBuilder .put (shardEntry .key , status );
1172+ if (status .state ().completed () == false ) {
1173+ status = new ShardSnapshotStatus (status .nodeId (), State .ABORTED , "aborted by snapshot deletion" );
11561174 }
1175+ shardsBuilder .put (shardEntry .key , status );
11571176 }
11581177 shards = shardsBuilder .build ();
1159- } else if (snapshotEntry .state () == State .INIT ) {
1160- // snapshot hasn't started yet - end it
1161- shards = snapshotEntry .shards ();
1162- endSnapshot (snapshotEntry );
1178+
11631179 } else {
11641180 boolean hasUncompletedShards = false ;
11651181 // Cleanup in case a node has gone missing and the snapshot wasn't updated for some reason
@@ -1176,7 +1192,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11761192 logger .debug ("trying to delete completed snapshot - should wait for shards to finalize on all nodes" );
11771193 return currentState ;
11781194 } else {
1179- // no shards to wait for - finish the snapshot
1195+ // no shards to wait for but a node is gone - this is the only case
1196+ // where we force to finish the snapshot
11801197 logger .debug ("trying to delete completed snapshot with no finalizing shards - can delete immediately" );
11811198 shards = snapshotEntry .shards ();
11821199 endSnapshot (snapshotEntry );
0 commit comments