88
99import org .apache .logging .log4j .LogManager ;
1010import org .apache .logging .log4j .Logger ;
11+ import org .apache .logging .log4j .message .ParameterizedMessage ;
1112import org .elasticsearch .action .ActionListener ;
1213import org .elasticsearch .action .support .ActionFilters ;
1314import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
15+ import org .elasticsearch .client .Client ;
1416import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
1517import org .elasticsearch .cluster .ClusterState ;
1618import org .elasticsearch .cluster .block .ClusterBlockException ;
1719import org .elasticsearch .cluster .block .ClusterBlockLevel ;
20+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
1821import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
1922import org .elasticsearch .cluster .metadata .MetaData ;
2023import org .elasticsearch .cluster .service .ClusterService ;
24+ import org .elasticsearch .common .Nullable ;
25+ import org .elasticsearch .common .Strings ;
2126import org .elasticsearch .common .inject .Inject ;
2227import org .elasticsearch .common .io .stream .StreamInput ;
28+ import org .elasticsearch .common .xcontent .DeprecationHandler ;
29+ import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
30+ import org .elasticsearch .common .xcontent .XContentParser ;
31+ import org .elasticsearch .common .xcontent .json .JsonXContent ;
2332import org .elasticsearch .threadpool .ThreadPool ;
2433import org .elasticsearch .transport .TransportService ;
2534import org .elasticsearch .xpack .core .ClientHelper ;
35+ import org .elasticsearch .xpack .core .ilm .ErrorStep ;
2636import org .elasticsearch .xpack .core .ilm .IndexLifecycleMetadata ;
37+ import org .elasticsearch .xpack .core .ilm .LifecycleExecutionState ;
2738import org .elasticsearch .xpack .core .ilm .LifecyclePolicy ;
2839import org .elasticsearch .xpack .core .ilm .LifecyclePolicyMetadata ;
40+ import org .elasticsearch .xpack .core .ilm .LifecycleSettings ;
41+ import org .elasticsearch .xpack .core .ilm .PhaseExecutionInfo ;
42+ import org .elasticsearch .xpack .core .ilm .Step ;
2943import org .elasticsearch .xpack .core .ilm .action .PutLifecycleAction ;
3044import org .elasticsearch .xpack .core .ilm .action .PutLifecycleAction .Request ;
3145import org .elasticsearch .xpack .core .ilm .action .PutLifecycleAction .Response ;
46+ import org .elasticsearch .xpack .ilm .IndexLifecycleTransition ;
3247
3348import java .io .IOException ;
3449import java .time .Instant ;
50+ import java .util .LinkedHashSet ;
51+ import java .util .List ;
3552import java .util .Map ;
53+ import java .util .Set ;
3654import java .util .SortedMap ;
55+ import java .util .Spliterators ;
3756import java .util .TreeMap ;
3857import java .util .stream .Collectors ;
58+ import java .util .stream .StreamSupport ;
3959
4060/**
4161 * This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well
4464public class TransportPutLifecycleAction extends TransportMasterNodeAction <Request , Response > {
4565
4666 private static final Logger logger = LogManager .getLogger (TransportPutLifecycleAction .class );
67+ private final NamedXContentRegistry xContentRegistry ;
68+ private final Client client ;
4769
4870 @ Inject
4971 public TransportPutLifecycleAction (TransportService transportService , ClusterService clusterService , ThreadPool threadPool ,
50- ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ) {
72+ ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ,
73+ NamedXContentRegistry namedXContentRegistry , Client client ) {
5174 super (PutLifecycleAction .NAME , transportService , clusterService , threadPool , actionFilters , Request ::new ,
5275 indexNameExpressionResolver );
76+ this .xContentRegistry = namedXContentRegistry ;
77+ this .client = client ;
5378 }
5479
5580 @ Override
@@ -81,7 +106,7 @@ protected Response newResponse(boolean acknowledged) {
81106
82107 @ Override
83108 public ClusterState execute (ClusterState currentState ) throws Exception {
84- ClusterState .Builder newState = ClusterState .builder (currentState );
109+ ClusterState .Builder stateBuilder = ClusterState .builder (currentState );
85110 IndexLifecycleMetadata currentMetadata = currentState .metaData ().custom (IndexLifecycleMetadata .TYPE );
86111 if (currentMetadata == null ) { // first time using index-lifecycle feature, bootstrap metadata
87112 currentMetadata = IndexLifecycleMetadata .EMPTY ;
@@ -99,13 +124,195 @@ public ClusterState execute(ClusterState currentState) throws Exception {
99124 logger .info ("updating index lifecycle policy [{}]" , request .getPolicy ().getName ());
100125 }
101126 IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata (newPolicies , currentMetadata .getOperationMode ());
102- newState .metaData (MetaData .builder (currentState .getMetaData ())
127+ stateBuilder .metaData (MetaData .builder (currentState .getMetaData ())
103128 .putCustom (IndexLifecycleMetadata .TYPE , newMetadata ).build ());
104- return newState .build ();
129+ ClusterState nonRefreshedState = stateBuilder .build ();
130+ if (oldPolicy == null ) {
131+ return nonRefreshedState ;
132+ } else {
133+ try {
134+ return updateIndicesForPolicy (nonRefreshedState , xContentRegistry , client ,
135+ oldPolicy .getPolicy (), lifecyclePolicyMetadata );
136+ } catch (Exception e ) {
137+ logger .warn (new ParameterizedMessage ("unable to refresh indices phase JSON for updated policy [{}]" ,
138+ oldPolicy .getName ()), e );
139+ // Revert to the non-refreshed state
140+ return nonRefreshedState ;
141+ }
142+ }
105143 }
106144 });
107145 }
108146
147+ /**
148+ * Ensure that we have the minimum amount of metadata necessary to check for cache phase
149+ * refresh. This includes:
150+ * - An execution state
151+ * - Existing phase definition JSON
152+ * - A current step key
153+ * - A current phase in the step key
154+ * - Not currently in the ERROR step
155+ */
156+ static boolean eligibleToCheckForRefresh (final IndexMetaData metaData ) {
157+ LifecycleExecutionState executionState = LifecycleExecutionState .fromIndexMetadata (metaData );
158+ if (executionState == null || executionState .getPhaseDefinition () == null ) {
159+ return false ;
160+ }
161+
162+ Step .StepKey currentStepKey = LifecycleExecutionState .getCurrentStepKey (executionState );
163+ if (currentStepKey == null || currentStepKey .getPhase () == null ) {
164+ return false ;
165+ }
166+
167+ return ErrorStep .NAME .equals (currentStepKey .getName ()) == false ;
168+ }
169+
170+ /**
171+ * Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
172+ * If there is an error parsing or if the phase definition is missing the required
173+ * information, returns null.
174+ */
175+ @ Nullable
176+ static Set <Step .StepKey > readStepKeys (final NamedXContentRegistry xContentRegistry , final Client client ,
177+ final String phaseDef , final String currentPhase ) {
178+ final PhaseExecutionInfo phaseExecutionInfo ;
179+ try (XContentParser parser = JsonXContent .jsonXContent .createParser (xContentRegistry ,
180+ DeprecationHandler .THROW_UNSUPPORTED_OPERATION , phaseDef )) {
181+ phaseExecutionInfo = PhaseExecutionInfo .parse (parser , currentPhase );
182+ } catch (Exception e ) {
183+ logger .trace (new ParameterizedMessage ("exception reading step keys checking for refreshability, phase definition: {}" ,
184+ phaseDef ), e );
185+ return null ;
186+ }
187+
188+ if (phaseExecutionInfo == null || phaseExecutionInfo .getPhase () == null ) {
189+ return null ;
190+ }
191+
192+ return phaseExecutionInfo .getPhase ().getActions ().values ().stream ()
193+ .flatMap (a -> a .toSteps (client , phaseExecutionInfo .getPhase ().getName (), null ).stream ())
194+ .map (Step ::getKey )
195+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
196+ }
197+
198+ /**
199+ * Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
200+ */
201+ static boolean isIndexPhaseDefinitionUpdatable (final NamedXContentRegistry xContentRegistry , final Client client ,
202+ final IndexMetaData metaData , final LifecyclePolicy newPolicy ) {
203+ final String index = metaData .getIndex ().getName ();
204+ if (eligibleToCheckForRefresh (metaData ) == false ) {
205+ logger .debug ("[{}] does not contain enough information to check for eligibility of refreshing phase" , index );
206+ return false ;
207+ }
208+ final String policyId = newPolicy .getName ();
209+
210+ final LifecycleExecutionState executionState = LifecycleExecutionState .fromIndexMetadata (metaData );
211+ final Step .StepKey currentStepKey = LifecycleExecutionState .getCurrentStepKey (executionState );
212+ final String currentPhase = currentStepKey .getPhase ();
213+
214+ final Set <Step .StepKey > newStepKeys = newPolicy .toSteps (client ).stream ()
215+ .map (Step ::getKey )
216+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
217+
218+ if (newStepKeys .contains (currentStepKey ) == false ) {
219+ // The index is on a step that doesn't exist in the new policy, we
220+ // can't safely re-read the JSON
221+ logger .debug ("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed" ,
222+ index , policyId , currentStepKey );
223+ return false ;
224+ }
225+
226+ final String phaseDef = executionState .getPhaseDefinition ();
227+ final Set <Step .StepKey > oldStepKeys = readStepKeys (xContentRegistry , client , phaseDef , currentPhase );
228+ if (oldStepKeys == null ) {
229+ logger .debug ("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed" ,
230+ index , policyId );
231+ return false ;
232+ }
233+
234+ final Set <Step .StepKey > oldPhaseStepKeys = oldStepKeys .stream ()
235+ .filter (sk -> currentPhase .equals (sk .getPhase ()))
236+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
237+
238+ final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo (policyId , newPolicy .getPhases ().get (currentPhase ), 1L , 1L );
239+ final String peiJson = Strings .toString (phaseExecutionInfo );
240+
241+ final Set <Step .StepKey > newPhaseStepKeys = readStepKeys (xContentRegistry , client , peiJson , currentPhase );
242+ if (newPhaseStepKeys == null ) {
243+ logger .debug (new ParameterizedMessage ("[{}] unable to parse phase definition for policy [{}] " +
244+ "to determine if it could be refreshed" , index , policyId ));
245+ return false ;
246+ }
247+
248+ if (newPhaseStepKeys .equals (oldPhaseStepKeys )) {
249+ // The new and old phase have the same stepkeys for this current phase, so we can
250+ // refresh the definition because we know it won't change the execution flow.
251+ logger .debug ("[{}] updated policy [{}] contains the same phase step keys and can be refreshed" , index , policyId );
252+ return true ;
253+ } else {
254+ logger .debug ("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
255+ "definition as it differs too greatly. old: {}, new: {}" ,
256+ index , policyId , oldPhaseStepKeys , newPhaseStepKeys );
257+ return false ;
258+ }
259+ }
260+
261+ /**
262+ * Rereads the phase JSON for the given index, returning a new cluster state.
263+ */
264+ static ClusterState refreshPhaseDefinition (final ClusterState state , final String index , final LifecyclePolicyMetadata updatedPolicy ) {
265+ final IndexMetaData idxMeta = state .metaData ().index (index );
266+ assert eligibleToCheckForRefresh (idxMeta ) : "index " + index + " is missing crucial information needed to refresh phase definition" ;
267+
268+ logger .trace ("[{}] updating cached phase definition for policy [{}]" , index , updatedPolicy .getName ());
269+ LifecycleExecutionState currentExState = LifecycleExecutionState .fromIndexMetadata (idxMeta );
270+
271+ String currentPhase = currentExState .getPhase ();
272+ PhaseExecutionInfo pei = new PhaseExecutionInfo (updatedPolicy .getName (),
273+ updatedPolicy .getPolicy ().getPhases ().get (currentPhase ), updatedPolicy .getVersion (), updatedPolicy .getModifiedDate ());
274+
275+ LifecycleExecutionState newExState = LifecycleExecutionState .builder (currentExState )
276+ .setPhaseDefinition (Strings .toString (pei , false , false ))
277+ .build ();
278+
279+ return IndexLifecycleTransition .newClusterStateWithLifecycleState (idxMeta .getIndex (), state , newExState ).build ();
280+ }
281+
282+ /**
283+ * For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
284+ */
285+ static ClusterState updateIndicesForPolicy (final ClusterState state , final NamedXContentRegistry xContentRegistry , final Client client ,
286+ final LifecyclePolicy oldPolicy , final LifecyclePolicyMetadata newPolicy ) {
287+ assert oldPolicy .getName ().equals (newPolicy .getName ()) : "expected both policies to have the same id but they were: [" +
288+ oldPolicy .getName () + "] vs. [" + newPolicy .getName () + "]" ;
289+
290+ // No need to update anything if the policies are identical in contents
291+ if (oldPolicy .equals (newPolicy .getPolicy ())) {
292+ logger .debug ("policy [{}] is unchanged and no phase definition refresh is needed" , oldPolicy .getName ());
293+ return state ;
294+ }
295+
296+ final List <String > indicesThatCanBeUpdated =
297+ StreamSupport .stream (Spliterators .spliteratorUnknownSize (state .metaData ().indices ().valuesIt (), 0 ), false )
298+ .filter (meta -> newPolicy .getName ().equals (LifecycleSettings .LIFECYCLE_NAME_SETTING .get (meta .getSettings ())))
299+ .filter (meta -> isIndexPhaseDefinitionUpdatable (xContentRegistry , client , meta , newPolicy .getPolicy ()))
300+ .map (meta -> meta .getIndex ().getName ())
301+ .collect (Collectors .toList ());
302+
303+ ClusterState updatedState = state ;
304+ for (String index : indicesThatCanBeUpdated ) {
305+ try {
306+ updatedState = refreshPhaseDefinition (updatedState , index , newPolicy );
307+ } catch (Exception e ) {
308+ logger .warn (new ParameterizedMessage ("[{}] unable to refresh phase definition for updated policy [{}]" ,
309+ index , newPolicy .getName ()), e );
310+ }
311+ }
312+
313+ return updatedState ;
314+ }
315+
109316 @ Override
110317 protected ClusterBlockException checkBlock (Request request , ClusterState state ) {
111318 return state .blocks ().globalBlockedException (ClusterBlockLevel .METADATA_WRITE );
0 commit comments