diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParamsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParamsTests.java new file mode 100644 index 0000000000000..d068872096250 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParamsTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.transforms; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase; + +import java.io.IOException; + +public class TransformTaskParamsTests extends AbstractSerializingTransformTestCase { + + private static TransformTaskParams randomTransformTaskParams() { + return new TransformTaskParams( + randomAlphaOfLengthBetween(1, 10), + randomBoolean() ? VersionUtils.randomVersion(random()) : null, + randomBoolean() ? TimeValue.timeValueSeconds(randomLongBetween(1, 24 * 60 * 60)) : null, + randomBoolean() + ); + } + + @Override + protected TransformTaskParams doParseInstance(XContentParser parser) throws IOException { + return TransformTaskParams.PARSER.apply(parser, null); + } + + @Override + protected TransformTaskParams createTestInstance() { + return randomTransformTaskParams(); + } + + @Override + protected Reader instanceReader() { + return TransformTaskParams::new; + } +} diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java index 84a53f89cb141..2eb4567752869 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java @@ -42,6 +42,8 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsInRelativeOrder; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; /** * Test runner for testing continuous transforms, testing @@ -135,7 +137,7 @@ public void removePipelines() throws IOException { deletePipeline(ContinuousTestCase.INGEST_PIPELINE); } - public void testContinousEvents() throws Exception { + public void testContinuousEvents() throws Exception { String sourceIndexName = ContinuousTestCase.CONTINUOUS_EVENTS_SOURCE_INDEX; DecimalFormat numberFormat = new DecimalFormat("000", new DecimalFormatSymbols(Locale.ROOT)); String dateType = randomBoolean() ? "date_nanos" : "date"; @@ -257,7 +259,6 @@ public void testContinousEvents() throws Exception { // start all transforms, wait until the processed all data and stop them startTransforms(); - waitUntilTransformsProcessedNewData(ContinuousTestCase.SYNC_DELAY, run); stopTransforms(); @@ -481,16 +482,20 @@ private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration) for (ContinuousTestCase testCase : transformTestCases) { assertBusy(() -> { var stats = getTransformStats(testCase.getName()); - long lastSearchTime = (long) XContentMapValues.extractValue("checkpointing.last_search_time", stats); + Object lastSearchTimeObj = XContentMapValues.extractValue("checkpointing.last_search_time", stats); + assertThat(lastSearchTimeObj, is(notNullValue())); + long lastSearchTime = (long) lastSearchTimeObj; assertThat( "transform [" + testCase.getName() - + "] does not progress, state: " + + "] does not progress, iteration: " + + iteration + + ", state: " + stats.get("state") + ", reason: " + stats.get("reason"), Instant.ofEpochMilli(lastSearchTime), - greaterThan(waitUntil) + is(greaterThan(waitUntil)) ); }, 30, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java index 40c9ac686d027..317fad1f99aa3 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java @@ -40,9 +40,9 @@ public void testUsage() throws Exception { assertTrue((boolean) XContentMapValues.extractValue("transform.available", usageAsMap)); assertTrue((boolean) XContentMapValues.extractValue("transform.enabled", usageAsMap)); // no transforms, no stats - assertNull(XContentMapValues.extractValue("transform.transforms", usageAsMap)); - assertNull(XContentMapValues.extractValue("transform.feature_counts", usageAsMap)); - assertNull(XContentMapValues.extractValue("transform.stats", usageAsMap)); + assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.transforms", usageAsMap)); + assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.feature_counts", usageAsMap)); + assertNull("full usage response: " + usageAsMap, XContentMapValues.extractValue("transform.stats", usageAsMap)); // create transforms createPivotReviewsTransform("test_usage", "pivot_reviews", null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index adb538d125c87..f6c396e887862 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -55,7 +55,6 @@ import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; @@ -105,6 +104,7 @@ import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestUpgradeTransformsAction; import org.elasticsearch.xpack.transform.transforms.TransformPersistentTasksExecutor; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import java.io.IOException; import java.io.UncheckedIOException; @@ -133,7 +133,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa private final SetOnce transformServices = new SetOnce<>(); public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500); - public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000); + public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueSeconds(60); public static final int DEFAULT_FAILURE_RETRIES = 10; // How many times the transform task can retry on a non-critical failure. @@ -148,6 +148,16 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa Setting.Property.Dynamic ); + public static final TimeValue DEFAULT_SCHEDULER_FREQUENCY = TimeValue.timeValueSeconds(5); + // How often does the transform scheduler process the tasks + public static final Setting SCHEDULER_FREQUENCY = Setting.timeSetting( + "xpack.transform.transform_scheduler_frequency", + DEFAULT_SCHEDULER_FREQUENCY, + TimeValue.timeValueSeconds(1), + TimeValue.timeValueMinutes(1), + Setting.Property.NodeScope + ); + public Transform(Settings settings) { this.settings = settings; } @@ -230,14 +240,16 @@ public Collection createComponents( xContentRegistry ); TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService); + Clock clock = Clock.systemUTC(); TransformCheckpointService checkpointService = new TransformCheckpointService( - Clock.systemUTC(), + clock, settings, clusterService, configManager, auditor ); - SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC()); + TransformScheduler scheduler = new TransformScheduler(clock, threadPool, settings); + scheduler.start(); transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler)); @@ -269,13 +281,13 @@ public List> getPersistentTasksExecutor( @Override public List> getSettings() { - return List.of(NUM_FAILURE_RETRIES_SETTING); + return List.of(NUM_FAILURE_RETRIES_SETTING, SCHEDULER_FREQUENCY); } @Override public void close() { if (transformServices.get() != null) { - transformServices.get().getSchedulerEngine().stop(); + transformServices.get().getScheduler().stop(); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java index 8592e589af0fb..a7b5aec64bfae 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java @@ -7,10 +7,10 @@ package org.elasticsearch.xpack.transform; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import java.util.Objects; @@ -25,18 +25,18 @@ public final class TransformServices { private final TransformConfigManager configManager; private final TransformCheckpointService checkpointService; private final TransformAuditor auditor; - private final SchedulerEngine schedulerEngine; + private final TransformScheduler scheduler; public TransformServices( - TransformConfigManager transformConfigManager, + TransformConfigManager configManager, TransformCheckpointService checkpointService, - TransformAuditor transformAuditor, - SchedulerEngine schedulerEngine + TransformAuditor auditor, + TransformScheduler scheduler ) { - this.configManager = Objects.requireNonNull(transformConfigManager); + this.configManager = Objects.requireNonNull(configManager); this.checkpointService = Objects.requireNonNull(checkpointService); - this.auditor = Objects.requireNonNull(transformAuditor); - this.schedulerEngine = Objects.requireNonNull(schedulerEngine); + this.auditor = Objects.requireNonNull(auditor); + this.scheduler = Objects.requireNonNull(scheduler); } public TransformConfigManager getConfigManager() { @@ -51,7 +51,7 @@ public TransformAuditor getAuditor() { return auditor; } - public SchedulerEngine getSchedulerEngine() { - return schedulerEngine; + public TransformScheduler getScheduler() { + return scheduler; } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index 1581c9c487ce5..57ef3901cab17 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -234,7 +234,7 @@ protected void taskOperation(Request request, TransformTask transformTask, Actio if (ids.contains(transformTask.getTransformId())) { // move the call to the generic thread pool, so we do not block the network thread - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + threadPool.generic().execute(() -> { transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> { try { transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 1be96aa36154f..3b2a3c0be2b19 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -21,6 +21,8 @@ class TransformContext { public interface Listener { void shutdown(); + void failureCountChanged(); + void fail(String failureMessage, ActionListener listener); } @@ -37,7 +39,7 @@ public interface Listener { // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished private final AtomicLong currentCheckpoint; - TransformContext(final TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) { + TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) { this.taskState = new AtomicReference<>(taskState); this.stateReason = new AtomicReference<>(stateReason); this.currentCheckpoint = new AtomicLong(currentCheckpoint); @@ -49,10 +51,6 @@ TransformTaskState getTaskState() { return taskState.get(); } - void setTaskState(TransformTaskState newState) { - taskState.set(newState); - } - boolean setTaskState(TransformTaskState oldState, TransformTaskState newState) { return taskState.compareAndSet(oldState, newState); } @@ -70,6 +68,7 @@ void setTaskStateToFailed(String reason) { void resetReasonAndFailureCounter() { stateReason.set(null); failureCount.set(0); + taskListener.failureCountChanged(); } String getStateReason() { @@ -100,8 +99,10 @@ int getFailureCount() { return failureCount.get(); } - int getAndIncrementFailureCount() { - return failureCount.getAndIncrement(); + int incrementAndGetFailureCount() { + int newFailureCount = failureCount.incrementAndGet(); + taskListener.failureCountChanged(); + return newFailureCount; } void setChangesLastDetectedAt(Instant time) { @@ -138,5 +139,4 @@ void markAsFailed(String failureMessage) { failureCount.set(0); }, e -> {})); } - } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 5293acd23c06b..17c733a4459a2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -950,7 +950,7 @@ void handleFailure(Exception e) { int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries()) .orElse(context.getNumFailureRetries()); - if (numFailureRetries != -1 && context.getAndIncrementFailureCount() > numFailureRetries) { + if (numFailureRetries != -1 && context.incrementAndGetFailureCount() > numFailureRetries) { failIndexer( "task encountered more than " + numFailureRetries diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 7397e9f6403c4..4f48176f24cca 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -381,7 +381,7 @@ private void startTask( ActionListener listener ) { // switch the threadpool to generic, because the caller is on the system_read threadpool - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + threadPool.generic().execute(() -> { buildTask.initializeIndexer(indexerBuilder); // TransformTask#start will fail if the task state is FAILED buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); @@ -409,7 +409,7 @@ protected AllocatedPersistentTask createTask( client, persistentTask.getParams(), (TransformState) persistentTask.getState(), - transformServices.getSchedulerEngine(), + transformServices.getScheduler(), auditor, threadPool, headers diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 4c96f36b7e84f..d960c8a8f40b2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; @@ -28,8 +27,6 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; @@ -41,9 +38,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; -import org.elasticsearch.xpack.transform.Transform; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import java.util.Arrays; import java.util.Collection; @@ -56,16 +53,15 @@ import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_START_FAILED_TRANSFORM; import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_STOP_FAILED_TRANSFORM; -public class TransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener, TransformContext.Listener { +public class TransformTask extends AllocatedPersistentTask implements TransformScheduler.Listener, TransformContext.Listener { // Default interval the scheduler sends an event if the config does not specify a frequency private static final Logger logger = LogManager.getLogger(TransformTask.class); private static final IndexerState[] RUNNING_STATES = new IndexerState[] { IndexerState.STARTED, IndexerState.INDEXING }; - public static final String SCHEDULE_NAME = TransformField.TASK_NAME + "/schedule"; private final ParentTaskAssigningClient parentTaskClient; private final TransformTaskParams transform; - private final SchedulerEngine schedulerEngine; + private final TransformScheduler transformScheduler; private final ThreadPool threadPool; private final TransformAuditor auditor; private final TransformIndexerPosition initialPosition; @@ -81,7 +77,7 @@ public TransformTask( Client client, TransformTaskParams transform, TransformState state, - SchedulerEngine schedulerEngine, + TransformScheduler transformScheduler, TransformAuditor auditor, ThreadPool threadPool, Map headers @@ -89,7 +85,7 @@ public TransformTask( super(id, type, action, TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); this.parentTaskClient = new ParentTaskAssigningClient(client, parentTask); this.transform = transform; - this.schedulerEngine = schedulerEngine; + this.transformScheduler = transformScheduler; this.threadPool = threadPool; this.auditor = auditor; IndexerState initialState = IndexerState.STOPPED; @@ -279,10 +275,7 @@ void start(Long startingCheckpoint, ActionListener { auditor.info(transform.getId(), "Updated transform state to [" + state.getTaskState() + "]."); - long now = System.currentTimeMillis(); - // kick off the indexer - triggered(new Event(schedulerJobName(), now, now)); - registerWithSchedulerJob(); + transformScheduler.registerTransform(transform, this); listener.onResponse(new StartTransformAction.Response(true)); }, exc -> { auditor.warning( @@ -401,9 +394,10 @@ protected void init( } @Override - public void triggered(Event event) { + public void triggered(TransformScheduler.Event event) { + logger.trace(() -> format("[{}] triggered(event={}) ", getTransformId(), event)); // Ignore if event is not for this job - if (event.getJobName().equals(schedulerJobName()) == false) { + if (event.transformId().equals(getTransformId()) == false) { return; } @@ -431,7 +425,7 @@ public void triggered(Event event) { return; } - logger.debug("[{}] transform indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState); + logger.debug("[{}] transform indexer schedule has triggered, state: [{}].", getTransformId(), indexerState); // if it runs for the 1st time we just do it, if not we check for changes if (context.getCheckpoint() == 0) { @@ -456,7 +450,7 @@ public boolean shouldCancelChildrenOnCancellation() { @Override public void shutdown() { logger.debug("[{}] shutdown of transform requested", transform.getId()); - deregisterSchedulerJob(); + transformScheduler.deregisterTransform(getTransformId()); markAsCompleted(); } @@ -470,6 +464,11 @@ void persistStateToClusterState(TransformState state, ActionListener listener) { synchronized (context) { @@ -500,7 +499,7 @@ public void fail(String reason, ActionListener listener) { auditor.error(transform.getId(), reason); // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. - deregisterSchedulerJob(); + transformScheduler.deregisterTransform(getTransformId()); // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again, // we should set this flag to false. context.setShouldStopAtCheckpoint(false); @@ -542,28 +541,6 @@ TransformTask setNumFailureRetries(int numFailureRetries) { return this; } - private void registerWithSchedulerJob() { - schedulerEngine.register(this); - final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next()); - schedulerEngine.add(schedulerJob); - } - - private void deregisterSchedulerJob() { - schedulerEngine.remove(schedulerJobName()); - schedulerEngine.unregister(this); - } - - private String schedulerJobName() { - return TransformTask.SCHEDULE_NAME + "_" + getTransformId(); - } - - private SchedulerEngine.Schedule next() { - return (startTime, now) -> { - TimeValue frequency = transform.getFrequency(); - return now + (frequency == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY.getMillis() : frequency.getMillis()); - }; - } - void initializeIndexer(ClientTransformIndexerBuilder indexerBuilder) { indexer.set(indexerBuilder.build(getThreadPool(), context)); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java new file mode 100644 index 0000000000000..b91f9bd611fe5 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms.scheduling; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xpack.transform.Transform; + +import java.time.Duration; +import java.util.Objects; + +/** + * {@link TransformScheduledTask} is a structure describing the scheduled task in the queue. + *

+ * This class is immutable. + */ +final class TransformScheduledTask { + + /** + * Minimum delay that can be applied after a failure. + */ + private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(1).toMillis(); + /** + * Maximum delay that can be applied after a failure. + */ + private static final long MAX_DELAY_MILLIS = Duration.ofHours(1).toMillis(); + + private final String transformId; + private final TimeValue frequency; + private final Long lastTriggeredTimeMillis; + private final int failureCount; + private final long nextScheduledTimeMillis; + private final TransformScheduler.Listener listener; + + TransformScheduledTask( + String transformId, + TimeValue frequency, + Long lastTriggeredTimeMillis, + int failureCount, + long nextScheduledTimeMillis, + TransformScheduler.Listener listener + ) { + this.transformId = Objects.requireNonNull(transformId); + this.frequency = frequency != null ? frequency : Transform.DEFAULT_TRANSFORM_FREQUENCY; + this.lastTriggeredTimeMillis = lastTriggeredTimeMillis; + this.failureCount = failureCount; + this.nextScheduledTimeMillis = nextScheduledTimeMillis; + this.listener = Objects.requireNonNull(listener); + } + + TransformScheduledTask( + String transformId, + TimeValue frequency, + Long lastTriggeredTimeMillis, + int failureCount, + TransformScheduler.Listener listener + ) { + this( + transformId, + frequency, + lastTriggeredTimeMillis, + failureCount, + failureCount == 0 + ? lastTriggeredTimeMillis + frequency.millis() + : calculateNextScheduledTimeAfterFailure(lastTriggeredTimeMillis, failureCount), + listener + ); + } + + // Visible for testing + + /** + * Calculates the appropriate next scheduled time after a number of failures. + * This method implements exponential backoff approach. + * + * @param lastTriggeredTimeMillis the last time (in millis) the task was triggered + * @param failureCount the number of failures that happened since the task was triggered + * @return next scheduled time for a task + */ + static long calculateNextScheduledTimeAfterFailure(long lastTriggeredTimeMillis, int failureCount) { + // Math.min(failureCount, 32) is applied in order to avoid overflow. + long delayMillis = Math.min(Math.max((1L << Math.min(failureCount, 32)) * 1000, MIN_DELAY_MILLIS), MAX_DELAY_MILLIS); + return lastTriggeredTimeMillis + delayMillis; + } + + String getTransformId() { + return transformId; + } + + TimeValue getFrequency() { + return frequency; + } + + Long getLastTriggeredTimeMillis() { + return lastTriggeredTimeMillis; + } + + int getFailureCount() { + return failureCount; + } + + long getNextScheduledTimeMillis() { + return nextScheduledTimeMillis; + } + + TransformScheduler.Listener getListener() { + return listener; + } + + @Override + public boolean equals(Object other) { + if (this == other) return true; + if (other == null || getClass() != other.getClass()) return false; + TransformScheduledTask that = (TransformScheduledTask) other; + return Objects.equals(this.transformId, that.transformId) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.lastTriggeredTimeMillis, that.lastTriggeredTimeMillis) + && this.failureCount == that.failureCount + && this.nextScheduledTimeMillis == that.nextScheduledTimeMillis + && this.listener == that.listener; // Yes, we purposedly compare the references here + } + + @Override + public int hashCode() { + // To ensure the "equals" and "hashCode" methods have the same view on equality, we use listener's system identity here. + return Objects.hash( + transformId, + frequency, + lastTriggeredTimeMillis, + failureCount, + nextScheduledTimeMillis, + System.identityHashCode(listener) + ); + } + + @Override + public String toString() { + return new StringBuilder("TransformScheduledTask[").append("transformId=") + .append(transformId) + .append(",frequency=") + .append(frequency) + .append(",lastTriggeredTimeMillis=") + .append(lastTriggeredTimeMillis) + .append(",failureCount=") + .append(failureCount) + .append(",nextScheduledTimeMillis=") + .append(nextScheduledTimeMillis) + .append(",listener=") + .append(listener) + .append("]") + .toString(); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java new file mode 100644 index 0000000000000..76c957a61a940 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms.scheduling; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.function.Function; + +/** + * {@link TransformScheduledTaskQueue} class provides a priority queue functionality with the additional capability of quickly finding a + * task by its transform's id. + * + * The implementation of this queue is thread-safe and utilizes locks. + */ +class TransformScheduledTaskQueue { + + private static final Logger logger = LogManager.getLogger(TransformScheduledTaskQueue.class); + + /** {@link SortedSet} allows us to quickly retrieve the task with the lowest priority. */ + private final SortedSet tasks; + /** {@link Map} allows us to quickly retrieve the task by the transform's id. */ + private final Map tasksById; + + TransformScheduledTaskQueue() { + this.tasks = new TreeSet<>( + Comparator.comparing(TransformScheduledTask::getNextScheduledTimeMillis).thenComparing(TransformScheduledTask::getTransformId) + ); + this.tasksById = new HashMap<>(); + } + + /** + * @return whether the queue is empty. + */ + public synchronized boolean isEmpty() { + return tasks.isEmpty(); + } + + /** + * @return the task with the *lowest* priority. + */ + public synchronized TransformScheduledTask first() { + return tasks.first(); + } + + /** + * Adds a task to the queue. + * + * @param task task to add + * @return whether the task was added + */ + public synchronized boolean add(TransformScheduledTask task) { + String transformId = task.getTransformId(); + logger.trace("add({}): {}", transformId, task); + if (tasksById.containsKey(transformId)) { + logger.debug("add({}) is a no-op as the task for this transform already exists", transformId); + return false; + } + tasksById.put(transformId, task); + tasks.add(task); + return true; + } + + /** + * Updates the task with the given transform id. + * + * @param transformId id of the transform to update + * @param transformer function used to modify the task. Must not modify the transform id + */ + public synchronized void update(String transformId, Function transformer) { + TransformScheduledTask task = remove(transformId); + if (task == null) { + return; + } + TransformScheduledTask updatedTask = transformer.apply(task); + if (transformId.equals(updatedTask.getTransformId()) == false) { + throw new IllegalStateException("Must not modify the transform's id during update"); + } + add(updatedTask); + } + + /** + * Removes the task with the given transform id from the queue. + * + * @param transformId id of the transform to remove + * @return the removed task or {@code null} if the task does not exist + */ + public synchronized TransformScheduledTask remove(String transformId) { + logger.trace("remove({})", transformId); + TransformScheduledTask task = tasksById.remove(transformId); + if (task == null) { + logger.debug("remove({}) is a no-op as the task for this transform does not exist", transformId); + return null; + } + tasks.remove(task); + return task; + } + + // Visible for testing + /** + * @return the set of all the transform ids + * + * Should not be used in production as it creates the new set every time. + */ + synchronized Set getTransformIds() { + return Collections.unmodifiableSet(new HashSet<>(tasksById.keySet())); + } + + // Visible for testing + /** + * @return queue current contents + * + * Should not be used in production as it creates the new set every time. + */ + synchronized List listScheduledTasks() { + return tasks.stream().toList(); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java new file mode 100644 index 0000000000000..9c8d309453217 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java @@ -0,0 +1,249 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms.scheduling; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Strings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; +import org.elasticsearch.xpack.transform.Transform; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as + * retrying policy. + */ +public final class TransformScheduler { + + /** + * {@link Event} record encapsulates data that might be useful to the listener when the listener is being triggered. + */ + public record Event(String transformId, long scheduledTime, long triggeredTime) { + @Override + public String toString() { + return new StringBuilder("Event[").append("transformId=") + .append(transformId) + .append(",scheduledTime=") + .append(Instant.ofEpochMilli(scheduledTime)) + .append(",triggeredTime=") + .append(Instant.ofEpochMilli(triggeredTime)) + .append("]") + .toString(); + } + } + + /** + * {@link Listener} interface allows receiving events about the scheduled task being triggered. + */ + public interface Listener { + /** + * Called whenever scheduled task is being triggered. + * + * @param event event structure containing data that might be useful to the listener + */ + void triggered(Event event); + } + + private static final Logger logger = LogManager.getLogger(TransformScheduler.class); + + private final Clock clock; + private final ThreadPool threadPool; + private final TimeValue schedulerFrequency; + private final TransformScheduledTaskQueue scheduledTasks; + /** + * Prevents two concurrent invocations of the "processScheduledTasks" method. + * + * Set to {@code true} before processing + * Set to {@code false} after processing (doesn't matter whether successful or not). + */ + private final AtomicBoolean isProcessingActive; + /** + * Stored the scheduled execution for future cancellation. + */ + private Scheduler.Cancellable scheduledFuture; + + public TransformScheduler(Clock clock, ThreadPool threadPool, Settings settings) { + this.clock = Objects.requireNonNull(clock); + this.threadPool = Objects.requireNonNull(threadPool); + this.schedulerFrequency = Transform.SCHEDULER_FREQUENCY.get(settings); + this.scheduledTasks = new TransformScheduledTaskQueue(); + this.isProcessingActive = new AtomicBoolean(); + } + + /** + * Starts the processing. + * Method {@code processScheduledTasks} is invoked according to its configured {@code schedulerFrequency}. + */ + public void start() { + if (scheduledFuture == null) { + scheduledFuture = threadPool.scheduleWithFixedDelay(this::processScheduledTasks, schedulerFrequency, ThreadPool.Names.GENERIC); + } + } + + // Visible for testing + void processScheduledTasks() { + // Prevent two concurrent invocations of the "processScheduledTasks" method + if (isProcessingActive.compareAndSet(false, true) == false) { + return; + } + logger.trace("Processing scheduled tasks started"); + final boolean isTraceEnabled = logger.isTraceEnabled(); + Instant processingStarted = isTraceEnabled ? clock.instant() : null; + final boolean taskWasProcessed; + try { + taskWasProcessed = processScheduledTasksInternal(); + } finally { + // Make sure we clear the "isProcessingActive" bit regardless of whether there was a success or failure. + // Otherwise, the processing would be stuck forever. + isProcessingActive.set(false); + } + if (isTraceEnabled) { + Instant processingFinished = clock.instant(); + logger.trace( + Strings.format( + "Processing scheduled tasks finished, took {}ms", + Duration.between(processingStarted, processingFinished).toMillis() + ) + ); + } + if (taskWasProcessed == false) { + return; + } + // If we happened to process the task, there may be other tasks also eligible for processing. + // We try to process them ASAP as we don't want to wait the `delay` for every task. + // Tail call optimization is enforced by making the following call the last call in this method. + processScheduledTasks(); + } + + private boolean processScheduledTasksInternal() { + if (scheduledTasks.isEmpty()) { + // There are no scheduled tasks, hence, nothing to do + return false; + } + long currentTimeMillis = clock.millis(); + TransformScheduledTask scheduledTask = scheduledTasks.first(); + // Check if the task is eligible for processing + if (currentTimeMillis < scheduledTask.getNextScheduledTimeMillis()) { + // It is too early to process this task. + // Consequently, it is too early to process other tasks because the tasks are sorted by their next scheduled time. + // Try again later. + return false; + } + // Create event that will be sent to the listener + Event event = new Event(scheduledTask.getTransformId(), scheduledTask.getNextScheduledTimeMillis(), currentTimeMillis); + // Trigger the listener + scheduledTask.getListener().triggered(event); + // Update the task's last_triggered_time to current time (next_scheduled_time gets automatically re-calculated) + scheduledTasks.update(scheduledTask.getTransformId(), task -> { + if (task.equals(scheduledTask) == false) { + logger.debug( + () -> Strings.format( + "[{}] task object got modified while processing. Expected: {}, was: {}", + scheduledTask.getTransformId(), + scheduledTask, + task + ) + ); + } + return new TransformScheduledTask( + task.getTransformId(), + task.getFrequency(), + currentTimeMillis, + task.getFailureCount(), + task.getListener() + ); + }); + return true; + } + + /** + * Stops the processing. + */ + public void stop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + scheduledFuture = null; + } + } + + /** + * Registers and triggers the transform. + * The transform is registered by adding it to the queue and then the transform is triggered immediately so that it does not have to + * wait until the next scheduled run. + * + * @param transformTaskParams structure containing transform's id and frequency + * @param listener listener to be triggered + */ + public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) { + String transformId = transformTaskParams.getId(); + logger.trace(() -> Strings.format("[{}] register the transform", transformId)); + long currentTimeMillis = clock.millis(); + TransformScheduledTask transformScheduledTask = new TransformScheduledTask( + transformId, + transformTaskParams.getFrequency(), + null, // this task has not been triggered yet + 0, // this task has not failed yet + currentTimeMillis, // we schedule this task at current clock time so that it is processed ASAP + listener + ); + scheduledTasks.add(transformScheduledTask); + processScheduledTasks(); + } + + /** + * Updates the transform task's failure count. + * Updating the failure count affects the task's next_scheduled_time and may result in the task being processed earlier that it would + * normally (i.e.: according to its frequency) be. + * + * @param transformId id of the transform to update + * @param failureCount new value of transform task's failure count + */ + public void handleTransformFailureCountChanged(String transformId, int failureCount) { + logger.trace(() -> Strings.format("[{}] handle transform failure count change to {}", transformId, failureCount)); + // Update the task's failure count (next_scheduled_time gets automatically re-calculated) + scheduledTasks.update( + transformId, + task -> new TransformScheduledTask( + task.getTransformId(), + task.getFrequency(), + task.getLastTriggeredTimeMillis(), + failureCount, + task.getListener() + ) + ); + } + + /** + * De-registers the given transform by removing it from the queue. + * + * @param transformId id of the transform to de-register + */ + public void deregisterTransform(String transformId) { + Objects.requireNonNull(transformId); + logger.trace(() -> Strings.format("[{}] de-register the transform", transformId)); + scheduledTasks.remove(transformId); + } + + // Visible for testing + /** + * @return queue current contents + */ + List getTransformScheduledTasks() { + return scheduledTasks.listScheduledTasks(); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 928e972fae54a..4ffa628127321 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchContextMissingException; @@ -37,7 +38,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -52,7 +52,9 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; +import java.time.Clock; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -132,7 +134,7 @@ public void testPitInjection() throws InterruptedException { mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), mock(TransformAuditor.class), - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY) ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -232,7 +234,7 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), mock(TransformAuditor.class), - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY) ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -307,7 +309,7 @@ public void testDisablePit() throws InterruptedException { mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), mock(TransformAuditor.class), - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY) ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), @@ -538,7 +540,7 @@ private ClientTransformIndexer createTestIndexer(Client client) { mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), mock(TransformAuditor.class), - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY) ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java new file mode 100644 index 0000000000000..89c3064226a26 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.junit.After; +import org.junit.Before; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class TransformContextTests extends ESTestCase { + + private TransformContext.Listener listener; + + @Before + public void setUpMocks() { + listener = mock(TransformContext.Listener.class); + } + + @After + public void verifyNoMoreInteractionsOnMocks() { + verifyNoMoreInteractions(listener); + } + + public void testFailureCount() { + TransformContext context = new TransformContext(null, null, 0, listener); + assertThat(context.incrementAndGetFailureCount(), is(equalTo(1))); + assertThat(context.getFailureCount(), is(equalTo(1))); + assertThat(context.incrementAndGetFailureCount(), is(equalTo(2))); + assertThat(context.getFailureCount(), is(equalTo(2))); + context.resetReasonAndFailureCounter(); + assertThat(context.getFailureCount(), is(equalTo(0))); + + // Verify that the listener is notified every time the failure count is incremented or reset + verify(listener, times(3)).failureCountChanged(); + } + + public void testCheckpoint() { + TransformContext context = new TransformContext(null, null, 13, listener); + assertThat(context.getCheckpoint(), is(equalTo(13L))); + assertThat(context.incrementAndGetCheckpoint(), is(equalTo(14L))); + assertThat(context.getCheckpoint(), is(equalTo(14L))); + context.setCheckpoint(25); + assertThat(context.getCheckpoint(), is(equalTo(25L))); + assertThat(context.incrementAndGetCheckpoint(), is(equalTo(26L))); + assertThat(context.getCheckpoint(), is(equalTo(26L))); + } + + public void testTaskState() { + TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, listener); + assertThat(context.getTaskState(), is(equalTo(TransformTaskState.STARTED))); + assertThat(context.setTaskState(TransformTaskState.STOPPED, TransformTaskState.STOPPED), is(false)); + assertThat(context.getTaskState(), is(equalTo(TransformTaskState.STARTED))); + assertThat(context.setTaskState(TransformTaskState.STARTED, TransformTaskState.STOPPED), is(true)); + assertThat(context.getTaskState(), is(equalTo(TransformTaskState.STOPPED))); + context.resetTaskState(); + assertThat(context.getTaskState(), is(equalTo(TransformTaskState.STARTED))); + context.setTaskStateToFailed(null); + assertThat(context.getTaskState(), is(equalTo(TransformTaskState.FAILED))); + } + + public void testStateReason() { + TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, listener); + assertThat(context.getStateReason(), is(nullValue())); + context.setTaskStateToFailed("some-reason"); + assertThat(context.getStateReason(), is(equalTo("some-reason"))); + context.setTaskStateToFailed("some-other-reason"); + assertThat(context.getStateReason(), is(equalTo("some-other-reason"))); + context.resetTaskState(); + assertThat(context.getStateReason(), is(nullValue())); + context.setTaskStateToFailed("yet-another-reason"); + assertThat(context.getStateReason(), is(equalTo("yet-another-reason"))); + context.resetReasonAndFailureCounter(); + assertThat(context.getStateReason(), is(nullValue())); + + verify(listener).failureCountChanged(); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 039c9c6ce2c66..3acb33985817f 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.breaker.CircuitBreaker.Durability; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -38,7 +39,6 @@ import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; @@ -56,11 +56,13 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import org.junit.After; import org.junit.Before; import java.io.PrintWriter; import java.io.StringWriter; +import java.time.Clock; import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -127,7 +129,7 @@ static class MockedTransformIndexer extends ClientTransformIndexer { transformsConfigManager, mock(TransformCheckpointService.class), auditor, - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY) ), checkpointProvider, initialState, @@ -896,13 +898,13 @@ private void testHandleFailure( context ); - for (int i = 0; i <= expectedEffectiveNumFailureRetries; ++i) { + for (int i = 0; i < expectedEffectiveNumFailureRetries; ++i) { indexer.handleFailure(new Exception("exception no. " + (i + 1))); assertFalse(failIndexerCalled.get()); assertThat(failureMessage.get(), is(nullValue())); assertThat(context.getFailureCount(), is(equalTo(i + 1))); } - indexer.handleFailure(new Exception("exception no. " + (expectedEffectiveNumFailureRetries + 2))); + indexer.handleFailure(new Exception("exception no. " + (expectedEffectiveNumFailureRetries + 1))); assertTrue(failIndexerCalled.get()); assertThat( failureMessage.get(), @@ -911,11 +913,11 @@ private void testHandleFailure( "task encountered more than " + expectedEffectiveNumFailureRetries + " failures; latest failure: exception no. " - + (expectedEffectiveNumFailureRetries + 2) + + (expectedEffectiveNumFailureRetries + 1) ) ) ); - assertThat(context.getFailureCount(), is(equalTo(expectedEffectiveNumFailureRetries + 2))); + assertThat(context.getFailureCount(), is(equalTo(expectedEffectiveNumFailureRetries + 1))); auditor.assertAllExpectationsMatched(); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 854648e53c0b0..90f5ee5a14735 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollTask; @@ -34,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -51,9 +51,11 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import org.junit.After; import org.junit.Before; +import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -191,7 +193,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener throw new IllegalStateException(e); } } - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE)); + threadPool.generic().execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE)); } @Override @@ -199,8 +201,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next if (doProcessLatch != null) { doProcessLatch.countDown(); } - threadPool.executor(ThreadPool.Names.GENERIC) - .execute(() -> nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100))); + threadPool.generic().execute(() -> nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100))); } @Override @@ -581,7 +582,7 @@ private void setStopAtCheckpoint( ) { // we need to simulate that this is called from the task, which offloads it to the generic threadpool CountDownLatch latch = new CountDownLatch(1); - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + threadPool.generic().execute(() -> { indexer.setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener); latch.countDown(); }); @@ -625,7 +626,7 @@ private MockedTransformIndexer createMockIndexer( transformConfigManager, mock(TransformCheckpointService.class), transformAuditor, - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY) ); MockedTransformIndexer indexer = new MockedTransformIndexer( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 9f7a022ec0405..d71b3dd759539 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollTask; @@ -34,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -51,9 +51,11 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import org.junit.After; import org.junit.Before; +import java.time.Clock; import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -210,7 +212,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener throw new IllegalStateException(e); } } - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE)); + threadPool.generic().execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE)); } @Override @@ -218,8 +220,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next if (doProcessLatch != null) { doProcessLatch.countDown(); } - threadPool.executor(ThreadPool.Names.GENERIC) - .execute(() -> nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100))); + threadPool.generic().execute(() -> nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100))); } @Override @@ -446,7 +447,7 @@ private MockedTransformIndexer createMockIndexer( transformConfigManager, mock(TransformCheckpointService.class), transformAuditor, - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY) ); MockedTransformIndexer indexer = new MockedTransformIndexer( @@ -472,7 +473,7 @@ private void setStopAtCheckpoint( ) { // we need to simulate that this is called from the task, which offloads it to the generic threadpool CountDownLatch latch = new CountDownLatch(1); - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + threadPool.generic().execute(() -> { indexer.setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener); latch.countDown(); }); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index ae608a67701fc..7d6e4924d22ed 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.Transform; @@ -41,6 +40,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import java.time.Clock; import java.util.ArrayList; @@ -401,8 +401,10 @@ public TransformPersistentTasksExecutor buildTaskExecutor() { client, xContentRegistry() ); + Clock clock = Clock.systemUTC(); + ThreadPool threadPool = mock(ThreadPool.class); TransformCheckpointService transformCheckpointService = new TransformCheckpointService( - Clock.systemUTC(), + clock, Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager, @@ -412,7 +414,7 @@ public TransformPersistentTasksExecutor buildTaskExecutor() { transformsConfigManager, transformCheckpointService, mockAuditor, - mock(SchedulerEngine.class) + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY) ); ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING)); @@ -422,7 +424,7 @@ public TransformPersistentTasksExecutor buildTaskExecutor() { return new TransformPersistentTasksExecutor( client, transformServices, - mock(ThreadPool.class), + threadPool, clusterService, Settings.EMPTY, TestIndexNameExpressionResolver.newInstance() diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index a6a90293ae482..be12730af3f90 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -42,6 +41,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; import org.junit.After; import org.junit.Before; @@ -83,6 +83,7 @@ public void tearDownClient() { // see https://github.com/elastic/elasticsearch/issues/48957 public void testStopOnFailedTaskWithStoppedIndexer() { + Clock clock = Clock.systemUTC(); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); @@ -90,7 +91,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager(); TransformCheckpointService transformsCheckpointService = new TransformCheckpointService( - Clock.systemUTC(), + clock, Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager, @@ -100,7 +101,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { transformsConfigManager, transformsCheckpointService, auditor, - mock(SchedulerEngine.class) + new TransformScheduler(clock, threadPool, Settings.EMPTY) ); TransformState transformState = new TransformState( @@ -122,7 +123,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { client, createTransformTaskParams(transformConfig.getId()), transformState, - mock(SchedulerEngine.class), + new TransformScheduler(clock, threadPool, Settings.EMPTY), auditor, threadPool, Collections.emptyMap() @@ -200,7 +201,7 @@ public void testStopOnFailedTaskWithoutIndexer() { client, createTransformTaskParams(transformConfig.getId()), transformState, - mock(SchedulerEngine.class), + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY), auditor, threadPool, Collections.emptyMap() diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java new file mode 100644 index 0000000000000..42bfa7250c425 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java @@ -0,0 +1,217 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms.scheduling; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler.Event; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.fail; + +public class TransformScheduledTaskQueueTests extends ESTestCase { + + private final TransformScheduledTaskQueue queue = new TransformScheduledTaskQueue(); + private TestThreadPool threadPool; + + @Before + public void createThreadPool() { + threadPool = new TestThreadPool(getTestName()); + } + + @After + public void shutdownThreadPool() { + if (threadPool != null) { + threadPool.shutdown(); + } + } + + public void testEmptyQueue() { + assertThatQueueIsEmpty(); + } + + public void testNonEmptyQueue() { + queue.add(createTask("task-1", 5)); + assertThat(queue.isEmpty(), is(false)); + } + + public void testAddAndRemove() { + queue.add(createTask("task-1", 5)); + queue.add(createTask("task-2", 1)); + queue.add(createTask("task-3", 9)); + assertThat(queue.isEmpty(), is(false)); + assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3")); + assertThat(queue.first(), is(equalTo(createTask("task-2", 1)))); + + queue.remove("task-1"); + queue.remove("task-2"); + queue.remove("task-3"); + assertThatQueueIsEmpty(); + } + + public void testConcurrentAddAndRemove() throws Exception { + { + List> concurrentQueueAddTasks = new ArrayList<>(100); + for (int i = 0; i < 100; ++i) { + TransformScheduledTask task = createTask("task-" + i, randomLong()); + concurrentQueueAddTasks.add(() -> queue.add(task)); + } + List> futures = threadPool.generic().invokeAll(concurrentQueueAddTasks); + for (Future future : futures) { + Boolean taskAdded = future.get(); + // Verify that the added task ids were unique + assertThat(taskAdded, is(true)); + } + } + assertThat(queue.isEmpty(), is(false)); + assertThat(queue.getTransformIds(), hasSize(100)); + + { + Set removedTaskIds = new HashSet<>(); + List> concurrentQueueRemoveTasks = new ArrayList<>(100); + for (int i = 0; i < 100; ++i) { + String taskId = "task-" + i; + concurrentQueueRemoveTasks.add(() -> queue.remove(taskId)); + } + List> futures = threadPool.generic().invokeAll(concurrentQueueRemoveTasks); + for (Future future : futures) { + removedTaskIds.add(future.get().getTransformId()); + } + // Verify that the removed tasks ids were unique + assertThat(removedTaskIds, hasSize(100)); + } + assertThatQueueIsEmpty(); + } + + public void testAddNoOp() { + queue.add(createTask("task-1", 5)); + assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + + // Try adding a task with a duplicate key + queue.add(createTask("task-1", 6)); + // Verify that the add operation had no effect + assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + } + + public void testRemoveNoOp() { + queue.add(createTask("task-1", 5)); + queue.remove("task-non-existent"); + // Verify that the remove operation had no effect + assertThat(queue.isEmpty(), is(false)); + assertThat(queue.getTransformIds(), containsInAnyOrder("task-1")); + assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + } + + public void testUpdateNoOp() { + queue.add(createTask("task-1", 5)); + queue.update("task-non-existent", task -> createTask(task.getTransformId(), -999)); + // Verify that the update operation had no effect + assertThat(queue.isEmpty(), is(false)); + assertThat(queue.getTransformIds(), containsInAnyOrder("task-1")); + assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + } + + public void testUpdateModifiesId() { + queue.add(createTask("task-id", 5)); + Exception e = expectThrows(IllegalStateException.class, () -> queue.update("task-id", task -> createTask("other-id", 5))); + assertThat(e.getMessage(), is(equalTo("Must not modify the transform's id during update"))); + } + + public void testRemoveAll() { + queue.add(createTask("task-1", 5)); + queue.add(createTask("task-2", 1)); + queue.add(createTask("task-3", 9)); + queue.add(createTask("task-4", 3)); + queue.add(createTask("task-5", 7)); + queue.add(createTask("task-6", 6)); + queue.add(createTask("task-7", 0)); + queue.add(createTask("task-8", 2)); + queue.add(createTask("task-9", 4)); + assertThat(queue.isEmpty(), is(false)); + assertThat( + queue.getTransformIds(), + containsInAnyOrder("task-1", "task-2", "task-3", "task-4", "task-5", "task-6", "task-7", "task-8", "task-9") + ); + assertThat(queue.first(), is(equalTo(createTask("task-7", 0)))); + + List tasksByPriority = new ArrayList<>(); + while (queue.isEmpty() == false) { + TransformScheduledTask task = queue.first(); + tasksByPriority.add(task); + queue.remove(task.getTransformId()); + } + assertThatQueueIsEmpty(); + assertThat( + tasksByPriority, + Matchers.contains( + createTask("task-7", 0), + createTask("task-2", 1), + createTask("task-8", 2), + createTask("task-4", 3), + createTask("task-9", 4), + createTask("task-1", 5), + createTask("task-6", 6), + createTask("task-5", 7), + createTask("task-3", 9) + ) + ); + } + + public void testUpdatePriority() { + queue.add(createTask("task-1", 5)); + queue.add(createTask("task-2", 1)); + queue.add(createTask("task-3", 9)); + assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3")); + assertThat(queue.first(), is(equalTo(createTask("task-2", 1)))); + + queue.update("task-3", task -> createTask(task.getTransformId(), -999)); + assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3")); + assertThat(queue.first(), is(equalTo(createTask("task-3", -999)))); + + queue.update("task-1", task -> createTask(task.getTransformId(), 0)); + queue.remove("task-3"); + assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2")); + assertThat(queue.first(), is(equalTo(createTask("task-1", 0)))); + } + + private static TransformScheduledTask createTask(String transformId, long nextScheduledTimeMillis) { + return new TransformScheduledTask( + transformId, + null, + null, + 0, + nextScheduledTimeMillis, + TransformScheduledTaskQueueTests::failUnexpectedCall + ); + } + + private static void failUnexpectedCall(Event event) { + fail("Unexpected call to listener: " + event); + } + + private void assertThatQueueIsEmpty() { + assertThat(queue.isEmpty(), is(true)); + assertThat(queue.getTransformIds(), is(empty())); + expectThrows(NoSuchElementException.class, () -> queue.first()); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java new file mode 100644 index 0000000000000..2af5cb87e5179 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms.scheduling; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler.Listener; + +import java.time.Instant; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class TransformScheduledTaskTests extends ESTestCase { + + private static final String TRANSFORM_ID = "transform-id"; + private static final TimeValue FREQUENCY = TimeValue.timeValueSeconds(10); + private static final TimeValue DEFAULT_FREQUENCY = TimeValue.timeValueSeconds(60); + private static final long LAST_TRIGGERED_TIME_MILLIS = 100000L; + private static final Listener LISTENER = event -> {}; + + public void testBasics() { + TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 0, 123, LISTENER); + assertThat(task.getTransformId(), is(equalTo(TRANSFORM_ID))); + assertThat(task.getFrequency(), is(equalTo(FREQUENCY))); + assertThat(task.getLastTriggeredTimeMillis(), is(equalTo(LAST_TRIGGERED_TIME_MILLIS))); + assertThat(task.getFailureCount(), is(equalTo(0))); + assertThat(task.getNextScheduledTimeMillis(), is(equalTo(123L))); + assertThat(task.getListener(), is(equalTo(LISTENER))); + } + + public void testDefaultFrequency() { + TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, null, LAST_TRIGGERED_TIME_MILLIS, 0, 0, LISTENER); + assertThat(task.getFrequency(), is(equalTo(DEFAULT_FREQUENCY))); + } + + public void testNextScheduledTimeMillis() { + { + TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 0, 123, LISTENER); + // Verify that the explicitly-provided next scheduled time is returned when failure count is 0 + assertThat(task.getNextScheduledTimeMillis(), is(equalTo(123L))); + } + { + TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 1, 123, LISTENER); + // Verify that the explicitly-provided next scheduled time is returned when failure count is greater than 0 + assertThat(task.getNextScheduledTimeMillis(), is(equalTo(123L))); + } + { + TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 0, LISTENER); + // Verify that the next scheduled time is calculated properly when failure count is 0 + assertThat(task.getNextScheduledTimeMillis(), is(equalTo(110000L))); + } + { + TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 1, LISTENER); + // Verify that the next scheduled time is calculated properly when failure count is greater than 0 + assertThat(task.getNextScheduledTimeMillis(), is(equalTo(102000L))); + } + } + + public void testCalculateNextScheduledTimeAfterFailure() { + long lastTriggeredTimeMillis = Instant.now().toEpochMilli(); + long[] expectedDelayMillis = { + 1000, // 1s + 2000, // 2s + 4000, // 4s + 8000, // 8s + 16000, // 16s + 32000, // 32s + 64000, // ~1min + 128000, // ~2min + 256000, // ~4min + 512000, // ~8.5min + 1024000, // ~17min + 2048000, // ~34min + 3600000, // 1h + 3600000, // 1h + 3600000, // 1h + 3600000 // 1h + }; + for (int failureCount = 0; failureCount < 1000; ++failureCount) { + assertThat( + "failureCount = " + failureCount, + TransformScheduledTask.calculateNextScheduledTimeAfterFailure(lastTriggeredTimeMillis, failureCount), + is(equalTo(lastTriggeredTimeMillis + expectedDelayMillis[Math.min(failureCount, expectedDelayMillis.length - 1)])) + ); + } + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java new file mode 100644 index 0000000000000..de5bc05834041 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java @@ -0,0 +1,413 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.transforms.scheduling; + +import org.elasticsearch.Version; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; +import org.elasticsearch.xpack.transform.Transform; +import org.junit.After; +import org.junit.Before; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; + +public class TransformSchedulerTests extends ESTestCase { + + private static final TimeValue TEST_SCHEDULER_FREQUENCY = TimeValue.timeValueSeconds(1); + private static final Settings SETTINGS = Settings.builder() + .put(Transform.SCHEDULER_FREQUENCY.getKey(), TEST_SCHEDULER_FREQUENCY) + .build(); + + private TestThreadPool threadPool; + + @Before + public void createThreadPool() { + threadPool = new TestThreadPool(getTestName()); + } + + @After + public void shutdownThreadPool() { + if (threadPool != null) { + threadPool.shutdown(); + } + } + + public void testScheduling() { + String transformId = "test-with-fake-clock"; + int frequencySeconds = 5; + TimeValue frequency = TimeValue.timeValueSeconds(frequencySeconds); + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + TransformScheduler.Listener listener = events::add; + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.registerTransform(transformTaskParams, listener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 5000, listener)) + ); + assertThat(events, hasSize(1)); + + for (int i = 0; i < frequencySeconds; ++i) { + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 5000, listener)) + ); + assertThat(events, hasSize(1)); + clock.advanceTimeBy(Duration.ofMillis(1001)); + } + assertThat(clock.currentTime, is(equalTo(Instant.ofEpochMilli(5005)))); + + for (int i = 0; i < frequencySeconds; ++i) { + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 5005L, 0, 10005, listener)) + ); + assertThat(events, hasSize(2)); + clock.advanceTimeBy(Duration.ofMillis(1001)); + } + assertThat(clock.currentTime, is(equalTo(Instant.ofEpochMilli(10010)))); + + for (int i = 0; i < frequencySeconds; ++i) { + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 10010L, 0, 15010, listener)) + ); + assertThat(events, hasSize(3)); + clock.advanceTimeBy(Duration.ofMillis(1001)); + } + assertThat(clock.currentTime, is(equalTo(Instant.ofEpochMilli(15015)))); + + assertThat(events.get(0), is(equalTo(new TransformScheduler.Event(transformId, 0, 0)))); + assertThat(events.get(1), is(equalTo(new TransformScheduler.Event(transformId, 5000, 5005)))); + assertThat(events.get(2), is(equalTo(new TransformScheduler.Event(transformId, 10005, 10010)))); + + transformScheduler.deregisterTransform(transformId); + assertThat(transformScheduler.getTransformScheduledTasks(), is(empty())); + + transformScheduler.stop(); + } + + public void testSchedulingWithFailures() { + String transformId = "test-failure-with-fake-clock"; + TimeValue frequency = TimeValue.timeValueHours(1); + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + TransformScheduler.Listener listener = events::add; + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.registerTransform(transformTaskParams, listener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener)) + ); + assertThat(events, hasSize(1)); + + for (int i = 0; i < 60; ++i) { + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener)) + ); + assertThat(events, hasSize(1)); + clock.advanceTimeBy(Duration.ofMillis(TEST_SCHEDULER_FREQUENCY.millis())); + } + assertThat(clock.currentTime, is(equalTo(Instant.ofEpochSecond(60)))); + + transformScheduler.handleTransformFailureCountChanged(transformId, 1); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 1, 2 * 1000, listener)) + ); + assertThat(events, hasSize(1)); + + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 60 * 1000L, 1, 62 * 1000, listener)) + ); + assertThat(events, hasSize(2)); + + assertThat( + events, + contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 2 * 1000, 60 * 1000)) + ); + + transformScheduler.deregisterTransform(transformId); + assertThat(transformScheduler.getTransformScheduledTasks(), is(empty())); + + transformScheduler.stop(); + } + + public void testConcurrentProcessing() throws Exception { + String transformId = "test-with-fake-clock-concurrent"; + int frequencySeconds = 5; + TimeValue frequency = TimeValue.timeValueSeconds(frequencySeconds); + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + TransformScheduler.Listener listener = events::add; + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.registerTransform(transformTaskParams, listener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 5000, listener)) + ); + assertThat(events, hasSize(1)); + + clock.advanceTimeBy(Duration.ofMillis(5000)); + + List> concurrentProcessingTasks = new ArrayList<>(10); + for (int i = 0; i < 10; ++i) { + concurrentProcessingTasks.add(() -> { + transformScheduler.processScheduledTasks(); + return null; + }); + } + List> futures = threadPool.generic().invokeAll(concurrentProcessingTasks); + for (Future future : futures) { + future.get(); + } + + assertThat(events, hasSize(2)); + assertThat( + events, + contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 5 * 1000, 5 * 1000)) + ); + } + + public void testConcurrentModifications() { + String transformId = "test-with-fake-clock-concurrent"; + int frequencySeconds = 5; + TimeValue frequency = TimeValue.timeValueSeconds(frequencySeconds); + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + TransformScheduler.Listener taskModifyingListener = new TransformScheduler.Listener() { + private boolean firstTime = true; + + @Override + public void triggered(TransformScheduler.Event event) { + events.add(event); + assertThat(event.transformId(), is(equalTo(transformId))); + if (firstTime) { + firstTime = false; + } else { + transformScheduler.handleTransformFailureCountChanged(transformId, 666); + } + } + }; + transformScheduler.registerTransform(transformTaskParams, taskModifyingListener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 5000, taskModifyingListener)) + ); + assertThat(events, hasSize(1)); + + clock.advanceTimeBy(Duration.ofMillis(5000)); + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 5000L, 666, 3605000, taskModifyingListener)) + ); + assertThat(events, hasSize(2)); + assertThat( + events, + contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 5 * 1000, 5 * 1000)) + ); + } + + public void testWithSystemClock() throws Exception { + String transformId = "test-with-system-clock"; + TimeValue frequency = TimeValue.timeValueSeconds(1); + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + Clock clock = Clock.systemUTC(); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.start(); + transformScheduler.registerTransform(transformTaskParams, events::add); + assertThat(events, hasSize(1)); + + assertBusy(() -> assertThat(events, hasSize(greaterThanOrEqualTo(3))), 20, TimeUnit.SECONDS); + + assertThat(events, hasSize(greaterThanOrEqualTo(3))); + assertThat(events.get(0).transformId(), is(equalTo(transformId))); + assertThat(events.get(1).transformId(), is(equalTo(transformId))); + assertThat(events.get(2).transformId(), is(equalTo(transformId))); + assertThat(events.get(1).scheduledTime() - events.get(0).triggeredTime(), is(equalTo(frequency.millis()))); + assertThat(events.get(2).scheduledTime() - events.get(1).triggeredTime(), is(equalTo(frequency.millis()))); + + transformScheduler.deregisterTransform(transformId); + transformScheduler.stop(); + } + + public void testScheduledTransformTaskEqualsAndHashCode() { + Supplier listenerSupplier = () -> new TransformScheduler.Listener() { + @Override + public void triggered(TransformScheduler.Event event) {} + + @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int hashCode() { + return 123; + } + }; + TransformScheduler.Listener listener1 = listenerSupplier.get(); + TransformScheduler.Listener listener2 = listenerSupplier.get(); + TransformScheduledTask task1 = new TransformScheduledTask("transform-id", TimeValue.timeValueSeconds(10), 123L, 0, 50, listener1); + TransformScheduledTask task2 = new TransformScheduledTask("transform-id", TimeValue.timeValueSeconds(10), 123L, 0, 50, listener2); + // Verify the tasks are not equal. The equality check for listeners is performed using their identity. + assertThat(task1, is(not(equalTo(task2)))); + assertThat(task1.hashCode(), is(not(equalTo(task2.hashCode())))); + } + + public void testRegisterMultipleTransforms() { + String transformId1 = "test-register-transforms-1"; + String transformId2 = "test-register-transforms-2"; + String transformId3 = "test-register-transforms-3"; + TimeValue frequency = TimeValue.timeValueSeconds(5); + TransformTaskParams transformTaskParams1 = new TransformTaskParams(transformId1, Version.CURRENT, frequency, false); + TransformTaskParams transformTaskParams2 = new TransformTaskParams(transformId2, Version.CURRENT, frequency, false); + TransformTaskParams transformTaskParams3 = new TransformTaskParams(transformId3, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + TransformScheduler.Listener listener = events::add; + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.registerTransform(transformTaskParams1, listener); + transformScheduler.registerTransform(transformTaskParams2, listener); + transformScheduler.registerTransform(transformTaskParams3, listener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains( + new TransformScheduledTask(transformId1, frequency, 0L, 0, 5000, listener), + new TransformScheduledTask(transformId2, frequency, 0L, 0, 5000, listener), + new TransformScheduledTask(transformId3, frequency, 0L, 0, 5000, listener) + ) + ); + assertThat(events, hasSize(3)); + assertThat(events.get(0).transformId(), is(equalTo(transformId1))); + assertThat(events.get(1).transformId(), is(equalTo(transformId2))); + assertThat(events.get(2).transformId(), is(equalTo(transformId3))); + } + + public void testMultipleTransformsEligibleForProcessingAtOnce() { + String transformId1 = "test-register-transforms-1"; + String transformId2 = "test-register-transforms-2"; + String transformId3 = "test-register-transforms-3"; + TimeValue frequency = TimeValue.timeValueSeconds(5); + TransformTaskParams transformTaskParams1 = new TransformTaskParams(transformId1, Version.CURRENT, frequency, false); + TransformTaskParams transformTaskParams2 = new TransformTaskParams(transformId2, Version.CURRENT, frequency, false); + TransformTaskParams transformTaskParams3 = new TransformTaskParams(transformId3, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + TransformScheduler.Listener listener = events::add; + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.registerTransform(transformTaskParams1, listener); + transformScheduler.registerTransform(transformTaskParams2, listener); + transformScheduler.registerTransform(transformTaskParams3, listener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains( + new TransformScheduledTask(transformId1, frequency, 0L, 0, 5000, listener), + new TransformScheduledTask(transformId2, frequency, 0L, 0, 5000, listener), + new TransformScheduledTask(transformId3, frequency, 0L, 0, 5000, listener) + ) + ); + assertThat(events, hasSize(3)); + assertThat(events.get(0).transformId(), is(equalTo(transformId1))); + assertThat(events.get(1).transformId(), is(equalTo(transformId2))); + assertThat(events.get(2).transformId(), is(equalTo(transformId3))); + + clock.advanceTimeBy(Duration.ofSeconds(5)); // Advance time to the next scheduled time of all 3 transforms + transformScheduler.processScheduledTasks(); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains( + new TransformScheduledTask(transformId1, frequency, 5000L, 0, 10000, listener), + new TransformScheduledTask(transformId2, frequency, 5000L, 0, 10000, listener), + new TransformScheduledTask(transformId3, frequency, 5000L, 0, 10000, listener) + ) + ); + assertThat(events, hasSize(6)); + assertThat(events.get(3).transformId(), is(equalTo(transformId1))); + assertThat(events.get(4).transformId(), is(equalTo(transformId2))); + assertThat(events.get(5).transformId(), is(equalTo(transformId3))); + } + + private static class FakeClock extends Clock { + + private Instant currentTime; + + FakeClock(Instant time) { + assertThat(time, is(notNullValue())); + currentTime = time; + } + + public void setCurrentTime(Instant time) { + // We cannot go back in time. + assertThat(time, is(greaterThanOrEqualTo(currentTime))); + currentTime = time; + } + + public void advanceTimeBy(Duration duration) { + assertThat(duration, is(notNullValue())); + setCurrentTime(currentTime.plus(duration)); + } + + public Instant instant() { + return currentTime; + } + + @Override + public ZoneId getZone() { + return ZoneId.systemDefault(); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + } +}