|
14 | 14 | import org.elasticsearch.client.Response; |
15 | 15 | import org.elasticsearch.client.ResponseException; |
16 | 16 | import org.elasticsearch.client.RestClient; |
| 17 | +import org.elasticsearch.common.CheckedRunnable; |
17 | 18 | import org.elasticsearch.common.xcontent.XContentHelper; |
18 | 19 | import org.elasticsearch.common.xcontent.json.JsonXContent; |
19 | 20 | import org.elasticsearch.test.ESTestCase; |
|
29 | 30 | import java.util.List; |
30 | 31 | import java.util.Map; |
31 | 32 | import java.util.concurrent.atomic.AtomicReference; |
| 33 | +import java.util.function.Predicate; |
32 | 34 |
|
| 35 | +import static org.elasticsearch.test.ESTestCase.assertBusy; |
33 | 36 | import static org.junit.Assert.assertEquals; |
34 | 37 |
|
35 | 38 | public final class XPackRestTestHelper { |
@@ -84,34 +87,54 @@ public static void waitForMlTemplates(RestClient client) throws InterruptedExcep |
84 | 87 | } |
85 | 88 |
|
86 | 89 | /** |
87 | | - * Waits for pending tasks to complete |
| 90 | + * Wait for outstanding tasks to complete. The specified admin client is used to check the outstanding tasks and this is done using |
| 91 | + * {@link ESTestCase#assertBusy(CheckedRunnable)} to give a chance to any outstanding tasks to complete. |
| 92 | + * |
| 93 | + * @param adminClient the admin client |
| 94 | + * @throws Exception if an exception is thrown while checking the outstanding tasks |
88 | 95 | */ |
89 | | - public static void waitForPendingTasks(RestClient adminClient) throws Exception { |
90 | | - ESTestCase.assertBusy(() -> { |
| 96 | + public static void waitForPendingTasks(final RestClient adminClient) throws Exception { |
| 97 | + waitForPendingTasks(adminClient, taskName -> false); |
| 98 | + } |
| 99 | + |
| 100 | + /** |
| 101 | + * Wait for outstanding tasks to complete. The specified admin client is used to check the outstanding tasks and this is done using |
| 102 | + * {@link ESTestCase#assertBusy(CheckedRunnable)} to give a chance to any outstanding tasks to complete. The specified filter is used |
| 103 | + * to filter out outstanding tasks that are expected to be there. |
| 104 | + * |
| 105 | + * @param adminClient the admin client |
| 106 | + * @param taskFilter predicate used to filter tasks that are expected to be there |
| 107 | + * @throws Exception if an exception is thrown while checking the outstanding tasks |
| 108 | + */ |
| 109 | + public static void waitForPendingTasks(final RestClient adminClient, final Predicate<String> taskFilter) throws Exception { |
| 110 | + assertBusy(() -> { |
91 | 111 | try { |
92 | | - Request request = new Request("GET", "/_cat/tasks"); |
| 112 | + final Request request = new Request("GET", "/_cat/tasks"); |
93 | 113 | request.addParameter("detailed", "true"); |
94 | | - Response response = adminClient.performRequest(request); |
95 | | - // Check to see if there are tasks still active. We exclude the |
96 | | - // list tasks |
97 | | - // actions tasks form this otherwise we will always fail |
| 114 | + final Response response = adminClient.performRequest(request); |
| 115 | + /* |
| 116 | + * Check to see if there are outstanding tasks; we exclude the list task itself, and any expected outstanding tasks using |
| 117 | + * the specified task filter. |
| 118 | + */ |
98 | 119 | if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { |
99 | 120 | try (BufferedReader responseReader = new BufferedReader( |
100 | 121 | new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { |
101 | 122 | int activeTasks = 0; |
102 | 123 | String line; |
103 | | - StringBuilder tasksListString = new StringBuilder(); |
| 124 | + final StringBuilder tasksListString = new StringBuilder(); |
104 | 125 | while ((line = responseReader.readLine()) != null) { |
105 | | - if (line.startsWith(ListTasksAction.NAME) == false) { |
106 | | - activeTasks++; |
107 | | - tasksListString.append(line); |
108 | | - tasksListString.append('\n'); |
| 126 | + final String taskName = line.split("\\s+")[0]; |
| 127 | + if (taskName.startsWith(ListTasksAction.NAME) || taskFilter.test(taskName)) { |
| 128 | + continue; |
109 | 129 | } |
| 130 | + activeTasks++; |
| 131 | + tasksListString.append(line); |
| 132 | + tasksListString.append('\n'); |
110 | 133 | } |
111 | 134 | assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks); |
112 | 135 | } |
113 | 136 | } |
114 | | - } catch (IOException e) { |
| 137 | + } catch (final IOException e) { |
115 | 138 | throw new AssertionError("Error getting active tasks list", e); |
116 | 139 | } |
117 | 140 | }); |
|
0 commit comments