Skip to content

Commit 0fd42ce

Browse files
authored
[ML Data Frame] Start directly data frame rather than via the scheduler (#42224)
Trigger indexer start directly to put the indexer in INDEXING state immediately
1 parent 75425ae commit 0fd42ce

File tree

5 files changed

+46
-37
lines changed

5 files changed

+46
-37
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import static org.hamcrest.Matchers.greaterThan;
7373
import static org.hamcrest.Matchers.hasSize;
7474
import static org.hamcrest.Matchers.is;
75+
import static org.hamcrest.Matchers.oneOf;
7576

7677
public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
7778

@@ -264,7 +265,8 @@ public void testStartStop() throws IOException {
264265
GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
265266
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
266267
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
267-
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
268+
IndexerState indexerState = statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState();
269+
assertThat(indexerState, is(oneOf(IndexerState.STARTED, IndexerState.INDEXING)));
268270

269271
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
270272
StopDataFrameTransformResponse stopResponse =

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
106106
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
107107
final String transformId = params.getId();
108108
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
109-
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId,
110-
next());
111109
final DataFrameTransformState transformState = (DataFrameTransformState) state;
112110

113111
final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
@@ -137,15 +135,15 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
137135
stats -> {
138136
indexerBuilder.setInitialStats(stats);
139137
buildTask.initializeIndexer(indexerBuilder);
140-
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
138+
startTask(buildTask, startTaskListener);
141139
},
142140
error -> {
143141
if (error instanceof ResourceNotFoundException == false) {
144142
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
145143
}
146144
indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
147145
buildTask.initializeIndexer(indexerBuilder);
148-
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
146+
startTask(buildTask, startTaskListener);
149147
}
150148
);
151149

@@ -218,30 +216,20 @@ private void markAsFailed(DataFrameTransformTask task, String reason) {
218216
}
219217
}
220218

221-
private void scheduleAndStartTask(DataFrameTransformTask buildTask,
222-
SchedulerEngine.Job schedulerJob,
223-
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
224-
// Note that while the task is added to the scheduler here, the internal state will prevent
225-
// it from doing any work until the task is "started" via the StartTransform api
226-
schedulerEngine.register(buildTask);
227-
schedulerEngine.add(schedulerJob);
228-
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
219+
private void startTask(DataFrameTransformTask buildTask,
220+
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
229221
// If we are stopped, and it is an initial run, this means we have never been started,
230222
// attempt to start the task
231223
if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
224+
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
232225
buildTask.start(listener);
226+
233227
} else {
234228
logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
235229
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
236230
}
237231
}
238232

