2020package org .elasticsearch .action .admin .indices .rollover ;
2121
2222import org .elasticsearch .action .ActionListener ;
23- import org .elasticsearch .action .admin .indices .alias .IndicesAliasesClusterStateUpdateRequest ;
2423import org .elasticsearch .action .admin .indices .create .CreateIndexClusterStateUpdateRequest ;
2524import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
2625import org .elasticsearch .action .admin .indices .stats .IndicesStatsAction ;
@@ -150,56 +149,44 @@ public void onResponse(IndicesStatsResponse statsResponse) {
150149 new RolloverResponse (sourceIndexName , rolloverIndexName , conditionResults , true , false , false , false ));
151150 return ;
152151 }
153- List <Condition <?>> metConditions = rolloverRequest .getConditions ().values ().stream ()
152+ List <Condition <?>> metConditions = rolloverRequest .getConditions ().values ().stream ()
154153 .filter (condition -> conditionResults .get (condition .toString ())).collect (Collectors .toList ());
155154 if (conditionResults .size () == 0 || metConditions .size () > 0 ) {
156- CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest (unresolvedName , rolloverIndexName ,
157- rolloverRequest );
158- createIndexService .createIndex (updateRequest , ActionListener .wrap (createIndexClusterStateUpdateResponse -> {
159- final IndicesAliasesClusterStateUpdateRequest aliasesUpdateRequest ;
160- if (explicitWriteIndex ) {
161- aliasesUpdateRequest = prepareRolloverAliasesWriteIndexUpdateRequest (sourceIndexName ,
162- rolloverIndexName , rolloverRequest );
163- } else {
164- aliasesUpdateRequest = prepareRolloverAliasesUpdateRequest (sourceIndexName ,
165- rolloverIndexName , rolloverRequest );
155+ CreateIndexClusterStateUpdateRequest createIndexRequest = prepareCreateIndexRequest (unresolvedName ,
156+ rolloverIndexName , rolloverRequest );
157+ clusterService .submitStateUpdateTask ("rollover_index source [" + sourceIndexName + "] to target ["
158+ + rolloverIndexName + "]" , new ClusterStateUpdateTask () {
159+ @ Override
160+ public ClusterState execute (ClusterState currentState ) throws Exception {
161+ ClusterState newState = createIndexService .applyCreateIndexRequest (currentState , createIndexRequest );
162+ newState = indexAliasesService .applyAliasActions (newState ,
163+ rolloverAliasToNewIndex (sourceIndexName , rolloverIndexName , rolloverRequest , explicitWriteIndex ));
164+ RolloverInfo rolloverInfo = new RolloverInfo (rolloverRequest .getAlias (), metConditions ,
165+ threadPool .absoluteTimeInMillis ());
166+ return ClusterState .builder (newState )
167+ .metaData (MetaData .builder (newState .metaData ())
168+ .put (IndexMetaData .builder (newState .metaData ().index (sourceIndexName ))
169+ .putRolloverInfo (rolloverInfo ))).build ();
166170 }
167- indexAliasesService .indicesAliases (aliasesUpdateRequest ,
168- ActionListener .wrap (aliasClusterStateUpdateResponse -> {
169- if (aliasClusterStateUpdateResponse .isAcknowledged ()) {
170- clusterService .submitStateUpdateTask ("update_rollover_info" , new ClusterStateUpdateTask () {
171- @ Override
172- public ClusterState execute (ClusterState currentState ) {
173- RolloverInfo rolloverInfo = new RolloverInfo (rolloverRequest .getAlias (), metConditions ,
174- threadPool .absoluteTimeInMillis ());
175- return ClusterState .builder (currentState )
176- .metaData (MetaData .builder (currentState .metaData ())
177- .put (IndexMetaData .builder (currentState .metaData ().index (sourceIndexName ))
178- .putRolloverInfo (rolloverInfo ))).build ();
179- }
180171
181- @ Override
182- public void onFailure (String source , Exception e ) {
183- listener .onFailure (e );
184- }
172+ @ Override
173+ public void onFailure (String source , Exception e ) {
174+ listener .onFailure (e );
175+ }
185176
186- @ Override
187- public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
188- activeShardsObserver .waitForActiveShards (new String []{rolloverIndexName },
189- rolloverRequest .getCreateIndexRequest ().waitForActiveShards (),
190- rolloverRequest .masterNodeTimeout (),
191- isShardsAcknowledged -> listener .onResponse (new RolloverResponse (
192- sourceIndexName , rolloverIndexName , conditionResults , false , true , true ,
193- isShardsAcknowledged )),
194- listener ::onFailure );
195- }
196- });
197- } else {
198- listener .onResponse (new RolloverResponse (sourceIndexName , rolloverIndexName , conditionResults ,
199- false , true , false , false ));
200- }
201- }, listener ::onFailure ));
202- }, listener ::onFailure ));
177+ @ Override
178+ public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
179+ if (newState .equals (oldState ) == false ) {
180+ activeShardsObserver .waitForActiveShards (new String []{rolloverIndexName },
181+ rolloverRequest .getCreateIndexRequest ().waitForActiveShards (),
182+ rolloverRequest .masterNodeTimeout (),
183+ isShardsAcknowledged -> listener .onResponse (new RolloverResponse (
184+ sourceIndexName , rolloverIndexName , conditionResults , false , true , true ,
185+ isShardsAcknowledged )),
186+ listener ::onFailure );
187+ }
188+ }
189+ });
203190 } else {
204191 // conditions not met
205192 listener .onResponse (
@@ -216,29 +203,24 @@ public void onFailure(Exception e) {
216203 );
217204 }
218205
219- static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest (String oldIndex , String newIndex ,
220- RolloverRequest request ) {
221- List <AliasAction > actions = unmodifiableList (Arrays .asList (
222- new AliasAction .Add (newIndex , request .getAlias (), null , null , null , null ),
223- new AliasAction .Remove (oldIndex , request .getAlias ())));
224- final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest (actions )
225- .ackTimeout (request .ackTimeout ())
226- .masterNodeTimeout (request .masterNodeTimeout ());
227- return updateRequest ;
228- }
229-
230- static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesWriteIndexUpdateRequest (String oldIndex , String newIndex ,
231- RolloverRequest request ) {
232- List <AliasAction > actions = unmodifiableList (Arrays .asList (
233- new AliasAction .Add (newIndex , request .getAlias (), null , null , null , true ),
234- new AliasAction .Add (oldIndex , request .getAlias (), null , null , null , false )));
235- final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest (actions )
236- .ackTimeout (request .ackTimeout ())
237- .masterNodeTimeout (request .masterNodeTimeout ());
238- return updateRequest ;
206+ /**
207+ * Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
208+ * alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
209+ * in which case, after the rollover, the new index will need to be the explicit write index.
210+ */
211+ static List <AliasAction > rolloverAliasToNewIndex (String oldIndex , String newIndex , RolloverRequest request ,
212+ boolean explicitWriteIndex ) {
213+ if (explicitWriteIndex ) {
214+ return unmodifiableList (Arrays .asList (
215+ new AliasAction .Add (newIndex , request .getAlias (), null , null , null , true ),
216+ new AliasAction .Add (oldIndex , request .getAlias (), null , null , null , false )));
217+ } else {
218+ return unmodifiableList (Arrays .asList (
219+ new AliasAction .Add (newIndex , request .getAlias (), null , null , null , null ),
220+ new AliasAction .Remove (oldIndex , request .getAlias ())));
221+ }
239222 }
240223
241-
242224 static String generateRolloverIndexName (String sourceIndexName , IndexNameExpressionResolver indexNameExpressionResolver ) {
243225 String resolvedName = indexNameExpressionResolver .resolveDateMathExpression (sourceIndexName );
244226 final boolean isDateMath = sourceIndexName .equals (resolvedName ) == false ;
0 commit comments