@@ -13,7 +13,7 @@
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;

public class InvalidLicenseEnforcer implements LicenseStateListener {
@@ -22,16 +22,16 @@ public class InvalidLicenseEnforcer implements LicenseStateListener {

private final ThreadPool threadPool;
private final XPackLicenseState licenseState;
private final DatafeedManager datafeedManager;
private final DatafeedRunner datafeedRunner;
private final AutodetectProcessManager autodetectProcessManager;

private volatile boolean licenseStateListenerRegistered;

InvalidLicenseEnforcer(XPackLicenseState licenseState, ThreadPool threadPool,
DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) {
DatafeedRunner datafeedRunner, AutodetectProcessManager autodetectProcessManager) {
this.threadPool = threadPool;
this.licenseState = licenseState;
this.datafeedManager = datafeedManager;
this.datafeedRunner = datafeedRunner;
this.autodetectProcessManager = autodetectProcessManager;
}

@@ -59,7 +59,7 @@ public void onFailure(Exception e) {

@Override
protected void doRun() throws Exception {
datafeedManager.stopAllDatafeedsOnThisNode("invalid license");
datafeedRunner.stopAllDatafeedsOnThisNode("invalid license");
autodetectProcessManager.closeAllJobsOnThisNode("invalid license");
}
});
@@ -256,7 +256,7 @@
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
import org.elasticsearch.xpack.ml.datafeed.DatafeedContextProvider;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@@ -540,7 +540,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
private final SetOnce<DatafeedConfigProvider> datafeedConfigProvider = new SetOnce<>();
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
private final SetOnce<DatafeedRunner> datafeedRunner = new SetOnce<>();
private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
@@ -771,9 +771,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
clusterService.getNodeName());
DatafeedContextProvider datafeedContextProvider = new DatafeedContextProvider(jobConfigProvider, datafeedConfigProvider,
jobResultsProvider);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
DatafeedRunner datafeedRunner = new DatafeedRunner(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager, datafeedContextProvider);
this.datafeedManager.set(datafeedManager);
this.datafeedRunner.set(datafeedRunner);

// Inference components
final TrainedModelStatsService trainedModelStatsService = new TrainedModelStatsService(resultsPersisterService,
@@ -820,7 +820,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
this.memoryTracker.set(memoryTracker);
MlLifeCycleService mlLifeCycleService =
new MlLifeCycleService(
clusterService, datafeedManager, mlController, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
clusterService, datafeedRunner, mlController, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);

@@ -829,7 +829,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
clusterService.addListener(mlAutoUpdateService);
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
final InvalidLicenseEnforcer enforcer =
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedRunner, autodetectProcessManager);
enforcer.listenForLicenseStateChanges();

// Perform node startup operations
@@ -849,7 +849,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
jobDataCountsPersister,
datafeedManager,
datafeedRunner,
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
inferenceAuditor,
@@ -883,7 +883,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
memoryTracker.get(),
client,
expressionResolver),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get(), expressionResolver),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedRunner.get(), expressionResolver),
new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings,
client,
clusterService,
@@ -8,7 +8,7 @@

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.process.MlController;
@@ -19,16 +19,16 @@

