Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2c7ff1f
Wait for the new .ml-config index in tests
davidkyle Nov 8, 2018
f150c2d
Prevent pre 6.6 job being assigned to a 6.6 node
davidkyle Nov 9, 2018
b488806
Don’t index update cluster state jobs
davidkyle Nov 19, 2018
49fde66
DatafeedJobBuilder must read config from index and cluster state
davidkyle Nov 19, 2018
28a58eb
Order GetJobStats by job Id
davidkyle Nov 19, 2018
8265777
Rolling upgrade tests for jobs and data feeds
davidkyle Nov 19, 2018
9928a1e
Re-enable all rolling upgrade tests
davidkyle Nov 19, 2018
74f65f2
Set expected templates based on upgrade version
davidkyle Nov 19, 2018
e43e256
Use finalize action to set job’s finished time
davidkyle Nov 20, 2018
43af2d7
Use valid mapping type name for 5.6
davidkyle Nov 20, 2018
ee57263
Check finished time is set
davidkyle Nov 20, 2018
8bf3216
Prevent concurrent updates from AutoDetectResultsProcessor and ensure…
davidkyle Nov 20, 2018
a919880
Simplify sort
davidkyle Nov 20, 2018
d1beb2b
Make migration upgrade tests work with other upgrade tests
davidkyle Nov 21, 2018
e1020fd
Extra debugging for job assignment
davidkyle Nov 20, 2018
6e86639
Another name clash, this time the index
davidkyle Nov 21, 2018
bc2252d
Skip tests that rely on wildcard expansion in versions that do not su…
davidkyle Nov 22, 2018
eb8a40d
Remove upgrade test invalidated by the memory tracker
davidkyle Nov 22, 2018
4a755d2
Prevent NPE reading model memory limit
davidkyle Nov 22, 2018
6532995
Log get job stats in the right place and fix silly error
davidkyle Nov 22, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,13 @@ public long estimateMemoryFootprint() {
if (establishedModelMemory != null && establishedModelMemory > 0) {
return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes();
}
return ByteSizeUnit.MB.toBytes(analysisLimits.getModelMemoryLimit()) + PROCESS_MEMORY_OVERHEAD.getBytes();
// Pre v6.1 jobs may have a null analysis limits object or
// a null model memory limit
long modelMemoryLimit = AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB;
if (analysisLimits != null && analysisLimits.getModelMemoryLimit() != null) {
modelMemoryLimit = analysisLimits.getModelMemoryLimit();
}
return ByteSizeUnit.MB.toBytes(modelMemoryLimit) + PROCESS_MEMORY_OVERHEAD.getBytes();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,44 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public final class XPackRestTestHelper {

/**
 * Read-only list of four names: the notifications index, the ML meta index,
 * the job state index name and the job results index prefix.
 * NOTE(review): judging by the constant's name these are presumably the
 * template names to wait for when the cluster predates v6.6.0 - confirm
 * against the callers of {@code waitForTemplates}.
 */
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndex.jobResultsIndexPrefix()));

/**
 * Read-only list holding the same four names as {@link #ML_PRE_V660_TEMPLATES}
 * plus {@code AnomalyDetectorsIndex.configIndexName()}.
 * NOTE(review): presumably the template names to wait for on v6.6.0 and
 * later, where the config index exists - confirm against callers.
 */
public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName()));

// Private no-op constructor: the class is final and the members visible here
// are all static, so no instance is needed.
private XPackRestTestHelper() {
}

/**
* Waits for the Machine Learning templates to be created
* and checks that their version is up to date
*/
public static void waitForMlTemplates(RestClient client) throws InterruptedException {


/**
* For each template name wait for the template to be created and
* for the template version to be equal to the master node version.
*
* @param client The rest client
* @param templateNames Names of the templates to wait for
* @throws InterruptedException If the wait is interrupted
*/
public static void waitForTemplates(RestClient client, List<String> templateNames) throws InterruptedException {
AtomicReference<Version> masterNodeVersion = new AtomicReference<>();
ESTestCase.awaitBusy(() -> {
String response;
Expand All @@ -53,8 +77,6 @@ public static void waitForMlTemplates(RestClient client) throws InterruptedExcep
return false;
});

final List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
for (String template : templateNames) {
ESTestCase.awaitBusy(() -> {
Map<?, ?> response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,8 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
Expand All @@ -48,10 +50,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

private final ClusterService clusterService;
private final Client client;
private final Auditor auditor;
private final PersistentTasksService persistentTasksService;
private final DatafeedConfigProvider datafeedConfigProvider;
Expand All @@ -61,11 +67,12 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
public TransportCloseJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, Auditor auditor, PersistentTasksService persistentTasksService,
DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager) {
DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager, Client client) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CloseJobAction.Request::new, CloseJobAction.Response::new, ThreadPool.Names.SAME);
this.clusterService = clusterService;
this.client = client;
this.auditor = auditor;
this.persistentTasksService = persistentTasksService;
this.datafeedConfigProvider = datafeedConfigProvider;
Expand Down Expand Up @@ -419,7 +426,10 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo
}, request.getCloseTimeout(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
listener.onResponse(response);
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -24,23 +28,36 @@
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// This action is only called from nodes before version 6.6.0
public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportFinalizeJobExecutionAction extends
TransportMasterNodeAction<FinalizeJobExecutionAction.Request, AcknowledgedResponse> {

private final Client client;
@Inject
public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new);
this.client = client;
}

@Override
Expand All @@ -58,20 +75,60 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
ActionListener<AcknowledgedResponse> listener) {

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
List<String> jobsInClusterState = Arrays.stream(request.getJobIds())
Set<String> jobsInClusterState = Arrays.stream(request.getJobIds())
.filter(id -> mlMetadata.getJobs().containsKey(id))
.collect(Collectors.toList());

// This action should not be called for jobs that have
// their configuration in index documents
.collect(Collectors.toSet());

if (jobsInClusterState.isEmpty()) {
// This action is a no-op for jobs not defined in the cluster state.
listener.onResponse(new AcknowledgedResponse(true));
return;
finalizeIndexJobs(Arrays.asList(request.getJobIds()), listener);
} else {
ActionListener<AcknowledgedResponse> finalizeClusterStateJobsListener = ActionListener.wrap(
ack -> finalizeClusterStateJobs(jobsInClusterState, listener),
listener::onFailure
);

Set<String> jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds()));
jobsInIndex.removeAll(jobsInClusterState);

finalizeIndexJobs(jobsInIndex, finalizeClusterStateJobsListener);
}
}

