Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;

import java.io.IOException;

public class TransformTaskParamsTests extends AbstractSerializingTransformTestCase<TransformTaskParams> {

private static TransformTaskParams randomTransformTaskParams() {
return new TransformTaskParams(
randomAlphaOfLengthBetween(1, 10),
randomBoolean() ? VersionUtils.randomVersion(random()) : null,
randomBoolean() ? TimeValue.timeValueSeconds(randomLongBetween(1, 24 * 60 * 60)) : null,
randomBoolean()
);
}

@Override
protected TransformTaskParams doParseInstance(XContentParser parser) throws IOException {
return TransformTaskParams.PARSER.apply(parser, null);
}

@Override
protected TransformTaskParams createTestInstance() {
return randomTransformTaskParams();
}

@Override
protected Reader<TransformTaskParams> instanceReader() {
return TransformTaskParams::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
* Test runner for testing continuous transforms, testing
Expand Down Expand Up @@ -135,7 +137,7 @@ public void removePipelines() throws IOException {
deletePipeline(ContinuousTestCase.INGEST_PIPELINE);
}

public void testContinousEvents() throws Exception {
public void testContinuousEvents() throws Exception {
String sourceIndexName = ContinuousTestCase.CONTINUOUS_EVENTS_SOURCE_INDEX;
DecimalFormat numberFormat = new DecimalFormat("000", new DecimalFormatSymbols(Locale.ROOT));
String dateType = randomBoolean() ? "date_nanos" : "date";
Expand Down Expand Up @@ -257,7 +259,6 @@ public void testContinousEvents() throws Exception {

// start all transforms, wait until the processed all data and stop them
startTransforms();

waitUntilTransformsProcessedNewData(ContinuousTestCase.SYNC_DELAY, run);
stopTransforms();

Expand Down Expand Up @@ -481,16 +482,20 @@ private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration)
for (ContinuousTestCase testCase : transformTestCases) {
assertBusy(() -> {
var stats = getTransformStats(testCase.getName());
long lastSearchTime = (long) XContentMapValues.extractValue("checkpointing.last_search_time", stats);
Object lastSearchTimeObj = XContentMapValues.extractValue("checkpointing.last_search_time", stats);
assertThat(lastSearchTimeObj, is(notNullValue()));
long lastSearchTime = (long) lastSearchTimeObj;
assertThat(
"transform ["
+ testCase.getName()
+ "] does not progress, state: "
+ "] does not progress, iteration: "
+ iteration
+ ", state: "
+ stats.get("state")
+ ", reason: "
+ stats.get("reason"),
Instant.ofEpochMilli(lastSearchTime),
greaterThan(waitUntil)
is(greaterThan(waitUntil))
);
}, 30, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public void testUsage() throws Exception {
assertTrue((boolean) XContentMapValues.extractValue("transform.available", usageAsMap));
assertTrue((boolean) XContentMapValues.extractValue("transform.enabled", usageAsMap));
// no transforms, no stats
assertNull(XContentMapValues.extractValue("transform.transforms", usageAsMap));
assertNull(XContentMapValues.extractValue("transform.feature_counts", usageAsMap));
assertNull(XContentMapValues.extractValue("transform.stats", usageAsMap));
assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.transforms", usageAsMap));
assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.feature_counts", usageAsMap));
assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.stats", usageAsMap));

// create transforms
createPivotReviewsTransform("test_usage", "pivot_reviews", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.elasticsearch.xpack.core.action.SetResetModeActionRequest;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider;
Expand Down Expand Up @@ -105,6 +104,7 @@
import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestUpgradeTransformsAction;
import org.elasticsearch.xpack.transform.transforms.TransformPersistentTasksExecutor;
import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -133,7 +133,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
private final SetOnce<TransformServices> transformServices = new SetOnce<>();

public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueSeconds(60);

public static final int DEFAULT_FAILURE_RETRIES = 10;
// How many times the transform task can retry on a non-critical failure.
Expand All @@ -148,6 +148,16 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
Setting.Property.Dynamic
);

public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueSeconds(5);
// How often does the transform scheduler process the tasks
public static final Setting<TimeValue> SCHEDULER_FREQUENCY = Setting.timeSetting(
"xpack.transform.transform_scheduler_frequency",
DEFAULT_SCHEDULER_FREQUENCY,
TimeValue.timeValueSeconds(1),
TimeValue.timeValueMinutes(1),
Setting.Property.NodeScope
);

public Transform(Settings settings) {
this.settings = settings;
}
Expand Down Expand Up @@ -230,14 +240,16 @@ public Collection<Object> createComponents(
xContentRegistry
);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService);
Clock clock = Clock.systemUTC();
TransformCheckpointService checkpointService = new TransformCheckpointService(
Clock.systemUTC(),
clock,
settings,
clusterService,
configManager,
auditor
);
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
TransformScheduler scheduler = new TransformScheduler(clock, threadPool, settings);
scheduler.start();

transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));

Expand Down Expand Up @@ -269,13 +281,13 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(

@Override
public List<Setting<?>> getSettings() {
return List.of(NUM_FAILURE_RETRIES_SETTING);
return List.of(NUM_FAILURE_RETRIES_SETTING, SCHEDULER_FREQUENCY);
}

@Override
public void close() {
if (transformServices.get() != null) {
transformServices.get().getSchedulerEngine().stop();
transformServices.get().getScheduler().stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

package org.elasticsearch.xpack.transform;

import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;

import java.util.Objects;

Expand All @@ -25,18 +25,18 @@ public final class TransformServices {
private final TransformConfigManager configManager;
private final TransformCheckpointService checkpointService;
private final TransformAuditor auditor;
private final SchedulerEngine schedulerEngine;
private final TransformScheduler scheduler;

public TransformServices(
TransformConfigManager transformConfigManager,
TransformConfigManager configManager,
TransformCheckpointService checkpointService,
TransformAuditor transformAuditor,
SchedulerEngine schedulerEngine
TransformAuditor auditor,
TransformScheduler scheduler
) {
this.configManager = Objects.requireNonNull(transformConfigManager);
this.configManager = Objects.requireNonNull(configManager);
this.checkpointService = Objects.requireNonNull(checkpointService);
this.auditor = Objects.requireNonNull(transformAuditor);
this.schedulerEngine = Objects.requireNonNull(schedulerEngine);
this.auditor = Objects.requireNonNull(auditor);
this.scheduler = Objects.requireNonNull(scheduler);
}

public TransformConfigManager getConfigManager() {
Expand All @@ -51,7 +51,7 @@ public TransformAuditor getAuditor() {
return auditor;
}

public SchedulerEngine getSchedulerEngine() {
return schedulerEngine;
public TransformScheduler getScheduler() {
return scheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected void taskOperation(Request request, TransformTask transformTask, Actio

if (ids.contains(transformTask.getTransformId())) {
// move the call to the generic thread pool, so we do not block the network thread
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
threadPool.generic().execute(() -> {
transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> {
try {
transformTask.stop(request.isForce(), request.isWaitForCheckpoint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class TransformContext {
public interface Listener {
void shutdown();

void failureCountChanged();

void fail(String failureMessage, ActionListener<Void> listener);
}

Expand All @@ -37,7 +39,7 @@ public interface Listener {
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
private final AtomicLong currentCheckpoint;

TransformContext(final TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
this.taskState = new AtomicReference<>(taskState);
this.stateReason = new AtomicReference<>(stateReason);
this.currentCheckpoint = new AtomicLong(currentCheckpoint);
Expand All @@ -49,10 +51,6 @@ TransformTaskState getTaskState() {
return taskState.get();
}

void setTaskState(TransformTaskState newState) {
taskState.set(newState);
}

boolean setTaskState(TransformTaskState oldState, TransformTaskState newState) {
return taskState.compareAndSet(oldState, newState);
}
Expand All @@ -70,6 +68,7 @@ void setTaskStateToFailed(String reason) {
void resetReasonAndFailureCounter() {
stateReason.set(null);
failureCount.set(0);
taskListener.failureCountChanged();
}

String getStateReason() {
Expand Down Expand Up @@ -100,8 +99,10 @@ int getFailureCount() {
return failureCount.get();
}

int getAndIncrementFailureCount() {
return failureCount.getAndIncrement();
int incrementAndGetFailureCount() {
int newFailureCount = failureCount.incrementAndGet();
taskListener.failureCountChanged();
return newFailureCount;
}

void setChangesLastDetectedAt(Instant time) {
Expand Down Expand Up @@ -138,5 +139,4 @@ void markAsFailed(String failureMessage) {
failureCount.set(0);
}, e -> {}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ void handleFailure(Exception e) {

int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries())
.orElse(context.getNumFailureRetries());
if (numFailureRetries != -1 && context.getAndIncrementFailureCount() > numFailureRetries) {
if (numFailureRetries != -1 && context.incrementAndGetFailureCount() > numFailureRetries) {
failIndexer(
"task encountered more than "
+ numFailureRetries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ private void startTask(
ActionListener<StartTransformAction.Response> listener
) {
// switch the threadpool to generic, because the caller is on the system_read threadpool
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
threadPool.generic().execute(() -> {
buildTask.initializeIndexer(indexerBuilder);
// TransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
Expand Down Expand Up @@ -409,7 +409,7 @@ protected AllocatedPersistentTask createTask(
client,
persistentTask.getParams(),
(TransformState) persistentTask.getState(),
transformServices.getSchedulerEngine(),
transformServices.getScheduler(),
auditor,
threadPool,
headers
Expand Down
Loading