88
99import org .apache .logging .log4j .LogManager ;
1010import org .apache .logging .log4j .Logger ;
11- import org .elasticsearch .client .Client ;
1211import org .elasticsearch .cluster .ClusterChangedEvent ;
1312import org .elasticsearch .cluster .ClusterState ;
1413import org .elasticsearch .cluster .ClusterStateListener ;
2322import java .io .Closeable ;
2423import java .time .Clock ;
2524import java .util .Map ;
25+ import java .util .Set ;
26+ import java .util .function .Supplier ;
27+ import java .util .regex .Pattern ;
28+ import java .util .stream .Collectors ;
2629
2730/**
2831 * {@code SnapshotLifecycleService} manages snapshot policy scheduling and triggering of the
3235public class SnapshotLifecycleService implements LocalNodeMasterListener , Closeable , ClusterStateListener {
3336
3437 private static final Logger logger = LogManager .getLogger (SnapshotLifecycleMetadata .class );
38+ private static final String JOB_PATTERN_SUFFIX = "-\\ d+$" ;
3539
3640 private final SchedulerEngine scheduler ;
3741 private final ClusterService clusterService ;
3842 private final SnapshotLifecycleTask snapshotTask ;
3943 private final Map <String , SchedulerEngine .Job > scheduledTasks = ConcurrentCollections .newConcurrentMap ();
4044 private volatile boolean isMaster = false ;
4145
/**
 * Creates the service: builds the scheduler engine from the given settings and clock, obtains
 * the snapshot task from the supplier, and registers this instance with the cluster service
 * both as a local-node-master listener and as a cluster state listener.
 *
 * @param settings       settings passed to the scheduler engine
 * @param taskSupplier   supplies the task stored as this service's snapshot task
 * @param clusterService cluster service this instance registers its listeners with
 * @param clock          clock passed to the scheduler engine
 */
