2222import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
2323import org .apache .logging .log4j .message .ParameterizedMessage ;
2424import org .apache .logging .log4j .util .Supplier ;
25+ import org .apache .lucene .util .SetOnce ;
2526import org .elasticsearch .ExceptionsHelper ;
2627import org .elasticsearch .Version ;
2728import org .elasticsearch .action .ActionListener ;
6768import org .elasticsearch .repositories .IndexId ;
6869import org .elasticsearch .repositories .Repository ;
6970import org .elasticsearch .threadpool .ThreadPool ;
70- import org .elasticsearch .transport .EmptyTransportResponseHandler ;
7171import org .elasticsearch .transport .TransportChannel ;
7272import org .elasticsearch .transport .TransportRequest ;
7373import org .elasticsearch .transport .TransportRequestHandler ;
9090import static java .util .Collections .emptyMap ;
9191import static java .util .Collections .unmodifiableMap ;
9292import static org .elasticsearch .cluster .SnapshotsInProgress .completed ;
93+ import static org .elasticsearch .transport .EmptyTransportResponseHandler .INSTANCE_SAME ;
9394
9495/**
9596 * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
@@ -167,7 +168,6 @@ protected void doStop() {
167168 } finally {
168169 shutdownLock .unlock ();
169170 }
170-
171171 }
172172
173173 @ Override
@@ -178,14 +178,16 @@ protected void doClose() {
178178 @ Override
179179 public void clusterChanged (ClusterChangedEvent event ) {
180180 try {
181- SnapshotsInProgress prev = event .previousState ().custom (SnapshotsInProgress .TYPE );
182- SnapshotsInProgress curr = event .state ().custom (SnapshotsInProgress .TYPE );
183-
184- if (( prev == null && curr != null ) || (prev != null && prev .equals (curr ) == false )) {
181+ SnapshotsInProgress previousSnapshots = event .previousState ().custom (SnapshotsInProgress .TYPE );
182+ SnapshotsInProgress currentSnapshots = event .state ().custom (SnapshotsInProgress .TYPE );
183+ if (( previousSnapshots == null && currentSnapshots != null )
184+ || (previousSnapshots != null && previousSnapshots .equals (currentSnapshots ) == false )) {
185185 processIndexShardSnapshots (event );
186186 }
187- String masterNodeId = event .state ().nodes ().getMasterNodeId ();
188- if (masterNodeId != null && masterNodeId .equals (event .previousState ().nodes ().getMasterNodeId ()) == false ) {
187+
188+ String previousMasterNodeId = event .previousState ().nodes ().getMasterNodeId ();
189+ String currentMasterNodeId = event .state ().nodes ().getMasterNodeId ();
190+ if (currentMasterNodeId != null && currentMasterNodeId .equals (previousMasterNodeId ) == false ) {
189191 syncShardStatsOnNewMaster (event );
190192 }
191193
@@ -302,17 +304,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
302304 snapshotStatus .abort ();
303305 break ;
304306 case FINALIZE :
305- logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish" , entry .snapshot (), shard .key );
307+ logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " +
308+ "letting it finish" , entry .snapshot (), shard .key );
306309 break ;
307310 case DONE :
308- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master" , entry . snapshot (), shard . key );
309- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
310- new ShardSnapshotStatus ( localNodeId , State . SUCCESS ) , masterNode );
311+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, " +
312+ "updating status on the master" , entry .snapshot (), shard .key );
313+ notifySuccessfulSnapshotShard ( entry . snapshot (), shard . key , localNodeId , masterNode );
311314 break ;
312315 case FAILURE :
313- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master" , entry . snapshot (), shard . key );
314- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
315- new ShardSnapshotStatus ( localNodeId , State . FAILED , snapshotStatus .failure () ), masterNode );
316+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " +
317+ "updating status on the master" , entry .snapshot (), shard .key );
318+ notifyFailedSnapshotShard ( entry . snapshot (), shard . key , localNodeId , snapshotStatus .failure (), masterNode );
316319 break ;
317320 default :
318321 throw new IllegalStateException ("Unknown snapshot shard stage " + snapshotStatus .stage ());
@@ -341,34 +344,47 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
341344 if (newSnapshots .isEmpty () == false ) {
342345 Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
343346 for (final Map .Entry <Snapshot , Map <ShardId , IndexShardSnapshotStatus >> entry : newSnapshots .entrySet ()) {
344- Map <String , IndexId > indicesMap = snapshotIndices .get (entry .getKey ());
347+ final Snapshot snapshot = entry .getKey ();
348+ final Map <String , IndexId > indicesMap = snapshotIndices .get (snapshot );
345349 assert indicesMap != null ;
350+
346351 for (final Map .Entry <ShardId , IndexShardSnapshotStatus > shardEntry : entry .getValue ().entrySet ()) {
347352 final ShardId shardId = shardEntry .getKey ();
348- try {
349- final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
350- final IndexId indexId = indicesMap .get (shardId .getIndexName ());
351- assert indexId != null ;
352- executor .execute (new AbstractRunnable () {
353- @ Override
354- public void doRun () {
355- snapshot (indexShard , entry .getKey (), indexId , shardEntry .getValue ());
356- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
357- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
358- }
353+ final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
354+ final IndexId indexId = indicesMap .get (shardId .getIndexName ());
355+ assert indexId != null ;
356+ executor .execute (new AbstractRunnable () {
359357
360- @ Override
361- public void onFailure (Exception e ) {
362- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to create snapshot" , shardId , entry .getKey ()), e );
363- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
364- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
365- }
358+ final SetOnce <Exception > failure = new SetOnce <>();
366359
367- });
368- } catch (Exception e ) {
369- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
370- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
371- }
360+ @ Override
361+ public void doRun () {
362+ snapshot (indexShard , snapshot , indexId , shardEntry .getValue ());
363+ }
364+
365+ @ Override
366+ public void onFailure (Exception e ) {
367+ logger .warn ((Supplier <?>) () ->
368+ new ParameterizedMessage ("[{}][{}] failed to snapshot shard" , shardId , snapshot ), e );
369+ failure .set (e );
370+ }
371+
372+ @ Override
373+ public void onRejection (Exception e ) {
374+ failure .set (e );
375+ }
376+
377+ @ Override
378+ public void onAfter () {
379+ final Exception exception = failure .get ();
380+ if (exception != null ) {
381+ final String failure = ExceptionsHelper .detailedMessage (exception );
382+ notifyFailedSnapshotShard (snapshot , shardId , localNodeId , failure , masterNode );
383+ } else {
384+ notifySuccessfulSnapshotShard (snapshot , shardId , localNodeId , masterNode );
385+ }
386+ }
387+ });
372388 }
373389 }
374390 }
@@ -381,34 +397,36 @@ public void onFailure(Exception e) {
381397 * @param snapshotStatus snapshot status
382398 */
383399 private void snapshot (final IndexShard indexShard , final Snapshot snapshot , final IndexId indexId , final IndexShardSnapshotStatus snapshotStatus ) {
384- Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
385- ShardId shardId = indexShard .shardId ();
386- if (!indexShard .routingEntry ().primary ()) {
400+ final ShardId shardId = indexShard .shardId ();
401+ if (indexShard .routingEntry ().primary () == false ) {
387402 throw new IndexShardSnapshotFailedException (shardId , "snapshot should be performed only on primary" );
388403 }
389404 if (indexShard .routingEntry ().relocating ()) {
390405 // do not snapshot when in the process of relocation of primaries so we won't get conflicts
391406 throw new IndexShardSnapshotFailedException (shardId , "cannot snapshot while relocating" );
392407 }
393- if (indexShard .state () == IndexShardState .CREATED || indexShard .state () == IndexShardState .RECOVERING ) {
408+
409+ final IndexShardState indexShardState = indexShard .state ();
410+ if (indexShardState == IndexShardState .CREATED || indexShardState == IndexShardState .RECOVERING ) {
394411 // shard has just been created, or still recovering
395412 throw new IndexShardSnapshotFailedException (shardId , "shard didn't fully recover yet" );
396413 }
397414
415+ final Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
398416 try {
399417 // we flush first to make sure we get the latest writes snapshotted
400418 try (Engine .IndexCommitRef snapshotRef = indexShard .acquireIndexCommit (true )) {
401419 repository .snapshotShard (indexShard , snapshot .getSnapshotId (), indexId , snapshotRef .getIndexCommit (), snapshotStatus );
402420 if (logger .isDebugEnabled ()) {
403- StringBuilder sb = new StringBuilder ();
404- sb .append (" index : version [" ).append (snapshotStatus .indexVersion ()).append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ()).append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
421+ StringBuilder details = new StringBuilder ();
422+ details .append (" index : version [" ).append (snapshotStatus .indexVersion ());
423+ details .append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ());
424+ details .append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
405425 logger .debug ("snapshot ({}) completed to {}, took [{}]\n {}" , snapshot , repository ,
406- TimeValue .timeValueMillis (snapshotStatus .time ()), sb );
426+ TimeValue .timeValueMillis (snapshotStatus .time ()), details );
407427 }
408428 }
409- } catch (SnapshotFailedEngineException e ) {
410- throw e ;
411- } catch (IndexShardSnapshotFailedException e ) {
429+ } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e ) {
412430 throw e ;
413431 } catch (Exception e ) {
414432 throw new IndexShardSnapshotFailedException (shardId , "Failed to snapshot" , e );
@@ -423,6 +441,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
423441 if (snapshotsInProgress == null ) {
424442 return ;
425443 }
444+
426445 final String localNodeId = event .state ().nodes ().getLocalNodeId ();
427446 final DiscoveryNode masterNode = event .state ().nodes ().getMasterNode ();
428447 for (SnapshotsInProgress .Entry snapshot : snapshotsInProgress .entries ()) {
@@ -438,15 +457,16 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
438457 // Master knows about the shard and thinks it has not completed
439458 if (localShardStatus .stage () == Stage .DONE ) {
440459 // but we think the shard is done - we need to make new master know that the shard is done
441- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master" , snapshot .snapshot (), shardId );
442- updateIndexShardSnapshotStatus (snapshot .snapshot (), shardId ,
443- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
460+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
461+ "updating status on the master" , snapshot .snapshot (), shardId );
462+ notifySuccessfulSnapshotShard (snapshot .snapshot (), shardId , localNodeId , masterNode );
463+
444464 } else if (localShard .getValue ().stage () == Stage .FAILURE ) {
445465 // but we think the shard failed - we need to make new master know that the shard failed
446- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master" , snapshot . snapshot (), shardId );
447- updateIndexShardSnapshotStatus ( snapshot .snapshot (), shardId ,
448- new ShardSnapshotStatus ( localNodeId , State . FAILED , localShardStatus .failure ()), masterNode );
449-
466+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " +
467+ "updating status on master" , snapshot .snapshot (), shardId );
468+ final String failure = localShardStatus .failure ();
469+ notifyFailedSnapshotShard ( snapshot . snapshot (), shardId , localNodeId , failure , masterNode );
450470 }
451471 }
452472 }
@@ -466,7 +486,6 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
466486 }
467487 }
468488
469-
470489 /**
471490 * Internal request that is used to send changes in snapshot status to master
472491 */
@@ -526,17 +545,35 @@ public String toString() {
526545 }
527546 }
528547
529- /**
530- * Updates the shard status
531- */
532- public void updateIndexShardSnapshotStatus (Snapshot snapshot , ShardId shardId , ShardSnapshotStatus status , DiscoveryNode master ) {
548+ /** Notify the master node that the given shard has been successfully snapshotted **/
549+ void notifySuccessfulSnapshotShard (final Snapshot snapshot ,
550+ final ShardId shardId ,
551+ final String localNodeId ,
552+ final DiscoveryNode masterNode ) {
553+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
554+ }
555+
556+ /** Notify the master node that the given shard failed to be snapshotted **/
557+ void notifyFailedSnapshotShard (final Snapshot snapshot ,
558+ final ShardId shardId ,
559+ final String localNodeId ,
560+ final String failure ,
561+ final DiscoveryNode masterNode ) {
562+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .FAILED , failure ), masterNode );
563+ }
564+
565+ /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
566+ void sendSnapshotShardUpdate (final Snapshot snapshot ,
567+ final ShardId shardId ,
568+ final ShardSnapshotStatus status ,
569+ final DiscoveryNode masterNode ) {
533570 try {
534- if (master .getVersion ().onOrAfter (Version .V_6_1_0 )) {
571+ if (masterNode .getVersion ().onOrAfter (Version .V_6_1_0 )) {
535572 UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
536- transportService .sendRequest (transportService .getLocalNode (), UPDATE_SNAPSHOT_STATUS_ACTION_NAME , request , EmptyTransportResponseHandler . INSTANCE_SAME );
573+ transportService .sendRequest (transportService .getLocalNode (), UPDATE_SNAPSHOT_STATUS_ACTION_NAME , request , INSTANCE_SAME );
537574 } else {
538575 UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6 (snapshot , shardId , status );
539- transportService .sendRequest (master , UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 , requestV6 , EmptyTransportResponseHandler . INSTANCE_SAME );
576+ transportService .sendRequest (masterNode , UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 , requestV6 , INSTANCE_SAME );
540577 }
541578 } catch (Exception e ) {
542579 logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , snapshot , status ), e );
0 commit comments