Skip to content

Commit 9b3eb69

Browse files
authored
[Transform] Introduce TransformScheduler central service (#84657)
1 parent 50c4d7b commit 9b3eb69

File tree

23 files changed

+1505
-108
lines changed

23 files changed

+1505
-108
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.transform.transforms;
9+
10+
import org.elasticsearch.common.io.stream.Writeable.Reader;
11+
import org.elasticsearch.core.TimeValue;
12+
import org.elasticsearch.test.VersionUtils;
13+
import org.elasticsearch.xcontent.XContentParser;
14+
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
15+
16+
import java.io.IOException;
17+
18+
public class TransformTaskParamsTests extends AbstractSerializingTransformTestCase<TransformTaskParams> {
19+
20+
private static TransformTaskParams randomTransformTaskParams() {
21+
return new TransformTaskParams(
22+
randomAlphaOfLengthBetween(1, 10),
23+
randomBoolean() ? VersionUtils.randomVersion(random()) : null,
24+
randomBoolean() ? TimeValue.timeValueSeconds(randomLongBetween(1, 24 * 60 * 60)) : null,
25+
randomBoolean()
26+
);
27+
}
28+
29+
@Override
30+
protected TransformTaskParams doParseInstance(XContentParser parser) throws IOException {
31+
return TransformTaskParams.PARSER.apply(parser, null);
32+
}
33+
34+
@Override
35+
protected TransformTaskParams createTestInstance() {
36+
return randomTransformTaskParams();
37+
}
38+
39+
@Override
40+
protected Reader<TransformTaskParams> instanceReader() {
41+
return TransformTaskParams::new;
42+
}
43+
}

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
4343
import static org.hamcrest.Matchers.containsInRelativeOrder;
4444
import static org.hamcrest.Matchers.greaterThan;
45+
import static org.hamcrest.Matchers.is;
46+
import static org.hamcrest.Matchers.notNullValue;
4547

4648
/**
4749
* Test runner for testing continuous transforms, testing
@@ -135,7 +137,7 @@ public void removePipelines() throws IOException {
135137
deletePipeline(ContinuousTestCase.INGEST_PIPELINE);
136138
}
137139

138-
public void testContinousEvents() throws Exception {
140+
public void testContinuousEvents() throws Exception {
139141
String sourceIndexName = ContinuousTestCase.CONTINUOUS_EVENTS_SOURCE_INDEX;
140142
DecimalFormat numberFormat = new DecimalFormat("000", new DecimalFormatSymbols(Locale.ROOT));
141143
String dateType = randomBoolean() ? "date_nanos" : "date";
@@ -257,7 +259,6 @@ public void testContinousEvents() throws Exception {
257259

258260
// start all transforms, wait until the processed all data and stop them
259261
startTransforms();
260-
261262
waitUntilTransformsProcessedNewData(ContinuousTestCase.SYNC_DELAY, run);
262263
stopTransforms();
263264

@@ -481,16 +482,20 @@ private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration)
481482
for (ContinuousTestCase testCase : transformTestCases) {
482483
assertBusy(() -> {
483484
var stats = getTransformStats(testCase.getName());
484-
long lastSearchTime = (long) XContentMapValues.extractValue("checkpointing.last_search_time", stats);
485+
Object lastSearchTimeObj = XContentMapValues.extractValue("checkpointing.last_search_time", stats);
486+
assertThat(lastSearchTimeObj, is(notNullValue()));
487+
long lastSearchTime = (long) lastSearchTimeObj;
485488
assertThat(
486489
"transform ["
487490
+ testCase.getName()
488-
+ "] does not progress, state: "
491+
+ "] does not progress, iteration: "
492+
+ iteration
493+
+ ", state: "
489494
+ stats.get("state")
490495
+ ", reason: "
491496
+ stats.get("reason"),
492497
Instant.ofEpochMilli(lastSearchTime),
493-
greaterThan(waitUntil)
498+
is(greaterThan(waitUntil))
494499
);
495500
}, 30, TimeUnit.SECONDS);
496501
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public void testUsage() throws Exception {
4040
assertTrue((boolean) XContentMapValues.extractValue("transform.available", usageAsMap));
4141
assertTrue((boolean) XContentMapValues.extractValue("transform.enabled", usageAsMap));
4242
// no transforms, no stats
43-
assertNull(XContentMapValues.extractValue("transform.transforms", usageAsMap));
44-
assertNull(XContentMapValues.extractValue("transform.feature_counts", usageAsMap));
45-
assertNull(XContentMapValues.extractValue("transform.stats", usageAsMap));
43+
assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.transforms", usageAsMap));
44+
assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.feature_counts", usageAsMap));
45+
assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.stats", usageAsMap));
4646

4747
// create transforms
4848
createPivotReviewsTransform("test_usage", "pivot_reviews", null);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.elasticsearch.xpack.core.action.SetResetModeActionRequest;
5656
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
5757
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
58-
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
5958
import org.elasticsearch.xpack.core.transform.TransformField;
6059
import org.elasticsearch.xpack.core.transform.TransformMessages;
6160
import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider;
@@ -105,6 +104,7 @@
105104
import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction;
106105
import org.elasticsearch.xpack.transform.rest.action.RestUpgradeTransformsAction;
107106
import org.elasticsearch.xpack.transform.transforms.TransformPersistentTasksExecutor;
107+
import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
108108

109109
import java.io.IOException;
110110
import java.io.UncheckedIOException;
@@ -133,7 +133,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
133133
private final SetOnce<TransformServices> transformServices = new SetOnce<>();
134134

135135
public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
136-
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
136+
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueSeconds(60);
137137

138138
public static final int DEFAULT_FAILURE_RETRIES = 10;
139139
// How many times the transform task can retry on a non-critical failure.
@@ -148,6 +148,16 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
148148
Setting.Property.Dynamic
149149
);
150150

151+
public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueSeconds(5);
152+
// How often does the transform scheduler process the tasks
153+
public static final Setting<TimeValue> SCHEDULER_FREQUENCY = Setting.timeSetting(
154+
"xpack.transform.transform_scheduler_frequency",
155+
DEFAULT_SCHEDULER_FREQUENCY,
156+
TimeValue.timeValueSeconds(1),
157+
TimeValue.timeValueMinutes(1),
158+
Setting.Property.NodeScope
159+
);
160+
151161
public Transform(Settings settings) {
152162
this.settings = settings;
153163
}
@@ -230,14 +240,16 @@ public Collection<Object> createComponents(
230240
xContentRegistry
231241
);
232242
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService);
243+
Clock clock = Clock.systemUTC();
233244
TransformCheckpointService checkpointService = new TransformCheckpointService(
234-
Clock.systemUTC(),
245+
clock,
235246
settings,
236247
clusterService,
237248
configManager,
238249
auditor
239250
);
240-
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
251+
TransformScheduler scheduler = new TransformScheduler(clock, threadPool, settings);
252+
scheduler.start();
241253

242254
transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));
243255

@@ -269,13 +281,13 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
269281

270282
@Override
271283
public List<Setting<?>> getSettings() {
272-
return List.of(NUM_FAILURE_RETRIES_SETTING);
284+
return List.of(NUM_FAILURE_RETRIES_SETTING, SCHEDULER_FREQUENCY);
273285
}
274286

275287
@Override
276288
public void close() {
277289
if (transformServices.get() != null) {
278-
transformServices.get().getSchedulerEngine().stop();
290+
transformServices.get().getScheduler().stop();
279291
}
280292
}
281293

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77

88
package org.elasticsearch.xpack.transform;
99

10-
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
1110
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
1211
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
1312
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
13+
import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
1414

1515
import java.util.Objects;
1616

@@ -25,18 +25,18 @@ public final class TransformServices {
2525
private final TransformConfigManager configManager;
2626
private final TransformCheckpointService checkpointService;
2727
private final TransformAuditor auditor;
28-
private final SchedulerEngine schedulerEngine;
28+
private final TransformScheduler scheduler;
2929

3030
public TransformServices(
31-
TransformConfigManager transformConfigManager,
31+
TransformConfigManager configManager,
3232
TransformCheckpointService checkpointService,
33-
TransformAuditor transformAuditor,
34-
SchedulerEngine schedulerEngine
33+
TransformAuditor auditor,
34+
TransformScheduler scheduler
3535
) {
36-
this.configManager = Objects.requireNonNull(transformConfigManager);
36+
this.configManager = Objects.requireNonNull(configManager);
3737
this.checkpointService = Objects.requireNonNull(checkpointService);
38-
this.auditor = Objects.requireNonNull(transformAuditor);
39-
this.schedulerEngine = Objects.requireNonNull(schedulerEngine);
38+
this.auditor = Objects.requireNonNull(auditor);
39+
this.scheduler = Objects.requireNonNull(scheduler);
4040
}
4141

4242
public TransformConfigManager getConfigManager() {
@@ -51,7 +51,7 @@ public TransformAuditor getAuditor() {
5151
return auditor;
5252
}
5353

54-
public SchedulerEngine getSchedulerEngine() {
55-
return schedulerEngine;
54+
public TransformScheduler getScheduler() {
55+
return scheduler;
5656
}
5757
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ protected void taskOperation(Request request, TransformTask transformTask, Actio
234234

235235
if (ids.contains(transformTask.getTransformId())) {
236236
// move the call to the generic thread pool, so we do not block the network thread
237-
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
237+
threadPool.generic().execute(() -> {
238238
transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> {
239239
try {
240240
transformTask.stop(request.isForce(), request.isWaitForCheckpoint());

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class TransformContext {
2121
public interface Listener {
2222
void shutdown();
2323

24+
void failureCountChanged();
25+
2426
void fail(String failureMessage, ActionListener<Void> listener);
2527
}
2628

@@ -37,7 +39,7 @@ public interface Listener {
3739
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
3840
private final AtomicLong currentCheckpoint;
3941

40-
TransformContext(final TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
42+
TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
4143
this.taskState = new AtomicReference<>(taskState);
4244
this.stateReason = new AtomicReference<>(stateReason);
4345
this.currentCheckpoint = new AtomicLong(currentCheckpoint);
@@ -49,10 +51,6 @@ TransformTaskState getTaskState() {
4951
return taskState.get();
5052
}
5153

52-
void setTaskState(TransformTaskState newState) {
53-
taskState.set(newState);
54-
}
55-
5654
boolean setTaskState(TransformTaskState oldState, TransformTaskState newState) {
5755
return taskState.compareAndSet(oldState, newState);
5856
}
@@ -70,6 +68,7 @@ void setTaskStateToFailed(String reason) {
7068
void resetReasonAndFailureCounter() {
7169
stateReason.set(null);
7270
failureCount.set(0);
71+
taskListener.failureCountChanged();
7372
}
7473

7574
String getStateReason() {
@@ -100,8 +99,10 @@ int getFailureCount() {
10099
return failureCount.get();
101100
}
102101

103-
int getAndIncrementFailureCount() {
104-
return failureCount.getAndIncrement();
102+
int incrementAndGetFailureCount() {
103+
int newFailureCount = failureCount.incrementAndGet();
104+
taskListener.failureCountChanged();
105+
return newFailureCount;
105106
}
106107

107108
void setChangesLastDetectedAt(Instant time) {
@@ -138,5 +139,4 @@ void markAsFailed(String failureMessage) {
138139
failureCount.set(0);
139140
}, e -> {}));
140141
}
141-
142142
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -957,7 +957,7 @@ void handleFailure(Exception e) {
957957

958958
int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries())
959959
.orElse(context.getNumFailureRetries());
960-
if (numFailureRetries != -1 && context.getAndIncrementFailureCount() > numFailureRetries) {
960+
if (numFailureRetries != -1 && context.incrementAndGetFailureCount() > numFailureRetries) {
961961
failIndexer(
962962
"task encountered more than "
963963
+ numFailureRetries

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ private void startTask(
381381
ActionListener<StartTransformAction.Response> listener
382382
) {
383383
// switch the threadpool to generic, because the caller is on the system_read threadpool
384-
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
384+
threadPool.generic().execute(() -> {
385385
buildTask.initializeIndexer(indexerBuilder);
386386
// TransformTask#start will fail if the task state is FAILED
387387
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
@@ -409,7 +409,7 @@ protected AllocatedPersistentTask createTask(
409409
client,
410410
persistentTask.getParams(),
411411
(TransformState) persistentTask.getState(),
412-
transformServices.getSchedulerEngine(),
412+
transformServices.getScheduler(),
413413
auditor,
414414
threadPool,
415415
headers

0 commit comments

Comments
 (0)