public SnapshotLifecycleService(Settings settings, Supplier<SnapshotLifecycleTask> taskSupplier,
                                ClusterService clusterService, Clock clock) {
    this.scheduler = new SchedulerEngine(settings, clock);
    this.clusterService = clusterService;
    this.snapshotTask = taskSupplier.get();
    // TODO: change this not to use 'this'
    clusterService.addLocalNodeMasterListener(this);
    clusterService.addListener(this);
}
5056
5157 @ Override
52- public void clusterChanged (ClusterChangedEvent event ) {
58+ public void clusterChanged (final ClusterChangedEvent event ) {
5359 if (this .isMaster ) {
54- // TODO: handle modified policies (currently they are ignored)
55- // TODO: handle deleted policies
56- scheduleSnapshotJobs ( event . state () );
60+ final ClusterState state = event . state ();
61+ scheduleSnapshotJobs ( state );
62+ cleanupDeletedPolicies ( state );
5763 }
5864 }
5965
@@ -71,6 +77,11 @@ public void offMaster() {
7177 cancelSnapshotJobs ();
7278 }
7379
80+ // Only used for testing
81+ SchedulerEngine getScheduler () {
82+ return this .scheduler ;
83+ }
84+
7485 /**
7586 * Schedule all non-scheduled snapshot jobs contained in the cluster state
7687 */
@@ -81,35 +92,85 @@ public void scheduleSnapshotJobs(final ClusterState state) {
8192 }
8293 }
8394
95+ public void cleanupDeletedPolicies (final ClusterState state ) {
96+ SnapshotLifecycleMetadata snapMeta = state .metaData ().custom (SnapshotLifecycleMetadata .TYPE );
97+ if (snapMeta != null ) {
98+ // Retrieve all of the expected policy job ids from the policies in the metadata
99+ final Set <String > policyJobIds = snapMeta .getSnapshotConfigurations ().values ().stream ()
100+ .map (SnapshotLifecycleService ::getJobId )
101+ .collect (Collectors .toSet ());
102+
103+ // Cancel all jobs that are *NOT* in the scheduled tasks map
104+ scheduledTasks .keySet ().stream ()
105+ .filter (jobId -> policyJobIds .contains (jobId ) == false )
106+ .forEach (this ::cancelScheduledSnapshot );
107+ }
108+ }
109+
84110 /**
85- * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. If the job already
86- * exists it is not interfered with.
111+ * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. First checks
112+ * to see if any previous versions of the policy were scheduled, and if so, cancels those. If
113+ * the same version of a policy has already been scheduled it does not overwrite the job.
87114 */
88115 public void maybeScheduleSnapshot (final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy ) {
89- final String jobId = snapshotLifecyclePolicy .getPolicy ().getId ();
116+ final String jobId = getJobId (snapshotLifecyclePolicy );
117+ final Pattern existingJobPattern = Pattern .compile (snapshotLifecyclePolicy .getPolicy ().getId () + JOB_PATTERN_SUFFIX );
118+
119+ // Find and cancel any existing jobs for this policy
120+ final boolean existingJobsFoundAndCancelled = scheduledTasks .keySet ().stream ()
121+ // Find all jobs matching the `jobid-\d+` pattern
122+ .filter (jId -> existingJobPattern .matcher (jId ).matches ())
123+ // Filter out a job that has not been changed (matches the id exactly meaning the version is the same)
124+ .filter (jId -> jId .equals (jobId ) == false )
125+ .map (existingJobId -> {
126+ // Cancel existing job so the new one can be scheduled
127+ logger .debug ("removing existing snapshot lifecycle job [{}] as it has been updated" , existingJobId );
128+ scheduledTasks .remove (existingJobId );
129+ boolean existed = scheduler .remove (existingJobId );
130+ assert existed : "expected job for " + existingJobId + " to exist in scheduler" ;
131+ return existed ;
132+ })
133+ .reduce (false , (a , b ) -> a || b );
134+
135+ // Now atomically schedule the new job and add it to the scheduled tasks map. If the jobId
136+ // is identical to an existing job (meaning the version has not changed) then this does
137+ // not reschedule it.
90138 scheduledTasks .computeIfAbsent (jobId , id -> {
91139 final SchedulerEngine .Job job = new SchedulerEngine .Job (jobId ,
92140 new CronSchedule (snapshotLifecyclePolicy .getPolicy ().getSchedule ()));
93- logger .info ("scheduling snapshot lifecycle job [{}]" , jobId );
141+ if (existingJobsFoundAndCancelled ) {
142+ logger .info ("rescheduling updated snapshot lifecycle job [{}]" , jobId );
143+ } else {
144+ logger .info ("scheduling snapshot lifecycle job [{}]" , jobId );
145+ }
94146 scheduler .add (job );
95147 return job ;
96148 });
97149 }
98150
151+ /**
152+ * Generate the job id for a given policy metadata. The job id is {@code <policyid>-<version>}
153+ */
154+ static String getJobId (SnapshotLifecyclePolicyMetadata policyMeta ) {
155+ return policyMeta .getPolicy ().getId () + "-" + policyMeta .getVersion ();
156+ }
157+
99158 /**
100159 * Cancel all scheduled snapshot jobs
101160 */
102161 public void cancelSnapshotJobs () {
162+ logger .trace ("cancelling all snapshot lifecycle jobs" );
103163 scheduler .scheduledJobIds ().forEach (scheduler ::remove );
104164 scheduledTasks .clear ();
105165 }
106166
107167 /**
108- * Cancel the given snapshot lifecycle id
168+ * Cancel the given policy job id (from {@link #getJobId(SnapshotLifecyclePolicyMetadata)}
109169 */
110- public void cancelScheduledSnapshot (final String snapshotLifecycleId ) {
111- scheduledTasks .remove (snapshotLifecycleId );
112- scheduler .remove (snapshotLifecycleId );
170+ public void cancelScheduledSnapshot (final String lifecycleJobId ) {
171+ logger .debug ("cancelling snapshot lifecycle job [{}] as it no longer exists" , lifecycleJobId );
172+ scheduledTasks .remove (lifecycleJobId );
173+ scheduler .remove (lifecycleJobId );
113174 }
114175
115176 @ Override
0 commit comments