@@ -152,7 +152,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
152152 // Set of snapshots that are currently being ended by this node
153153 private final Set <Snapshot > endingSnapshots = Collections .synchronizedSet (new HashSet <>());
154154
155- private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor ();
156155 private final UpdateSnapshotStatusAction updateSnapshotStatusHandler ;
157156
158157 private final TransportService transportService ;
@@ -1909,101 +1908,130 @@ public boolean assertAllListenersResolved() {
19091908 return true ;
19101909 }
19111910
1912- private static class SnapshotStateExecutor implements ClusterStateTaskExecutor <UpdateIndexShardSnapshotStatusRequest > {
1913-
1914- @ Override
1915- public ClusterTasksResult <UpdateIndexShardSnapshotStatusRequest >
1916- execute (ClusterState currentState , List <UpdateIndexShardSnapshotStatusRequest > tasks ) {
1917- int changedCount = 0 ;
1918- int startedCount = 0 ;
1919- final List <SnapshotsInProgress .Entry > entries = new ArrayList <>();
1920- // Tasks to check for updates for running snapshots.
1921- final List <UpdateIndexShardSnapshotStatusRequest > unconsumedTasks = new ArrayList <>(tasks );
1922- // Tasks that were used to complete an existing in-progress shard snapshot
1923- final Set <UpdateIndexShardSnapshotStatusRequest > executedTasks = new HashSet <>();
1924- for (SnapshotsInProgress .Entry entry : currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY ).entries ()) {
1925- if (entry .state ().completed ()) {
1926- entries .add (entry );
1911+ private static final ClusterStateTaskExecutor <ShardSnapshotUpdate > SHARD_STATE_EXECUTOR = (currentState , tasks ) -> {
1912+ int changedCount = 0 ;
1913+ int startedCount = 0 ;
1914+ final List <SnapshotsInProgress .Entry > entries = new ArrayList <>();
1915+ // Tasks to check for updates for running snapshots.
1916+ final List <ShardSnapshotUpdate > unconsumedTasks = new ArrayList <>(tasks );
1917+ // Tasks that were used to complete an existing in-progress shard snapshot
1918+ final Set <ShardSnapshotUpdate > executedTasks = new HashSet <>();
1919+ for (SnapshotsInProgress .Entry entry : currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY ).entries ()) {
1920+ if (entry .state ().completed ()) {
1921+ entries .add (entry );
1922+ continue ;
1923+ }
1924+ ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shards = null ;
1925+ for (Iterator <ShardSnapshotUpdate > iterator = unconsumedTasks .iterator (); iterator .hasNext (); ) {
1926+ final ShardSnapshotUpdate updateSnapshotState = iterator .next ();
1927+ final Snapshot updatedSnapshot = updateSnapshotState .snapshot ;
1928+ final String updatedRepository = updatedSnapshot .getRepository ();
1929+ if (entry .repository ().equals (updatedRepository ) == false ) {
19271930 continue ;
19281931 }
1929- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shards = null ;
1930- for (Iterator <UpdateIndexShardSnapshotStatusRequest > iterator = unconsumedTasks .iterator (); iterator .hasNext (); ) {
1931- final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator .next ();
1932- final Snapshot updatedSnapshot = updateSnapshotState .snapshot ();
1933- final String updatedRepository = updatedSnapshot .getRepository ();
1934- if (entry .repository ().equals (updatedRepository ) == false ) {
1932+ final ShardId finishedShardId = updateSnapshotState .shardId ;
1933+ if (entry .snapshot ().getSnapshotId ().equals (updatedSnapshot .getSnapshotId ())) {
1934+ final ShardSnapshotStatus existing = entry .shards ().get (finishedShardId );
1935+ if (existing == null ) {
1936+ logger .warn ("Received shard snapshot status update [{}] but this shard is not tracked in [{}]" ,
1937+ updateSnapshotState , entry );
1938+ assert false : "This should never happen, data nodes should only send updates for expected shards" ;
19351939 continue ;
19361940 }
1937- final ShardId finishedShardId = updateSnapshotState .shardId ();
1938- if (entry .snapshot ().getSnapshotId ().equals (updatedSnapshot .getSnapshotId ())) {
1939- final ShardSnapshotStatus existing = entry .shards ().get (finishedShardId );
1940- if (existing == null ) {
1941- logger .warn ("Received shard snapshot status update [{}] but this shard is not tracked in [{}]" ,
1942- updateSnapshotState , entry );
1943- assert false : "This should never happen, data nodes should only send updates for expected shards" ;
1944- continue ;
1945- }
1946- if (existing .state ().completed ()) {
1947- // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
1948- iterator .remove ();
1949- continue ;
1950- }
1951- logger .trace ("[{}] Updating shard [{}] with status [{}]" , updatedSnapshot ,
1952- finishedShardId , updateSnapshotState .status ().state ());
1953- if (shards == null ) {
1954- shards = ImmutableOpenMap .builder (entry .shards ());
1955- }
1956- shards .put (finishedShardId , updateSnapshotState .status ());
1957- executedTasks .add (updateSnapshotState );
1958- changedCount ++;
1959- } else if (executedTasks .contains (updateSnapshotState )) {
1960- // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
1961- final ShardSnapshotStatus existingStatus = entry .shards ().get (finishedShardId );
1962- if (existingStatus == null || existingStatus .state () != ShardState .QUEUED ) {
1963- continue ;
1964- }
1965- if (shards == null ) {
1966- shards = ImmutableOpenMap .builder (entry .shards ());
1967- }
1968- final ShardSnapshotStatus finishedStatus = updateSnapshotState .status ();
1969- logger .trace ("Starting [{}] on [{}] with generation [{}]" , finishedShardId ,
1970- finishedStatus .nodeId (), finishedStatus .generation ());
1971- shards .put (finishedShardId , new ShardSnapshotStatus (finishedStatus .nodeId (), finishedStatus .generation ()));
1941+ if (existing .state ().completed ()) {
1942+ // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
19721943 iterator .remove ();
1973- startedCount ++;
1944+ continue ;
1945+ }
1946+ logger .trace ("[{}] Updating shard [{}] with status [{}]" , updatedSnapshot ,
1947+ finishedShardId , updateSnapshotState .updatedState .state ());
1948+ if (shards == null ) {
1949+ shards = ImmutableOpenMap .builder (entry .shards ());
1950+ }
1951+ shards .put (finishedShardId , updateSnapshotState .updatedState );
1952+ executedTasks .add (updateSnapshotState );
1953+ changedCount ++;
1954+ } else if (executedTasks .contains (updateSnapshotState )) {
1955+ // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
1956+ final ShardSnapshotStatus existingStatus = entry .shards ().get (finishedShardId );
1957+ if (existingStatus == null || existingStatus .state () != ShardState .QUEUED ) {
1958+ continue ;
1959+ }
1960+ if (shards == null ) {
1961+ shards = ImmutableOpenMap .builder (entry .shards ());
19741962 }
1963+ final ShardSnapshotStatus finishedStatus = updateSnapshotState .updatedState ;
1964+ logger .trace ("Starting [{}] on [{}] with generation [{}]" , finishedShardId ,
1965+ finishedStatus .nodeId (), finishedStatus .generation ());
1966+ shards .put (finishedShardId , new ShardSnapshotStatus (finishedStatus .nodeId (), finishedStatus .generation ()));
1967+ iterator .remove ();
1968+ startedCount ++;
19751969 }
1970+ }
19761971
1977- if (shards == null ) {
1978- entries .add (entry );
1979- } else {
1980- entries .add (entry .withShardStates (shards .build ()));
1981- }
1972+ if (shards == null ) {
1973+ entries .add (entry );
1974+ } else {
1975+ entries .add (entry .withShardStates (shards .build ()));
19821976 }
1983- if (changedCount > 0 ) {
1984- logger .trace ("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
1985- "[{}] shard snapshots" , changedCount , startedCount );
1986- return ClusterTasksResult .<UpdateIndexShardSnapshotStatusRequest >builder ().successes (tasks )
1987- .build (ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE ,
1988- SnapshotsInProgress .of (entries )).build ());
1977+ }
1978+ if (changedCount > 0 ) {
1979+ logger .trace ("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
1980+ "[{}] shard snapshots" , changedCount , startedCount );
1981+ return ClusterStateTaskExecutor .ClusterTasksResult .<ShardSnapshotUpdate >builder ().successes (tasks ).build (
1982+ ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE , SnapshotsInProgress .of (entries )).build ());
1983+ }
1984+ return ClusterStateTaskExecutor .ClusterTasksResult .<ShardSnapshotUpdate >builder ().successes (tasks ).build (currentState );
1985+ };
1986+
1987+ /**
1988+ * An update to the snapshot state of a shard.
1989+ */
1990+ private static final class ShardSnapshotUpdate {
1991+
1992+ private final Snapshot snapshot ;
1993+
1994+ private final ShardId shardId ;
1995+
1996+ private final ShardSnapshotStatus updatedState ;
1997+
1998+ private ShardSnapshotUpdate (Snapshot snapshot , ShardId shardId , ShardSnapshotStatus updatedState ) {
1999+ this .snapshot = snapshot ;
2000+ this .shardId = shardId ;
2001+ this .updatedState = updatedState ;
2002+ }
2003+
2004+ @ Override
2005+ public boolean equals (Object other ) {
2006+ if (this == other ) {
2007+ return true ;
19892008 }
1990- return ClusterTasksResult .<UpdateIndexShardSnapshotStatusRequest >builder ().successes (tasks ).build (currentState );
2009+ if ((other instanceof ShardSnapshotUpdate ) == false ) {
2010+ return false ;
2011+ }
2012+ final ShardSnapshotUpdate that = (ShardSnapshotUpdate ) other ;
2013+ return this .snapshot .equals (that .snapshot ) && this .shardId .equals (that .shardId ) && this .updatedState == that .updatedState ;
2014+ }
2015+
2016+
2017+ @ Override
2018+ public int hashCode () {
2019+ return Objects .hash (snapshot , shardId , updatedState );
19912020 }
19922021 }
19932022
19942023 /**
1995- * Updates the shard status on master node
2024+ * Updates the shard status in the cluster state
19962025 *
1997- * @param request update shard status request
2026+ * @param update shard snapshot status update
19982027 */
1999- private void innerUpdateSnapshotState (final UpdateIndexShardSnapshotStatusRequest request ,
2000- ActionListener <UpdateIndexShardSnapshotStatusResponse > listener ) {
2001- logger .trace ("received updated snapshot restore state [{}]" , request );
2028+ private void innerUpdateSnapshotState (ShardSnapshotUpdate update , ActionListener <Void > listener ) {
2029+ logger .trace ("received updated snapshot restore state [{}]" , update );
20022030 clusterService .submitStateUpdateTask (
20032031 "update snapshot state" ,
2004- request ,
2032+ update ,
20052033 ClusterStateTaskConfig .build (Priority .NORMAL ),
2006- snapshotStateExecutor ,
2034+ SHARD_STATE_EXECUTOR ,
20072035 new ClusterStateTaskListener () {
20082036 @ Override
20092037 public void onFailure (String source , Exception e ) {
@@ -2013,13 +2041,13 @@ public void onFailure(String source, Exception e) {
20132041 @ Override
20142042 public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
20152043 try {
2016- listener .onResponse (new UpdateIndexShardSnapshotStatusResponse () );
2044+ listener .onResponse (null );
20172045 } finally {
20182046 // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
20192047 // state update we check if its state is completed and end it if it is.
2020- if (endingSnapshots .contains (request .snapshot () ) == false ) {
2048+ if (endingSnapshots .contains (update .snapshot ) == false ) {
20212049 final SnapshotsInProgress snapshotsInProgress = newState .custom (SnapshotsInProgress .TYPE );
2022- final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (request .snapshot () );
2050+ final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (update .snapshot );
20232051 // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
20242052 if (updatedEntry != null && updatedEntry .state ().completed ()) {
20252053 endSnapshot (updatedEntry , newState .metadata (), null );
@@ -2047,13 +2075,14 @@ protected String executor() {
20472075
20482076 @ Override
20492077 protected UpdateIndexShardSnapshotStatusResponse read (StreamInput in ) throws IOException {
2050- return new UpdateIndexShardSnapshotStatusResponse ( in ) ;
2078+ return UpdateIndexShardSnapshotStatusResponse . INSTANCE ;
20512079 }
20522080
20532081 @ Override
20542082 protected void masterOperation (Task task , UpdateIndexShardSnapshotStatusRequest request , ClusterState state ,
20552083 ActionListener <UpdateIndexShardSnapshotStatusResponse > listener ) {
2056- innerUpdateSnapshotState (request , listener );
2084+ innerUpdateSnapshotState (new ShardSnapshotUpdate (request .snapshot (), request .shardId (), request .status ()),
2085+ ActionListener .delegateFailure (listener , (l , v ) -> l .onResponse (UpdateIndexShardSnapshotStatusResponse .INSTANCE )));
20572086 }
20582087
20592088 @ Override
0 commit comments