Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;

public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {

Expand Down Expand Up @@ -264,7 +265,8 @@ public void testStartStop() throws IOException {
GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
IndexerState indexerState = statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState();
assertThat(indexerState, is(oneOf(IndexerState.STARTED, IndexerState.INDEXING)));

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformResponse stopResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId,
next());
final DataFrameTransformState transformState = (DataFrameTransformState) state;

final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
Expand Down Expand Up @@ -137,15 +135,15 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
stats -> {
indexerBuilder.setInitialStats(stats);
buildTask.initializeIndexer(indexerBuilder);
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
startTask(buildTask, startTaskListener);
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
buildTask.initializeIndexer(indexerBuilder);
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
startTask(buildTask, startTaskListener);
}
);

Expand Down Expand Up @@ -218,30 +216,20 @@ private void markAsFailed(DataFrameTransformTask task, String reason) {
}
}

private void scheduleAndStartTask(DataFrameTransformTask buildTask,
SchedulerEngine.Job schedulerJob,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// Note that while the task is added to the scheduler here, the internal state will prevent
// it from doing any work until the task is "started" via the StartTransform api
schedulerEngine.register(buildTask);
schedulerEngine.add(schedulerJob);
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
private void startTask(DataFrameTransformTask buildTask,
                       ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
    // Only kick the task off when it has never run before: it must still be in the
    // STOPPED task state and be on its initial run. Anything else means the task is
    // already underway (or finished), so we just acknowledge the start request.
    final boolean neverStarted =
        buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED)
            && buildTask.isInitialRun();
    if (neverStarted == false) {
        logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
        listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
        return;
    }
    logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
    buildTask.start(listener);
}

/**
 * Builds the schedule used for transform trigger jobs.
 *
 * @return a schedule that always fires one second after {@code now}
 *         (hard-coded interval; to be fixed)
 */
static SchedulerEngine.Schedule next() {
    return (startTime, now) -> now + 1000;
}

@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public synchronized void start(ActionListener<Response> listener) {
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
long now = System.currentTimeMillis();
// kick off the indexer
triggered(new Event(schedulerJobName(), now, now));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me, once we add a tad more logic to the SchedulerEngine.Schedule this won't be necessary because it will know if it has to be triggered right now, later, or not at all.

But this is a good stop-gap until we fix the created schedule.

registerWithSchedulerJob();
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
},
exc -> {
Expand Down Expand Up @@ -237,7 +241,7 @@ public synchronized void triggered(Event event) {
return;
}
// for now no rerun, so only trigger if checkpoint == 0
if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
if (currentCheckpoint.get() == 0 && event.getJobName().equals(schedulerJobName())) {
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
}
Expand All @@ -248,13 +252,7 @@ public synchronized void triggered(Event event) {
* This tries to remove the job from the scheduler and completes the persistent task
*/
synchronized void shutdown() {
try {
schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
schedulerEngine.unregister(this);
} catch (Exception e) {
markAsFailed(e);
return;
}
deregisterSchedulerJob();
markAsCompleted();
}

Expand Down Expand Up @@ -310,6 +308,27 @@ public synchronized void onCancelled() {
}
}

private void registerWithSchedulerJob() {
    // Register this task as a listener first, then add the job so that any
    // trigger fired for the job can be delivered to us.
    schedulerEngine.register(this);
    schedulerEngine.add(new SchedulerEngine.Job(schedulerJobName(), next()));
}

// Removes this transform's job from the scheduler and unregisters this task as a
// trigger listener — the mirror image of registerWithSchedulerJob().
private void deregisterSchedulerJob() {
    schedulerEngine.remove(schedulerJobName());
    schedulerEngine.unregister(this);
}

// Canonical scheduler job name for this transform: "<SCHEDULE_NAME>_<transformId>".
private String schedulerJobName() {
    final String jobName = DataFrameTransformTask.SCHEDULE_NAME + "_" + getTransformId();
    return jobName;
}

private SchedulerEngine.Schedule next() {
return (startTime, now) -> {
return now + 1000; // to be fixed, hardcode something
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sure future work will be needed here so that we can adjust for failures, etc.

};
}

// Builds the client indexer bound to this task and stores it in the indexer holder.
// Synchronized so concurrent callers cannot race the initialization.
synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) {
    indexer.set(indexerBuilder.build(this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
Expand All @@ -127,7 +127,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

---
Expand Down Expand Up @@ -168,7 +168,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
Expand All @@ -194,7 +194,7 @@ teardown:
transform_id: "airline-transform-start-later"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-later" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ teardown:
transform_id: "airline-transform-stats"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }
- match: { transforms.0.state.checkpoint: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }
- match: { transforms.0.stats.trigger_count: 0 }
- match: { transforms.0.stats.trigger_count: 1 }
- match: { transforms.0.stats.index_time_in_ms: 0 }
- match: { transforms.0.stats.index_total: 0 }
- match: { transforms.0.stats.index_failures: 0 }
Expand Down Expand Up @@ -172,7 +172,7 @@ teardown:
transform_id: "_all"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.1.state.indexer_state: "stopped" }

Expand Down