From e3fce77a9e9f212c6d586f415016db8f0864ab60 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 20 Dec 2021 10:34:45 +0100 Subject: [PATCH 1/2] remove the indexer threadpool and use the generic threadpool instead. the indexer threadpool was only used on start. fixes #81796 --- .../xpack/transform/Transform.java | 17 ---------- .../transforms/TransformIndexer.java | 6 ++-- .../TransformPersistentTasksExecutor.java | 31 ++++++++++++++++--- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 8a815340aca65..59d7f6f03567f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -47,8 +47,6 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -123,7 +121,6 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTaskPlugin { public static final String NAME = "transform"; - public static final String TASK_THREAD_POOL_NAME = "transform_indexing"; private static final Logger logger = LogManager.getLogger(Transform.class); @@ -201,20 +198,6 @@ public List getRestHandlers( ); } - @Override - public List> getExecutorBuilders(Settings settingsToUse) { - FixedExecutorBuilder indexing = new FixedExecutorBuilder( - settingsToUse, - TASK_THREAD_POOL_NAME, - 4, - 4, - "transform.task_thread_pool", - false - ); - - return Collections.singletonList(indexing); - } - @Override public Collection createComponents( Client client, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 592ad6c6e3dec..9273ebc079d90 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -571,7 +571,7 @@ protected IterationResult doProcess(SearchResponse sea @Override public boolean maybeTriggerAsyncJob(long now) { - boolean triggered; + // threadpool: trigger_engine_scheduler if triggered from the scheduler if (context.getTaskState() == TransformTaskState.FAILED) { logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId()); @@ -602,10 +602,8 @@ public boolean maybeTriggerAsyncJob(long now) { return false; } - triggered = super.maybeTriggerAsyncJob(now); + return super.maybeTriggerAsyncJob(now); } - - return triggered; } /** diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 37c497fba3580..c92e6498f8709 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -83,7 +83,7 @@ public TransformPersistentTasksExecutor( Settings settings, IndexNameExpressionResolver resolver ) { - super(TransformField.TASK_NAME, Transform.TASK_THREAD_POOL_NAME); + super(TransformField.TASK_NAME, ThreadPool.Names.GENERIC); this.client = client; this.transformServices = transformServices; this.threadPool = threadPool; @@ -100,6 +100,13 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( Collection candidateNodes, ClusterState clusterState ) { + /* Note: + * + * This method is executed on the _master_ node. The master and transform node might be on a different version. + * Therefore certain checks must happen on the corresponding node, e.g. the existence of the internal index. + * + * Operations on the transform node happen in {@link #nodeOperation()} + */ if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { return new PersistentTasksCustomMetadata.Assignment( null, @@ -160,6 +167,12 @@ static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat @Override protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTaskParams params, PersistentTaskState state) { + /* Note: + * + * This method is executed on the _transform_ node. The master and transform node might be on a different version. + * Operations on master happen in {@link #getAssignment()} + */ + final String transformId = params.getId(); final TransformTask buildTask = (TransformTask) task; // NOTE: TransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object @@ -188,6 +201,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // <6> load next checkpoint ActionListener getTransformNextCheckpointListener = ActionListener.wrap(nextCheckpoint -> { + // threadpool: system_read if (nextCheckpoint.isEmpty()) { // extra safety: reset position and progress if next checkpoint is empty @@ -211,8 +225,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // <5> load last checkpoint ActionListener getTransformLastCheckpointListener = ActionListener.wrap(lastCheckpoint -> { - indexerBuilder.setLastCheckpoint(lastCheckpoint); + // threadpool: system_read + indexerBuilder.setLastCheckpoint(lastCheckpoint); logger.trace("[{}] Loaded last checkpoint [{}], looking for next checkpoint", transformId, lastCheckpoint.getCheckpoint()); transformServices.getConfigManager() .getTransformCheckpoint(transformId, lastCheckpoint.getCheckpoint() + 1, getTransformNextCheckpointListener); @@ -227,6 +242,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // Schedule execution regardless ActionListener> transformStatsActionListener = ActionListener.wrap( stateAndStatsAndSeqNoPrimaryTermAndIndex -> { + // threadpool: system_read + TransformStoredDoc stateAndStats = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1(); SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = stateAndStatsAndSeqNoPrimaryTermAndIndex.v2(); // Since we have not set the value for this yet, it SHOULD be null @@ -272,6 +289,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist) ActionListener getTransformConfigListener = ActionListener.wrap(config -> { + // threadpool: system_read // fail if a transform is too old, this can only happen on a rolling upgrade if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) { @@ -368,9 +386,12 @@ private void startTask( Long previousCheckpoint, ActionListener listener ) { - buildTask.initializeIndexer(indexerBuilder); - // TransformTask#start will fail if the task state is FAILED - buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); + // switch the threadpool to generic, because the caller is on the system_read threadpool + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + buildTask.initializeIndexer(indexerBuilder); + // TransformTask#start will fail if the task state is FAILED + buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); + }); } private void setNumFailureRetries(int numFailureRetries) { From 53f0dff8ed38f0d73e24c1f37ed6d89ad1e92f07 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 20 Dec 2021 17:15:11 +0100 Subject: [PATCH 2/2] Update TransformIndexer.java --- .../xpack/transform/transforms/TransformIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 9273ebc079d90..dc4c7b5fa47fe 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -571,7 +571,7 @@ protected IterationResult doProcess(SearchResponse sea @Override public boolean maybeTriggerAsyncJob(long now) { - // threadpool: trigger_engine_scheduler if triggered from the scheduler + // threadpool: trigger_engine_scheduler if triggered from the scheduler, generic if called from the task on start if (context.getTaskState() == TransformTaskState.FAILED) { logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());