Skip to content

Commit 9d90c7d

Browse files
authored
[ML] renamed DatafeedManager to DatafeedRunner (#74082) (#74090)
1 parent b2feedf commit 9d90c7d

File tree

10 files changed

+113
-113
lines changed

10 files changed

+113
-113
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.elasticsearch.license.LicenseStateListener;
1414
import org.elasticsearch.license.XPackLicenseState;
1515
import org.elasticsearch.threadpool.ThreadPool;
16-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
16+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
1717
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
1818

1919
public class InvalidLicenseEnforcer implements LicenseStateListener {
@@ -22,16 +22,16 @@ public class InvalidLicenseEnforcer implements LicenseStateListener {
2222

2323
private final ThreadPool threadPool;
2424
private final XPackLicenseState licenseState;
25-
private final DatafeedManager datafeedManager;
25+
private final DatafeedRunner datafeedRunner;
2626
private final AutodetectProcessManager autodetectProcessManager;
2727

2828
private volatile boolean licenseStateListenerRegistered;
2929

3030
InvalidLicenseEnforcer(XPackLicenseState licenseState, ThreadPool threadPool,
31-
DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) {
31+
DatafeedRunner datafeedRunner, AutodetectProcessManager autodetectProcessManager) {
3232
this.threadPool = threadPool;
3333
this.licenseState = licenseState;
34-
this.datafeedManager = datafeedManager;
34+
this.datafeedRunner = datafeedRunner;
3535
this.autodetectProcessManager = autodetectProcessManager;
3636
}
3737

@@ -59,7 +59,7 @@ public void onFailure(Exception e) {
5959

6060
@Override
6161
protected void doRun() throws Exception {
62-
datafeedManager.stopAllDatafeedsOnThisNode("invalid license");
62+
datafeedRunner.stopAllDatafeedsOnThisNode("invalid license");
6363
autodetectProcessManager.closeAllJobsOnThisNode("invalid license");
6464
}
6565
});

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@
235235
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
236236
import org.elasticsearch.xpack.ml.datafeed.DatafeedContextProvider;
237237
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
238-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
238+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
239239
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
240240
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
241241
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@@ -530,7 +530,7 @@ public Set<DiscoveryNodeRole> getRoles() {
530530

531531
private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
532532
private final SetOnce<DatafeedConfigProvider> datafeedConfigProvider = new SetOnce<>();
533-
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
533+
private final SetOnce<DatafeedRunner> datafeedRunner = new SetOnce<>();
534534
private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
535535
private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
536536
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
@@ -759,9 +759,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
759759
clusterService.getNodeName());
760760
DatafeedContextProvider datafeedContextProvider = new DatafeedContextProvider(jobConfigProvider, datafeedConfigProvider,
761761
jobResultsProvider);
762-
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
762+
DatafeedRunner datafeedRunner = new DatafeedRunner(threadPool, client, clusterService, datafeedJobBuilder,
763763
System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager, datafeedContextProvider);
764-
this.datafeedManager.set(datafeedManager);
764+
this.datafeedRunner.set(datafeedRunner);
765765

766766
// Inference components
767767
final TrainedModelStatsService trainedModelStatsService = new TrainedModelStatsService(resultsPersisterService,
@@ -807,13 +807,13 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
807807
this.memoryTracker.set(memoryTracker);
808808
MlLifeCycleService mlLifeCycleService =
809809
new MlLifeCycleService(
810-
environment, clusterService, datafeedManager, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
810+
environment, clusterService, datafeedRunner, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
811811
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
812812
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);
813813

814814
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
815815
final InvalidLicenseEnforcer enforcer =
816-
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
816+
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedRunner, autodetectProcessManager);
817817
enforcer.listenForLicenseStateChanges();
818818

819819
// Perform node startup operations
@@ -832,7 +832,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
832832
autodetectProcessManager,
833833
new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
834834
jobDataCountsPersister,
835-
datafeedManager,
835+
datafeedRunner,
836836
anomalyDetectionAuditor,
837837
dataFrameAnalyticsAuditor,
838838
inferenceAuditor,
@@ -865,7 +865,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
865865
memoryTracker.get(),
866866
client,
867867
expressionResolver),
868-
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get(), expressionResolver),
868+
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedRunner.get(), expressionResolver),
869869
new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings,
870870
client,
871871
clusterService,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import org.elasticsearch.cluster.service.ClusterService;
1010
import org.elasticsearch.common.component.LifecycleListener;
1111
import org.elasticsearch.env.Environment;
12-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
12+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
1313
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
1414
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
1515
import org.elasticsearch.xpack.ml.process.NativeController;
@@ -22,17 +22,17 @@ public class MlLifeCycleService {
2222

2323
private final Environment environment;
2424
private final ClusterService clusterService;
25-
private final DatafeedManager datafeedManager;
25+
private final DatafeedRunner datafeedRunner;
2626
private final AutodetectProcessManager autodetectProcessManager;
2727
private final DataFrameAnalyticsManager analyticsManager;
2828
private final MlMemoryTracker memoryTracker;
2929

30-
public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager,
30+
public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedRunner datafeedRunner,
3131
AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
3232
MlMemoryTracker memoryTracker) {
3333
this.environment = environment;
3434
this.clusterService = clusterService;
35-
this.datafeedManager = datafeedManager;
35+
this.datafeedRunner = datafeedRunner;
3636
this.autodetectProcessManager = autodetectProcessManager;
3737
this.analyticsManager = analyticsManager;
3838
this.memoryTracker = memoryTracker;
@@ -53,8 +53,8 @@ public synchronized void stop() {
5353
// This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the
5454
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
5555
// could fail if they send data to a dead autodetect process.
56-
if (datafeedManager != null) {
57-
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
56+
if (datafeedRunner != null) {
57+
datafeedRunner.isolateAllDatafeedsOnThisNodeBeforeShutdown();
5858
}
5959
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
6060
if (nativeController != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
5858
import org.elasticsearch.xpack.ml.MachineLearning;
5959
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
60-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
60+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
6161
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
6262
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
6363
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -416,12 +416,12 @@ private ElasticsearchStatusException createUnknownLicenseError(
416416
}
417417

418418
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
419-
private final DatafeedManager datafeedManager;
419+
private final DatafeedRunner datafeedRunner;
420420
private final IndexNameExpressionResolver resolver;
421421

422-
public StartDatafeedPersistentTasksExecutor(DatafeedManager datafeedManager, IndexNameExpressionResolver resolver) {
422+
public StartDatafeedPersistentTasksExecutor(DatafeedRunner datafeedRunner, IndexNameExpressionResolver resolver) {
423423
super(MlTasks.DATAFEED_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
424-
this.datafeedManager = datafeedManager;
424+
this.datafeedRunner = datafeedRunner;
425425
this.resolver = resolver;
426426
}
427427

@@ -460,8 +460,8 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa
460460
datafeedTask.markAsCompleted();
461461
return;
462462
}
463-
datafeedTask.datafeedManager = datafeedManager;
464-
datafeedManager.run(datafeedTask,
463+
datafeedTask.datafeedRunner = datafeedRunner;
464+
datafeedRunner.run(datafeedTask,
465465
(error) -> {
466466
if (error != null) {
467467
datafeedTask.markAsFailed(error);
@@ -486,7 +486,7 @@ public static class DatafeedTask extends AllocatedPersistentTask implements Star
486486
private final long startTime;
487487
private final Long endTime;
488488
/* only pck protected for testing */
489-
volatile DatafeedManager datafeedManager;
489+
volatile DatafeedRunner datafeedRunner;
490490

491491
DatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.DatafeedParams params,
492492
Map<String, String> headers) {
@@ -529,24 +529,24 @@ public boolean shouldCancelChildrenOnCancellation() {
529529
}
530530

531531
public void stop(String reason, TimeValue timeout) {
532-
if (datafeedManager != null) {
533-
datafeedManager.stopDatafeed(this, reason, timeout);
532+
if (datafeedRunner != null) {
533+
datafeedRunner.stopDatafeed(this, reason, timeout);
534534
}
535535
}
536536

537537
public void isolate() {
538-
if (datafeedManager != null) {
539-
datafeedManager.isolateDatafeed(getAllocationId());
538+
if (datafeedRunner != null) {
539+
datafeedRunner.isolateDatafeed(getAllocationId());
540540
}
541541
}
542542

543543
public Optional<GetDatafeedRunningStateAction.Response.RunningState> getRunningState() {
544-
if (datafeedManager == null) {
544+
if (datafeedRunner == null) {
545545
return Optional.empty();
546546
}
547547
return Optional.of(new GetDatafeedRunningStateAction.Response.RunningState(
548548
this.endTime == null,
549-
datafeedManager.finishedLookBack(this)
549+
datafeedRunner.finishedLookBack(this)
550550
));
551551
}
552552
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@
5353
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
5454
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
5555

56-
public class DatafeedManager {
56+
public class DatafeedRunner {
5757

58-
private static final Logger logger = LogManager.getLogger(DatafeedManager.class);
58+
private static final Logger logger = LogManager.getLogger(DatafeedRunner.class);
5959

6060
private final Client client;
6161
private final ClusterService clusterService;
@@ -69,9 +69,9 @@ public class DatafeedManager {
6969
private final AutodetectProcessManager autodetectProcessManager;
7070
private final DatafeedContextProvider datafeedContextProvider;
7171

72-
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
73-
Supplier<Long> currentTimeSupplier, AnomalyDetectionAuditor auditor,
74-
AutodetectProcessManager autodetectProcessManager, DatafeedContextProvider datafeedContextProvider) {
72+
public DatafeedRunner(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
73+
Supplier<Long> currentTimeSupplier, AnomalyDetectionAuditor auditor,
74+
AutodetectProcessManager autodetectProcessManager, DatafeedContextProvider datafeedContextProvider) {
7575
this.client = Objects.requireNonNull(client);
7676
this.clusterService = Objects.requireNonNull(clusterService);
7777
this.threadPool = Objects.requireNonNull(threadPool);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.Map;
2828

2929
import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
30-
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
30+
import static org.elasticsearch.xpack.ml.datafeed.DatafeedRunnerTests.createDatafeedConfig;
3131
import static org.hamcrest.Matchers.contains;
3232
import static org.hamcrest.Matchers.equalTo;
3333
import static org.hamcrest.Matchers.nullValue;

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
2222
import org.elasticsearch.xpack.core.ml.job.config.Job;
2323
import org.elasticsearch.xpack.core.ml.job.config.JobState;
24-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
25-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
24+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
25+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunnerTests;
2626
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
2727

2828
import java.util.Arrays;
@@ -50,37 +50,37 @@ protected NamedXContentRegistry xContentRegistry() {
5050
}
5151

5252
public void testValidate_jobClosed() {
53-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
53+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
5454
PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder().build();
55-
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
55+
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
5656
Exception e = expectThrows(ElasticsearchStatusException.class,
5757
() -> TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry()));
5858
assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed"));
5959
}
6060

6161
public void testValidate_jobOpening() {
62-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
62+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
6363
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
6464
addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder);
6565
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
66-
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
66+
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
6767

6868
TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry());
6969
}
7070

7171
public void testValidate_jobOpened() {
72-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
72+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
7373
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
7474
addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder);
7575
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
76-
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
76+
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
7777

7878
TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry());
7979
}
8080

8181
public void testDeprecationsLogged() {
82-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
83-
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
82+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
83+
DatafeedConfig.Builder datafeedConfig = DatafeedRunnerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
8484
DatafeedConfig config = spy(datafeedConfig.build());
8585
doReturn(Collections.singletonList("Deprecated Agg")).when(config).getAggDeprecations(any(NamedXContentRegistry.class));
8686
doReturn(Collections.singletonList("Deprecated Query")).when(config).getQueryDeprecations(any(NamedXContentRegistry.class));
@@ -94,8 +94,8 @@ public void testDeprecationsLogged() {
9494
}
9595

9696
public void testNoDeprecationsLogged() {
97-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
98-
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
97+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
98+
DatafeedConfig.Builder datafeedConfig = DatafeedRunnerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
9999
DatafeedConfig config = spy(datafeedConfig.build());
100100
doReturn(Collections.emptyList()).when(config).getAggDeprecations(any(NamedXContentRegistry.class));
101101
doReturn(Collections.emptyList()).when(config).getQueryDeprecations(any(NamedXContentRegistry.class));
@@ -160,10 +160,10 @@ public void testRemoteClusterVersionCheck() {
160160
public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
161161
TaskId parentTaskId,
162162
StartDatafeedAction.DatafeedParams params,
163-
DatafeedManager datafeedManager) {
163+
DatafeedRunner datafeedRunner) {
164164
TransportStartDatafeedAction.DatafeedTask task = new TransportStartDatafeedAction.DatafeedTask(id, type, action, parentTaskId,
165165
params, Collections.emptyMap());
166-
task.datafeedManager = datafeedManager;
166+
task.datafeedRunner = datafeedRunner;
167167
return task;
168168
}
169169
}

0 commit comments

Comments
 (0)