@@ -40,7 +40,6 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
4040 private final ClusterService clusterService ;
4141 private final ThreadPool threadPool ;
4242 private final EnrichPolicyLocks enrichPolicyLocks ;
43- private final ConcurrentMap <String , Boolean > markedIndices = new ConcurrentHashMap <>();
4443
4544 private volatile Scheduler .Cancellable cancellable ;
4645
@@ -86,7 +85,7 @@ private void scheduleNext() {
8685 cancellable = threadPool .schedule (this ::execute , new TimeValue (15 , TimeUnit .MINUTES ), ThreadPool .Names .GENERIC );
8786 } catch (EsRejectedExecutionException e ) {
8887 if (e .isExecutorShutdown ()) {
89- logger .debug ("failed to schedule next maintenance task; shutting down" , e );
88+ logger .debug ("failed to schedule next [enrich] maintenance task; shutting down" , e );
9089 } else {
9190 throw e ;
9291 }
@@ -95,38 +94,11 @@ private void scheduleNext() {
9594
9695 private void execute () {
9796 logger .debug ("triggering scheduled [enrich] maintenance task" );
98- deleteMarkedIndices ();
97+ cleanUpEnrichIndices ();
9998 scheduleNext ();
10099 }
101100
102- private void deleteMarkedIndices () {
103- final String [] markedForDeletion = markedIndices .keySet ().toArray (String []::new );
104- if (markedForDeletion .length == 0 ) {
105- markNewIndicesForDeletion ();
106- } else {
107- DeleteIndexRequest deleteIndices = new DeleteIndexRequest ()
108- .indices (markedForDeletion )
109- .indicesOptions (IGNORE_UNAVAILABLE );
110- client .admin ().indices ().delete (deleteIndices , new ActionListener <>() {
111- @ Override
112- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
113- logger .debug ("Completed deletion of stale enrich indices [{}]" , () -> Arrays .toString (markedForDeletion ));
114- for (String deletedIndex : markedForDeletion ) {
115- markedIndices .remove (deletedIndex );
116- }
117- markNewIndicesForDeletion ();
118- }
119-
120- @ Override
121- public void onFailure (Exception e ) {
122- logger .error ("Could not delete enrich indices that were marked for deletion" , e );
123- markNewIndicesForDeletion ();
124- }
125- });
126- }
127- }
128-
129- private void markNewIndicesForDeletion () {
101+ private void cleanUpEnrichIndices () {
130102 final Map <String , EnrichPolicy > policies = EnrichStore .getPolicies (clusterService .state ());
131103 GetIndexRequest indices = new GetIndexRequest ()
132104 .indices (EnrichPolicy .ENRICH_INDEX_NAME_BASE + "*" )
@@ -141,11 +113,12 @@ public void onResponse(GetIndexResponse getIndexResponse) {
141113 // If executions were kicked off, we can't be sure that the indices we are about to process are a
142114 // stable state of the system (they could be new indices created by a policy that hasn't been published yet).
143115 if (enrichPolicyLocks .isSafe (lockState )) {
144- for (String indexName : getIndexResponse .getIndices ()) {
145- if (shouldRemoveIndex (getIndexResponse , policies , indexName )) {
146- markedIndices .put (indexName , true );
147- }
148- }
116+ String [] removeIndices = Arrays .stream (getIndexResponse .getIndices ())
117+ .filter (indexName -> shouldRemoveIndex (getIndexResponse , policies , indexName ))
118+ .toArray (String []::new );
119+ deleteIndices (removeIndices );
120+ } else {
121+ logger .debug ("Skipping enrich index cleanup since enrich policy was executed while gathering indices" );
149122 }
150123 }
151124
@@ -177,4 +150,23 @@ private boolean shouldRemoveIndex(GetIndexResponse getIndexResponse, Map<String,
177150 // Index is not currently published to the enrich alias. Should be marked for removal.
178151 return hasAlias == false ;
179152 }
153+
154+ private void deleteIndices (String [] removeIndices ) {
155+ if (removeIndices .length != 0 ) {
156+ DeleteIndexRequest deleteIndices = new DeleteIndexRequest ()
157+ .indices (removeIndices )
158+ .indicesOptions (IGNORE_UNAVAILABLE );
159+ client .admin ().indices ().delete (deleteIndices , new ActionListener <>() {
160+ @ Override
161+ public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
162+ logger .debug ("Completed deletion of stale enrich indices [{}]" , () -> Arrays .toString (removeIndices ));
163+ }
164+
165+ @ Override
166+ public void onFailure (Exception e ) {
167+ logger .error ("Could not delete enrich indices that were marked for deletion" , e );
168+ }
169+ });
170+ }
171+ }
180172}
0 commit comments