1818import org .elasticsearch .cluster .ClusterState ;
1919import org .elasticsearch .cluster .service .ClusterService ;
2020import org .elasticsearch .common .Strings ;
21+ import org .elasticsearch .common .unit .TimeValue ;
22+ import org .elasticsearch .snapshots .SnapshotId ;
2123import org .elasticsearch .snapshots .SnapshotInfo ;
24+ import org .elasticsearch .snapshots .SnapshotState ;
2225import org .elasticsearch .xpack .core .ClientHelper ;
26+ import org .elasticsearch .xpack .core .ilm .LifecycleSettings ;
2327import org .elasticsearch .xpack .core .scheduler .SchedulerEngine ;
2428import org .elasticsearch .xpack .core .slm .SnapshotLifecycleMetadata ;
2529import org .elasticsearch .xpack .core .slm .SnapshotLifecyclePolicy ;
3539import java .util .concurrent .CountDownLatch ;
3640import java .util .concurrent .atomic .AtomicBoolean ;
3741import java .util .function .Consumer ;
42+ import java .util .function .LongSupplier ;
3843import java .util .stream .Collectors ;
3944
4045/**
@@ -50,10 +55,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
5055
5156 private final Client client ;
5257 private final ClusterService clusterService ;
58+ private final LongSupplier nowNanoSupplier ;
5359
/**
 * Creates the retention task.
 *
 * The supplied client is wrapped in an OriginSettingClient using
 * ClientHelper.INDEX_LIFECYCLE_ORIGIN before it is stored.
 *
 * @param client          client used to issue requests; stored wrapped as described above
 * @param clusterService  stored as-is; its state is read when the task is triggered
 * @param nowNanoSupplier source of the current time in nanoseconds; deleteSnapshots reads it
 *                        to measure how long deletions have been running
 */