public class MlLifeCycleService {

private final DatafeedManager datafeedManager;
private final DatafeedRunner datafeedRunner;
private final MlController mlController;
private final AutodetectProcessManager autodetectProcessManager;
private final DataFrameAnalyticsManager analyticsManager;
private final MlMemoryTracker memoryTracker;

MlLifeCycleService(ClusterService clusterService, DatafeedManager datafeedManager, MlController mlController,
MlLifeCycleService(ClusterService clusterService, DatafeedRunner datafeedRunner, MlController mlController,
AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
MlMemoryTracker memoryTracker) {
this.datafeedManager = Objects.requireNonNull(datafeedManager);
this.datafeedRunner = Objects.requireNonNull(datafeedRunner);
this.mlController = Objects.requireNonNull(mlController);
this.autodetectProcessManager = Objects.requireNonNull(autodetectProcessManager);
this.analyticsManager = Objects.requireNonNull(analyticsManager);
@@ -47,7 +47,7 @@ public synchronized void stop() {
analyticsManager.markNodeAsShuttingDown();
// This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the datafeeds, so they get reassigned.
// We have to do this first, otherwise the datafeeds could fail if they send data to a dead autodetect process.
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
datafeedRunner.isolateAllDatafeedsOnThisNodeBeforeShutdown();
// This kills autodetect processes WITHOUT closing the jobs, so they get reassigned.
autodetectProcessManager.killAllProcessesOnThisNode();
mlController.stop();
@@ -58,7 +58,7 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -417,12 +417,12 @@ private ElasticsearchStatusException createUnknownLicenseError(
}

public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
private final DatafeedManager datafeedManager;
private final DatafeedRunner datafeedRunner;
private final IndexNameExpressionResolver resolver;

public StartDatafeedPersistentTasksExecutor(DatafeedManager datafeedManager, IndexNameExpressionResolver resolver) {
public StartDatafeedPersistentTasksExecutor(DatafeedRunner datafeedRunner, IndexNameExpressionResolver resolver) {
super(MlTasks.DATAFEED_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.datafeedManager = datafeedManager;
this.datafeedRunner = datafeedRunner;
this.resolver = resolver;
}

@@ -461,8 +461,8 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa
datafeedTask.markAsCompleted();
return;
}
datafeedTask.datafeedManager = datafeedManager;
datafeedManager.run(datafeedTask,
datafeedTask.datafeedRunner = datafeedRunner;
datafeedRunner.run(datafeedTask,
(error) -> {
if (error != null) {
datafeedTask.markAsFailed(error);
@@ -487,7 +487,7 @@ public static class DatafeedTask extends AllocatedPersistentTask implements Star
private final long startTime;
private final Long endTime;
/* only pck protected for testing */
volatile DatafeedManager datafeedManager;
volatile DatafeedRunner datafeedRunner;

DatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.DatafeedParams params,
Map<String, String> headers) {
@@ -530,24 +530,24 @@ public boolean shouldCancelChildrenOnCancellation() {
}

public void stop(String reason, TimeValue timeout) {
if (datafeedManager != null) {
datafeedManager.stopDatafeed(this, reason, timeout);
if (datafeedRunner != null) {
datafeedRunner.stopDatafeed(this, reason, timeout);
}
}

public void isolate() {
if (datafeedManager != null) {
datafeedManager.isolateDatafeed(getAllocationId());
if (datafeedRunner != null) {
datafeedRunner.isolateDatafeed(getAllocationId());
}
}

public Optional<GetDatafeedRunningStateAction.Response.RunningState> getRunningState() {
if (datafeedManager == null) {
if (datafeedRunner == null) {
return Optional.empty();
}
return Optional.of(new GetDatafeedRunningStateAction.Response.RunningState(
this.endTime == null,
datafeedManager.finishedLookBack(this)
datafeedRunner.finishedLookBack(this)
));
}
}
@@ -53,9 +53,9 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class DatafeedManager {
public class DatafeedRunner {

private static final Logger logger = LogManager.getLogger(DatafeedManager.class);
private static final Logger logger = LogManager.getLogger(DatafeedRunner.class);

private final Client client;
private final ClusterService clusterService;
@@ -69,9 +69,9 @@ public class DatafeedManager {
private final AutodetectProcessManager autodetectProcessManager;
private final DatafeedContextProvider datafeedContextProvider;

public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, AnomalyDetectionAuditor auditor,
AutodetectProcessManager autodetectProcessManager, DatafeedContextProvider datafeedContextProvider) {
public DatafeedRunner(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, AnomalyDetectionAuditor auditor,
AutodetectProcessManager autodetectProcessManager, DatafeedContextProvider datafeedContextProvider) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.threadPool = Objects.requireNonNull(threadPool);
@@ -27,7 +27,7 @@
import java.util.Map;

import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedRunnerTests.createDatafeedConfig;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@@ -21,8 +21,8 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunnerTests;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

import java.util.Arrays;
@@ -50,37 +50,37 @@ protected NamedXContentRegistry xContentRegistry() {
}

public void testValidate_jobClosed() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder().build();
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry()));
assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed"));
}

public void testValidate_jobOpening() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder);
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();

TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry());
}

public void testValidate_jobOpened() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();

TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry());
}

public void testDeprecationsLogged() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
DatafeedConfig.Builder datafeedConfig = DatafeedRunnerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
DatafeedConfig config = spy(datafeedConfig.build());
doReturn(Collections.singletonList("Deprecated Agg")).when(config).getAggDeprecations(any(NamedXContentRegistry.class));
doReturn(Collections.singletonList("Deprecated Query")).when(config).getQueryDeprecations(any(NamedXContentRegistry.class));
@@ -94,8 +94,8 @@ public void testDeprecationsLogged() {
}

public void testNoDeprecationsLogged() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
DatafeedConfig.Builder datafeedConfig = DatafeedRunnerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
DatafeedConfig config = spy(datafeedConfig.build());
doReturn(Collections.emptyList()).when(config).getAggDeprecations(any(NamedXContentRegistry.class));
doReturn(Collections.emptyList()).when(config).getQueryDeprecations(any(NamedXContentRegistry.class));
@@ -160,10 +160,10 @@ public void testRemoteClusterVersionCheck() {
public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
TaskId parentTaskId,
StartDatafeedAction.DatafeedParams params,
DatafeedManager datafeedManager) {
DatafeedRunner datafeedRunner) {
TransportStartDatafeedAction.DatafeedTask task = new TransportStartDatafeedAction.DatafeedTask(id, type, action, parentTaskId,
params, Collections.emptyMap());
task.datafeedManager = datafeedManager;
task.datafeedRunner = datafeedRunner;
return task;
}
}