private void finalizeIndexJobs(Collection<String> jobIds, ActionListener<AcknowledgedResponse> listener) {
String jobIdString = String.join(",", jobIds);
logger.debug("finalizing jobs [{}]", jobIdString);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isn't the collection stringified to "[id1, id2, ...]" anyway when it is passed to the logger directly? That would also create less garbage when debug logging is disabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. I'll remedy in a later commit as this is merged now.


ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor(
MachineLearning.UTILITY_THREAD_POOL_NAME), true);

Map<Object, Object> update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date());

for (String jobId: jobIds) {
UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
updateRequest.retryOnConflict(3);
updateRequest.doc(update);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

chainTaskExecutor.add(chainedListener -> {
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
updateResponse -> chainedListener.onResponse(null),
chainedListener::onFailure
));
});
}

String jobIdString = String.join(",", jobsInClusterState);
chainTaskExecutor.execute(ActionListener.wrap(
aVoid -> {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
},
listener::onFailure
));
}

private void finalizeClusterStateJobs(Collection<String> jobIds, ActionListener<AcknowledgedResponse> listener) {
String jobIdString = String.join(",", jobIds);
String source = "finalize_job_execution [" + jobIdString + "]";
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
Expand All @@ -82,7 +139,7 @@ public ClusterState execute(ClusterState currentState) {
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

for (String jobId : jobsInClusterState) {
for (String jobId : jobIds) {
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(finishedTime);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -73,7 +74,7 @@ public TransportGetJobsStatsAction(Settings settings, TransportService transport

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {

logger.debug("Get stats for job [{}]", request.getJobId());
jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
expandedIds -> {
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
Expand All @@ -96,6 +97,7 @@ protected GetJobsStatsAction.Response newResponse(GetJobsStatsAction.Request req
for (QueryPage<GetJobsStatsAction.Response.JobStats> task : tasks) {
stats.addAll(task.results());
}
Collections.sort(stats, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
return new GetJobsStatsAction.Response(taskOperationFailures, failedNodeExceptions, new QueryPage<>(stats, stats.size(),
Job.RESULTS_FIELD));
}
Expand All @@ -109,7 +111,6 @@ protected QueryPage<GetJobsStatsAction.Response.JobStats> readTaskResponse(Strea
protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<QueryPage<GetJobsStatsAction.Response.JobStats>> listener) {
String jobId = task.getJobId();
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(task);
Expand Down Expand Up @@ -159,6 +160,7 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
if (counter.decrementAndGet() == 0) {
List<GetJobsStatsAction.Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
int maxMachineMemoryPercent,
MlMemoryTracker memoryTracker,
Logger logger) {
if (job == null) {
logger.debug("[{}] select node job is null", jobId);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

job == null is valid for the rolling upgrade, as that field isn't present in the persistent task parameters for jobs < v6.6.0. I added this to help me understand what was happening during the rolling upgrade tests; it probably shouldn't be in the released code, however.


String resultsIndexName = job != null ? job.getResultsIndexName() : null;
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState);
if (unavailableIndices.size() != 0) {
Expand Down Expand Up @@ -236,6 +240,16 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
reasons.add(reason);
continue;
}

boolean jobConfigIsStoredInIndex = job.getJobVersion().onOrAfter(Version.V_6_6_0);
if (jobConfigIsStoredInIndex && node.getVersion().before(Version.V_6_6_0)) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node)
+ "] version [" + node.getVersion() + "], because this node does not support " +
"jobs of version [" + job.getJobVersion() + "]";
logger.trace(reason);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: debug seems more suited to me

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember that on a 100 node cluster each allocation will generate 100 messages similar to this one, which would be significant log spam. They get concatenated into the overall reason, which is stored in the cluster state if the persistent task exists (and returned in the error message in the case of this being called prior to opening).

All the other possible reasons for ruling out a node in this method also currently log at the trace level. I think they should all log at the same level, otherwise someone reading the logs could get a misleading picture of what is happening.

I would leave this as trace to match the others.

reasons.add(reason);
continue;
}
}

long numberOfAssignedJobs = 0;
Expand Down Expand Up @@ -820,8 +834,16 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS

@Override
public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) {
Job foundJob = params.getJob();
if (foundJob == null) {
// The job was added to the persistent task parameters in 6.6.0.
// If the field is not present, the task was created before 6.6.0,
// in which case the job should still be in the cluster state.
foundJob = MlMetadata.getMlMetadata(clusterState).getJobs().get(params.getJobId());
}

PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
params.getJob(),
foundJob,
clusterState,
maxConcurrentJobAllocations,
fallbackMaxNumberOfOpenJobs,
Expand Down
Loading