54- public SnapshotRetentionTask (Client client , ClusterService clusterService ) {
60+ public SnapshotRetentionTask (Client client , ClusterService clusterService , LongSupplier nowNanoSupplier ) {
5561 this .client = new OriginSettingClient (client , ClientHelper .INDEX_LIFECYCLE_ORIGIN );
5662 this .clusterService = clusterService ;
63+ this .nowNanoSupplier = nowNanoSupplier ;
5764 }
5865
5966 @ Override
@@ -64,6 +71,7 @@ public void triggered(SchedulerEngine.Event event) {
6471 try {
6572 logger .info ("starting SLM retention snapshot cleanup task" );
6673 final ClusterState state = clusterService .state ();
74+ final TimeValue maxDeletionTime = LifecycleSettings .SLM_RETENTION_DURATION_SETTING .get (state .metaData ().settings ());
6775
6876 // Find all SLM policies that have retention enabled
6977 final Map <String , SnapshotLifecyclePolicy > policiesWithRetention = getAllPoliciesWithRetentionEnabled (state );
@@ -74,7 +82,7 @@ public void triggered(SchedulerEngine.Event event) {
7482 .map (SnapshotLifecyclePolicy ::getRepository )
7583 .collect (Collectors .toSet ());
7684
77- getAllSnapshots (repositioriesToFetch , new ActionListener <>() {
85+ getAllSuccessfulSnapshots (repositioriesToFetch , new ActionListener <>() {
7886 @ Override
7987 public void onResponse (Map <String , List <SnapshotInfo >> allSnapshots ) {
8088 // Find all the snapshots that are past their retention date
@@ -85,7 +93,7 @@ public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
8593 .collect (Collectors .toList ())));
8694
8795 // Finally, delete the snapshots that need to be deleted
88- deleteSnapshots (snapshotsToBeDeleted );
96+ deleteSnapshots (snapshotsToBeDeleted , maxDeletionTime );
8997 }
9098
9199 @ Override
@@ -160,8 +168,8 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, Li
160168 return eligible ;
161169 }
162170
163- void getAllSnapshots (Collection <String > repositories , ActionListener <Map <String , List <SnapshotInfo >>> listener ,
164- Consumer <Exception > errorHandler ) {
171+ void getAllSuccessfulSnapshots (Collection <String > repositories , ActionListener <Map <String , List <SnapshotInfo >>> listener ,
172+ Consumer <Exception > errorHandler ) {
165173 if (repositories .isEmpty ()) {
166174 // Skip retrieving anything if there are no repositories to fetch
167175 listener .onResponse (Collections .emptyMap ());
@@ -175,7 +183,11 @@ void getAllSnapshots(Collection<String> repositories, ActionListener<Map<String,
175183 public void onResponse (final GetSnapshotsResponse resp ) {
176184 Map <String , List <SnapshotInfo >> snapshots = new HashMap <>();
177185 repositories .forEach (repo -> {
178- snapshots .put (repo , resp .getSnapshots (repo ));
186+ snapshots .put (repo ,
187+ // Only return snapshots in the SUCCESS state
188+ resp .getSnapshots (repo ).stream ()
189+ .filter (info -> info .state () == SnapshotState .SUCCESS )
190+ .collect (Collectors .toList ()));
179191 });
180192 listener .onResponse (snapshots );
181193 }
@@ -188,42 +200,64 @@ public void onFailure(Exception e) {
188200 });
189201 }
190202
191- void deleteSnapshots (Map <String , List <SnapshotInfo >> snapshotsToDelete ) {
192- // TODO: make this more resilient and possibly only delete for a certain amount of time
203+ void deleteSnapshots (Map <String , List <SnapshotInfo >> snapshotsToDelete , TimeValue maximumTime ) {
193204 int count = snapshotsToDelete .values ().stream ().mapToInt (List ::size ).sum ();
194205 if (count == 0 ) {
195206 logger .debug ("no snapshots are eligible for deletion" );
196207 return ;
197208 }
209+
198210 logger .info ("starting snapshot retention deletion for [{}] snapshots" , count );
199- snapshotsToDelete .forEach ((repo , snapshots ) -> {
200- snapshots .forEach (info -> {
201- logger .info ("[{}] snapshot retention deleting snapshot [{}]" , repo , info .snapshotId ());
202- CountDownLatch latch = new CountDownLatch (1 );
203- client .admin ().cluster ().prepareDeleteSnapshot (repo , info .snapshotId ().getName ())
204- .execute (new LatchedActionListener <>(new ActionListener <>() {
205- @ Override
206- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
207- if (acknowledgedResponse .isAcknowledged ()) {
208- logger .debug ("[{}] snapshot [{}] deleted successfully" , repo , info .snapshotId ());
209- }
210- }
211-
212- @ Override
213- public void onFailure (Exception e ) {
214- logger .warn (new ParameterizedMessage ("[{}] failed to delete snapshot [{}] for retention" ,
215- repo , info .snapshotId ()), e );
216- }
217- }, latch ));
218- try {
219- // Deletes cannot occur simultaneously, so wait for this
220- // deletion to complete before attempting the next one
221- latch .await ();
222- } catch (InterruptedException e ) {
223- logger .error (new ParameterizedMessage ("[{}] deletion of snapshot [{}] interrupted" ,
224- repo , info .snapshotId ()), e );
211+ long startTime = nowNanoSupplier .getAsLong ();
212+ int deleted = 0 ;
213+ for (Map .Entry <String , List <SnapshotInfo >> entry : snapshotsToDelete .entrySet ()) {
214+ String repo = entry .getKey ();
215+ List <SnapshotInfo > snapshots = entry .getValue ();
216+ for (SnapshotInfo info : snapshots ) {
217+ deleteSnapshot (repo , info .snapshotId ());
218+ deleted ++;
219+ // Check whether we have exceeded the maximum time allowed to spend deleting
220+ // snapshots, if we have, short-circuit the rest of the deletions
221+ TimeValue elapsedDeletionTime = TimeValue .timeValueNanos (nowNanoSupplier .getAsLong () - startTime );
222+ logger .trace ("elapsed time for deletion of [{}] snapshot: {}" , info .snapshotId (), elapsedDeletionTime );
223+ if (elapsedDeletionTime .compareTo (maximumTime ) > 0 ) {
224+ logger .info ("maximum snapshot retention deletion time reached, time spent: [{}]," +
225+ " maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion" ,
226+ elapsedDeletionTime , maximumTime , deleted , count );
227+ return ;
225228 }
226- });
227- });
229+ }
230+ }
231+ }
232+
233+ /**
234+ * Delete the given snapshot from the repository in blocking manner
235+ */
236+ void deleteSnapshot (String repo , SnapshotId snapshot ) {
237+ logger .info ("[{}] snapshot retention deleting snapshot [{}]" , repo , snapshot );
238+ CountDownLatch latch = new CountDownLatch (1 );
239+ client .admin ().cluster ().prepareDeleteSnapshot (repo , snapshot .getName ())
240+ .execute (new LatchedActionListener <>(new ActionListener <>() {
241+ @ Override
242+ public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
243+ if (acknowledgedResponse .isAcknowledged ()) {
244+ logger .debug ("[{}] snapshot [{}] deleted successfully" , repo , snapshot );
245+ }
246+ }
247+
248+ @ Override
249+ public void onFailure (Exception e ) {
250+ logger .warn (new ParameterizedMessage ("[{}] failed to delete snapshot [{}] for retention" ,
251+ repo , snapshot ), e );
252+ }
253+ }, latch ));
254+ try {
255+ // Deletes cannot occur simultaneously, so wait for this
256+ // deletion to complete before attempting the next one
257+ latch .await ();
258+ } catch (InterruptedException e ) {
259+ logger .error (new ParameterizedMessage ("[{}] deletion of snapshot [{}] interrupted" ,
260+ repo , snapshot ), e );
261+ }
228262 }
229263}
0 commit comments