239-
static SchedulerEngine.Schedule next() {
240-
return (startTime, now) -> {
241-
return now + 1000; // to be fixed, hardcode something
242-
};
243-
}
244-
245233
@Override
246234
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
247235
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@ public synchronized void start(ActionListener<Response> listener) {
208208
persistStateToClusterState(state, ActionListener.wrap(
209209
task -> {
210210
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
211+
long now = System.currentTimeMillis();
212+
// kick off the indexer
213+
triggered(new Event(schedulerJobName(), now, now));
214+
registerWithSchedulerJob();
211215
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
212216
},
213217
exc -> {
@@ -238,7 +242,7 @@ public synchronized void triggered(Event event) {
238242
return;
239243
}
240244
// for now no rerun, so only trigger if checkpoint == 0
241-
if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
245+
if (currentCheckpoint.get() == 0 && event.getJobName().equals(schedulerJobName())) {
242246
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
243247
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
244248
}
@@ -249,13 +253,7 @@ public synchronized void triggered(Event event) {
249253
* This tries to remove the job from the scheduler and completes the persistent task
250254
*/
251255
synchronized void shutdown() {
252-
try {
253-
schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
254-
schedulerEngine.unregister(this);
255-
} catch (Exception e) {
256-
markAsFailed(e);
257-
return;
258-
}
256+
deregisterSchedulerJob();
259257
markAsCompleted();
260258
}
261259

@@ -311,6 +309,27 @@ public synchronized void onCancelled() {
311309
}
312310
}
313311

312+
private void registerWithSchedulerJob() {
313+
schedulerEngine.register(this);
314+
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next());
315+
schedulerEngine.add(schedulerJob);
316+
}
317+
318+
private void deregisterSchedulerJob() {
319+
schedulerEngine.remove(schedulerJobName());
320+
schedulerEngine.unregister(this);
321+
}
322+
323+
private String schedulerJobName() {
324+
return DataFrameTransformTask.SCHEDULE_NAME + "_" + getTransformId();
325+
}
326+
327+
private SchedulerEngine.Schedule next() {
328+
return (startTime, now) -> {
329+
return now + 1000; // to be fixed, hardcode something
330+
};
331+
}
332+
314333
synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) {
315334
indexer.set(indexerBuilder.build(this));
316335
}

x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ teardown:
100100
transform_id: "airline-transform-start-stop"
101101
- match: { count: 1 }
102102
- match: { transforms.0.id: "airline-transform-start-stop" }
103-
- match: { transforms.0.state.indexer_state: "started" }
103+
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
104104
- match: { transforms.0.state.task_state: "started" }
105105

106106
- do:
@@ -127,7 +127,7 @@ teardown:
127127
transform_id: "airline-transform-start-stop"
128128
- match: { count: 1 }
129129
- match: { transforms.0.id: "airline-transform-start-stop" }
130-
- match: { transforms.0.state.indexer_state: "started" }
130+
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
131131
- match: { transforms.0.state.task_state: "started" }
132132

133133
---
@@ -168,7 +168,7 @@ teardown:
168168
transform_id: "airline-transform-start-stop"
169169
- match: { count: 1 }
170170
- match: { transforms.0.id: "airline-transform-start-stop" }
171-
- match: { transforms.0.state.indexer_state: "started" }
171+
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
172172
- match: { transforms.0.state.task_state: "started" }
173173

174174
- do:
@@ -194,7 +194,7 @@ teardown:
194194
transform_id: "airline-transform-start-later"
195195
- match: { count: 1 }
196196
- match: { transforms.0.id: "airline-transform-start-later" }
197-
- match: { transforms.0.state.indexer_state: "started" }
197+
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
198198
- match: { transforms.0.state.task_state: "started" }
199199

200200
- do:

x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,18 @@ teardown:
4747
transform_id: "airline-transform-stats"
4848
- match: { count: 1 }
4949
- match: { transforms.0.id: "airline-transform-stats" }
50-
- match: { transforms.0.state.indexer_state: "started" }
50+
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
5151
- match: { transforms.0.state.task_state: "started" }
5252
- match: { transforms.0.state.checkpoint: 0 }
53-
- match: { transforms.0.stats.pages_processed: 0 }
53+
- lte: { transforms.0.stats.pages_processed: 1 }
5454
- match: { transforms.0.stats.documents_processed: 0 }
5555
- match: { transforms.0.stats.documents_indexed: 0 }
56-
- match: { transforms.0.stats.trigger_count: 0 }
56+
- match: { transforms.0.stats.trigger_count: 1 }
5757
- match: { transforms.0.stats.index_time_in_ms: 0 }
5858
- match: { transforms.0.stats.index_total: 0 }
5959
- match: { transforms.0.stats.index_failures: 0 }
60-
- match: { transforms.0.stats.search_time_in_ms: 0 }
61-
- match: { transforms.0.stats.search_total: 0 }
60+
- gte: { transforms.0.stats.search_time_in_ms: 0 }
61+
- lte: { transforms.0.stats.search_total: 1 }
6262
- match: { transforms.0.stats.search_failures: 0 }
6363

6464
---
@@ -172,7 +172,7 @@ teardown:
172172
transform_id: "_all"
173173
- match: { count: 2 }
174174
- match: { transforms.0.id: "airline-transform-stats" }
175-
- match: { transforms.0.state.indexer_state: "started" }
175+
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
176176
- match: { transforms.1.id: "airline-transform-stats-dos" }
177177
- match: { transforms.1.state.indexer_state: "stopped" }
178178

0 commit comments

Comments
 (0)