88package org .elasticsearch .xpack .enrich ;
99
1010import org .elasticsearch .action .ActionListener ;
11+ import org .elasticsearch .action .admin .cluster .node .tasks .get .GetTaskRequest ;
1112import org .elasticsearch .client .Client ;
1213import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
1314import org .elasticsearch .cluster .service .ClusterService ;
1415import org .elasticsearch .common .settings .Settings ;
1516import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
16- import org .elasticsearch .tasks .Task ;
17- import org .elasticsearch .tasks .TaskAwareRequest ;
18- import org .elasticsearch .tasks .TaskId ;
19- import org .elasticsearch .tasks .TaskListener ;
20- import org .elasticsearch .tasks .TaskManager ;
2117import org .elasticsearch .threadpool .ThreadPool ;
2218import org .elasticsearch .xpack .core .enrich .EnrichPolicy ;
2319import org .elasticsearch .xpack .core .enrich .action .ExecuteEnrichPolicyAction ;
2420import org .elasticsearch .xpack .core .enrich .action .ExecuteEnrichPolicyStatus ;
21+ import org .elasticsearch .xpack .enrich .action .InternalExecutePolicyAction ;
2522
26- import java .util .Map ;
2723import java .util .concurrent .Semaphore ;
28- import java .util .function .BiConsumer ;
2924import java .util .function .LongSupplier ;
3025
3126public class EnrichPolicyExecutor {
@@ -34,7 +29,6 @@ public class EnrichPolicyExecutor {
3429
3530 private final ClusterService clusterService ;
3631 private final Client client ;
37- private final TaskManager taskManager ;
3832 private final ThreadPool threadPool ;
3933 private final IndexNameExpressionResolver indexNameExpressionResolver ;
4034 private final LongSupplier nowSupplier ;
@@ -48,15 +42,13 @@ public EnrichPolicyExecutor(
4842 Settings settings ,
4943 ClusterService clusterService ,
5044 Client client ,
51- TaskManager taskManager ,
5245 ThreadPool threadPool ,
5346 IndexNameExpressionResolver indexNameExpressionResolver ,
5447 EnrichPolicyLocks policyLocks ,
5548 LongSupplier nowSupplier
5649 ) {
5750 this .clusterService = clusterService ;
5851 this .client = client ;
59- this .taskManager = taskManager ;
6052 this .threadPool = threadPool ;
6153 this .indexNameExpressionResolver = indexNameExpressionResolver ;
6254 this .nowSupplier = nowSupplier ;
@@ -67,6 +59,43 @@ public EnrichPolicyExecutor(
6759 this .policyExecutionPermits = new Semaphore (maximumConcurrentPolicyExecutions );
6860 }
6961
62+ public void coordinatePolicyExecution (
63+ ExecuteEnrichPolicyAction .Request request ,
64+ ActionListener <ExecuteEnrichPolicyAction .Response > listener
65+ ) {
66+ tryLockingPolicy (request .getName ());
67+ try {
68+ client .execute (InternalExecutePolicyAction .INSTANCE , request , ActionListener .wrap (response -> {
69+ if (response .getStatus () != null ) {
70+ releasePolicy (request .getName ());
71+ listener .onResponse (response );
72+ } else {
73+ waitAndThenRelease (request .getName (), response );
74+ listener .onResponse (response );
75+ }
76+ }, e -> {
77+ releasePolicy (request .getName ());
78+ listener .onFailure (e );
79+ }));
80+ } catch (Exception e ) {
81+ // Be sure to unlock if submission failed.
82+ releasePolicy (request .getName ());
83+ throw e ;
84+ }
85+ }
86+
87+ public void runPolicyLocally (ExecuteEnrichPolicyTask task , String policyName , ActionListener <ExecuteEnrichPolicyStatus > listener ) {
88+ try {
89+ EnrichPolicy policy = EnrichStore .getPolicy (policyName , clusterService .state ());
90+ task .setStatus (new ExecuteEnrichPolicyStatus (ExecuteEnrichPolicyStatus .PolicyPhases .SCHEDULED ));
91+ Runnable runnable = createPolicyRunner (policyName , policy , task , listener );
92+ threadPool .executor (ThreadPool .Names .GENERIC ).execute (runnable );
93+ } catch (Exception e ) {
94+ task .setStatus (new ExecuteEnrichPolicyStatus (ExecuteEnrichPolicyStatus .PolicyPhases .FAILED ));
95+ throw e ;
96+ }
97+ }
98+
7099 private void tryLockingPolicy (String policyName ) {
71100 policyLocks .lockPolicy (policyName );
72101 if (policyExecutionPermits .tryAcquire () == false ) {
@@ -91,49 +120,14 @@ private void releasePolicy(String policyName) {
91120 }
92121 }
93122
94- private class PolicyCompletionListener implements ActionListener <ExecuteEnrichPolicyStatus > {
95- private final String policyName ;
96- private final ExecuteEnrichPolicyTask task ;
97- private final BiConsumer <Task , ExecuteEnrichPolicyStatus > onResponse ;
98- private final BiConsumer <Task , Exception > onFailure ;
99-
100- PolicyCompletionListener (
101- String policyName ,
102- ExecuteEnrichPolicyTask task ,
103- BiConsumer <Task , ExecuteEnrichPolicyStatus > onResponse ,
104- BiConsumer <Task , Exception > onFailure
105- ) {
106- this .policyName = policyName ;
107- this .task = task ;
108- this .onResponse = onResponse ;
109- this .onFailure = onFailure ;
110- }
111-
112- @ Override
113- public void onResponse (ExecuteEnrichPolicyStatus status ) {
114- assert ExecuteEnrichPolicyStatus .PolicyPhases .COMPLETE .equals (status .getPhase ()) : "incomplete task returned" ;
115- releasePolicy (policyName );
116- try {
117- taskManager .unregister (task );
118- } finally {
119- onResponse .accept (task , status );
120- }
121- }
122-
123- @ Override
124- public void onFailure (Exception e ) {
125- // Set task status to failed to avoid having to catch and rethrow exceptions everywhere
126- task .setStatus (new ExecuteEnrichPolicyStatus (ExecuteEnrichPolicyStatus .PolicyPhases .FAILED ));
127- releasePolicy (policyName );
128- try {
129- taskManager .unregister (task );
130- } finally {
131- onFailure .accept (task , e );
132- }
133- }
123+ private void waitAndThenRelease (String policyName , ExecuteEnrichPolicyAction .Response response ) {
124+ GetTaskRequest getTaskRequest = new GetTaskRequest ();
125+ getTaskRequest .setTaskId (response .getTaskId ());
126+ getTaskRequest .setWaitForCompletion (true );
127+ client .admin ().cluster ().getTask (getTaskRequest , ActionListener .wrap (() -> releasePolicy (policyName )));
134128 }
135129
136- protected Runnable createPolicyRunner (
130+ private Runnable createPolicyRunner (
137131 String policyName ,
138132 EnrichPolicy policy ,
139133 ExecuteEnrichPolicyTask task ,
@@ -153,94 +147,4 @@ protected Runnable createPolicyRunner(
153147 );
154148 }
155149
156- private EnrichPolicy getPolicy (ExecuteEnrichPolicyAction .Request request ) {
157- // Look up policy in policy store and execute it
158- EnrichPolicy policy = EnrichStore .getPolicy (request .getName (), clusterService .state ());
159- if (policy == null ) {
160- throw new IllegalArgumentException ("Policy execution failed. Could not locate policy with id [" + request .getName () + "]" );
161- }
162- return policy ;
163- }
164-
165- public Task runPolicy (ExecuteEnrichPolicyAction .Request request , ActionListener <ExecuteEnrichPolicyStatus > listener ) {
166- return runPolicy (request , getPolicy (request ), listener );
167- }
168-
169- public Task runPolicy (ExecuteEnrichPolicyAction .Request request , TaskListener <ExecuteEnrichPolicyStatus > listener ) {
170- return runPolicy (request , getPolicy (request ), listener );
171- }
172-
173- public Task runPolicy (
174- ExecuteEnrichPolicyAction .Request request ,
175- EnrichPolicy policy ,
176- ActionListener <ExecuteEnrichPolicyStatus > listener
177- ) {
178- return runPolicy (request , policy , (t , r ) -> listener .onResponse (r ), (t , e ) -> listener .onFailure (e ));
179- }
180-
181- public Task runPolicy (
182- ExecuteEnrichPolicyAction .Request request ,
183- EnrichPolicy policy ,
184- TaskListener <ExecuteEnrichPolicyStatus > listener
185- ) {
186- return runPolicy (request , policy , listener ::onResponse , listener ::onFailure );
187- }
188-
189- private Task runPolicy (
190- ExecuteEnrichPolicyAction .Request request ,
191- EnrichPolicy policy ,
192- BiConsumer <Task , ExecuteEnrichPolicyStatus > onResponse ,
193- BiConsumer <Task , Exception > onFailure
194- ) {
195- tryLockingPolicy (request .getName ());
196- try {
197- return runPolicyTask (request , policy , onResponse , onFailure );
198- } catch (Exception e ) {
199- // Be sure to unlock if submission failed.
200- releasePolicy (request .getName ());
201- throw e ;
202- }
203- }
204-
205- private Task runPolicyTask (
206- final ExecuteEnrichPolicyAction .Request request ,
207- EnrichPolicy policy ,
208- BiConsumer <Task , ExecuteEnrichPolicyStatus > onResponse ,
209- BiConsumer <Task , Exception > onFailure
210- ) {
211- Task asyncTask = taskManager .register ("enrich" , TASK_ACTION , new TaskAwareRequest () {
212- @ Override
213- public void setParentTask (TaskId taskId ) {
214- request .setParentTask (taskId );
215- }
216-
217- @ Override
218- public TaskId getParentTask () {
219- return request .getParentTask ();
220- }
221-
222- @ Override
223- public Task createTask (long id , String type , String action , TaskId parentTaskId , Map <String , String > headers ) {
224- return new ExecuteEnrichPolicyTask (id , type , action , getDescription (), parentTaskId , headers );
225- }
226-
227- @ Override
228- public String getDescription () {
229- return request .getName ();
230- }
231- });
232- ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask ) asyncTask ;
233- try {
234- task .setStatus (new ExecuteEnrichPolicyStatus (ExecuteEnrichPolicyStatus .PolicyPhases .SCHEDULED ));
235- PolicyCompletionListener completionListener = new PolicyCompletionListener (request .getName (), task , onResponse , onFailure );
236- Runnable runnable = createPolicyRunner (request .getName (), policy , task , completionListener );
237- threadPool .executor (ThreadPool .Names .GENERIC ).execute (runnable );
238- return asyncTask ;
239- } catch (Exception e ) {
240- // Unregister task in case of exception
241- task .setStatus (new ExecuteEnrichPolicyStatus (ExecuteEnrichPolicyStatus .PolicyPhases .FAILED ));
242- taskManager .unregister (asyncTask );
243- throw e ;
244- }
245- }
246150}
0 commit comments