Merged
Changes from all commits
26 commits
1f0fb22
Fix deprecation warnings for doc value fields format
davidkyle Oct 25, 2018
9e0fc5c
Re-enable MachineLearningLicensingTests
davidkyle Oct 25, 2018
e873419
Refresh, refresh, refresh
davidkyle Oct 25, 2018
0cdff43
Mute TooManyJobsIT tests; they rely on the job memory usage being known
davidkyle Oct 18, 2018
088617a
ToXContent job field in OpenJobParams
davidkyle Oct 25, 2018
c81e54d
Mute BasicDistributedJobsIT.testMlIndicesNotAvailable
davidkyle Oct 25, 2018
d9dde59
Enable muted job manager tests
davidkyle Oct 25, 2018
cecaa89
Catch MlMetadata throwing from expandX
davidkyle Oct 19, 2018
4b40e57
Return sorted Ids for job/datafeed expand Ids
davidkyle Oct 22, 2018
7c3a7ad
Various checks at job creation that the job and group names are unique.
davidkyle Oct 23, 2018
97bbfd3
Mute tests where a missing job exception is no longer thrown
davidkyle Oct 23, 2018
11054d7
Disable index mappings for CategorizationAnalyzerConfig
davidkyle Oct 23, 2018
9b3774c
Delete job was accidentally deleting .ml-anomalies-shared
davidkyle Oct 23, 2018
f24bff1
Do update job work on autodetect close in a threaded listener
davidkyle Oct 24, 2018
c6b6882
Delete expired data tests must set model snapshot
davidkyle Oct 24, 2018
f5499a1
Fix test after the order of checks when deleting a datafeed changed
davidkyle Oct 24, 2018
d330b51
Wait for the update job request on autodetect result processor close
davidkyle Oct 24, 2018
501438c
Revert change to JobTests
davidkyle Oct 25, 2018
f2e7012
Make onCloseActionsLatch visible to thread calling awaitCompletion
davidkyle Oct 25, 2018
9aec620
Mute rolling upgrade tests
davidkyle Oct 25, 2018
553928d
Actually really mute rolling upgrade tests
davidkyle Oct 25, 2018
1274e16
Set size for requests where the max number of hits is known
davidkyle Oct 25, 2018
cc223ed
Remove CategorizationAnalyzerConfig fields from mapping tests
davidkyle Oct 25, 2018
daffc3e
Fix unknown fields in job params toXContent tests
davidkyle Oct 25, 2018
f0f976e
checkstyle
davidkyle Oct 25, 2018
2e0b983
Re-enable BasicDistributedJobsIT test for active indices
davidkyle Oct 29, 2018
@@ -139,13 +139,15 @@ public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {
/** TODO Remove in 7.0.0 */
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField JOB = new ParseField("job");

public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareObject(JobParams::setJob, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOB);
}

public static JobParams fromXContent(XContentParser parser) {
@@ -233,6 +235,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
if (job != null) {
builder.field("job", job);
}
builder.endObject();
// The job field is streamed but not persisted
return builder;
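The code comment above notes that the job field is streamed but not persisted. As a rough illustration, here is a minimal sketch of the transport half of that contract, assuming the standard Writeable pattern (the body is illustrative, not the PR's exact code):

@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeString(jobId);
    // ... timeout and other fields elided for brevity ...
    out.writeOptionalWriteable(job); // streamed to other nodes; null-safe when no job is set
}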
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.job.config;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Nullable;
@@ -1084,6 +1085,10 @@ private void validateGroups() {
if (MlStrings.isValidId(group) == false) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_GROUP, group));
}
if (this.id.equals(group)) {
// cannot have a group name the same as the job id
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group));
}
}
}

@@ -11,7 +11,6 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -156,21 +155,7 @@ public static void addJobConfigFields(XContentBuilder builder) throws IOException {
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnalysisConfig.CATEGORIZATION_ANALYZER.getPreferredName())
.startObject(PROPERTIES)
.startObject(CategorizationAnalyzerConfig.CATEGORIZATION_ANALYZER.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
// TOKENIZER, TOKEN_FILTERS and CHAR_FILTERS are complex types, don't parse or index
.startObject(CategorizationAnalyzerConfig.TOKENIZER.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(CategorizationAnalyzerConfig.TOKEN_FILTERS.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(CategorizationAnalyzerConfig.CHAR_FILTERS.getPreferredName())
.field(ENABLED, false)
.endObject()
.endObject()
.field(ENABLED, false)
.endObject()
.startObject(AnalysisConfig.LATENCY.getPreferredName())
.field(TYPE, KEYWORD)
@@ -9,7 +9,6 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -214,10 +213,6 @@ public final class ReservedFieldNames {
AnalysisLimits.MODEL_MEMORY_LIMIT.getPreferredName(),
AnalysisLimits.CATEGORIZATION_EXAMPLES_LIMIT.getPreferredName(),

CategorizationAnalyzerConfig.CHAR_FILTERS.getPreferredName(),
CategorizationAnalyzerConfig.TOKENIZER.getPreferredName(),
CategorizationAnalyzerConfig.TOKEN_FILTERS.getPreferredName(),

Detector.DETECTOR_DESCRIPTION_FIELD.getPreferredName(),
Detector.FUNCTION_FIELD.getPreferredName(),
Detector.FIELD_NAME_FIELD.getPreferredName(),
@@ -10,8 +10,10 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.job.config.JobTests;

import java.io.IOException;
import java.util.function.Predicate;

public class JobParamsTests extends AbstractSerializingTestCase<OpenJobAction.JobParams> {

@@ -25,6 +27,9 @@ public static OpenJobAction.JobParams createJobParams() {
if (randomBoolean()) {
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
if (randomBoolean()) {
params.setJob(JobTests.createRandomizedJob());
}
return params;
}

@@ -42,4 +47,12 @@ protected Writeable.Reader<OpenJobAction.JobParams> instanceReader() {
protected boolean supportsUnknownFields() {
return true;
}

@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// Don't insert random fields into the job object as the
// custom_fields member accepts arbitrary fields and new
// fields inserted there will result in object inequality
return path -> path.startsWith(OpenJobAction.JobParams.JOB.getPreferredName());
}
}
@@ -7,6 +7,7 @@

import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
@@ -523,6 +524,13 @@ public void testInvalidGroup() {
assertThat(e.getMessage(), containsString("Invalid group id '$$$'"));
}

public void testInvalidGroup_matchesJobId() {
Job.Builder builder = buildJobBuilder("foo");
builder.setGroups(Collections.singletonList("foo"));
ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, builder::build);
assertEquals(e.getMessage(), "job and group names must be unique but job [foo] and group [foo] have the same name");
}

public void testEstimateMemoryFootprint_GivenEstablished() {
Job.Builder builder = buildJobBuilder("established");
long establishedModelMemory = randomIntBetween(10_000, 2_000_000_000);
@@ -704,7 +704,7 @@ public void testRealtime() throws Exception {
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(EntityUtils.toString(response.getEntity()),
containsString("Cannot delete job [" + jobId + "] because datafeed [" + datafeedId + "] refers to it"));
containsString("Cannot delete job [" + jobId + "] because the job is opened"));
Member Author: The order of checks has changed. The response code is the same (409); only the error message has changed, which does not constitute a breaking change.


response = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"));
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
@@ -95,8 +95,8 @@ public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
}

public void testDeleteExpiredData() throws Exception {
registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(null));
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(null));
registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
registerJob(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
registerJob(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L));
@@ -331,7 +331,7 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
builder -> {
Job job = builder.build();
indexName.set(job.getResultsIndexName());
if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
//don't bother searching the index any further, we are on the default shared
customIndexSearchHandler.onResponse(null);
@@ -98,12 +98,15 @@ Map<String, DatafeedConfig> expandClusterStateDatafeeds(String datafeedExpression,
ClusterState clusterState) {

Map<String, DatafeedConfig> configById = new HashMap<>();

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds);

for (String expandedDatafeedId : expandedDatafeedIds) {
configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId));
try {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds);
Member Author: This throws if the required job ID(s) are not found. Eventually this code will be removed, but I think it's a useful early check that the future migration process is successful.


for (String expandedDatafeedId : expandedDatafeedIds) {
configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId));
}
} catch (Exception e) {
// ignore

Reviewer: Based on your comment above, it sounds like exceptions are unexpected here? If so, we could assert and display the exception and stack trace; then we'd get CI failures that might flag situations we hadn't thought of. (But obviously not if I've misunderstood and exceptions here are expected.)

Member Author: The purpose here is to check that the job is not defined in both the cluster state and the index, and to error in that case. Unfortunately, expandDatafeedIds throws if the argument is a wildcard (foo*) and there is no match.

}

return configById;
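A minimal sketch of the wildcard behavior discussed in the thread above, assuming that expanding an unmatched wildcard throws rather than returning an empty set (the concrete exception type is an assumption):

// Illustration only: why the broad catch above is needed.
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
try {
    // Suppose no datafeed id matches "foo*": expandDatafeedIds throws
    // here instead of returning an empty set.
    Set<String> ids = mlMetadata.expandDatafeedIds("foo*", false);
} catch (Exception e) {
    // ignored, mirroring the catch in the diff above
}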
@@ -44,8 +44,8 @@ protected void doExecute(GetModelSnapshotsAction.Request request, ActionListener
request.getJobId(), request.getSnapshotId(), request.getPageParams().getFrom(), request.getPageParams().getSize(),
request.getStart(), request.getEnd(), request.getSort(), request.getDescOrder());

jobManager.getJob(request.getJobId(), ActionListener.wrap(
job -> {
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
ok -> {
jobResultsProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(),
request.getPageParams().getSize(), request.getStart(), request.getEnd(), request.getSort(),
request.getDescOrder(), request.getSnapshotId(),
@@ -21,6 +21,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
@@ -43,6 +44,7 @@
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
@@ -62,6 +64,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

@@ -114,6 +118,7 @@ public void putDatafeedConfig(DatafeedConfig config, Map<String, String> headers
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId))
.setSource(source)
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();

executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
@@ -181,19 +186,20 @@ public void onFailure(Exception e) {
public void findDatafeedsForJobIds(Collection<String> jobIds, ActionListener<Set<String>> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdsQuery(jobIds));
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName());
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);
Member Author: USE_DEFAULT_FORMAT prevents a deprecation warning.
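For contrast, a minimal sketch of the two call forms, taken from the before and after of this diff:

// Implicit format: triggers a deprecation warning in 6.x
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName());
// Explicit default format: same behavior, no warning
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);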


SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSize(jobIds.size())
.setSource(sourceBuilder).request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
Set<String> datafeedIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
// There cannot be more than one datafeed per job
assert hits.length <= jobIds.size();
assert response.getHits().totalHits <= jobIds.size();
SearchHit[] hits = response.getHits().getHits();

for (SearchHit hit : hits) {
datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());
@@ -214,6 +220,7 @@ public void findDatafeedsForJobIds(Collection<String> jobIds, ActionListener<Set<String>> listener) {
public void deleteDatafeedConfig(String datafeedId, ActionListener<DeleteResponse> actionListener) {
DeleteRequest request = new DeleteRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
@@ -307,6 +314,7 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, Acti
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();

executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
@@ -341,12 +349,12 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, Acti
* wildcard then setting this true will not suppress the exception
* @param listener The expanded datafeed IDs listener
*/
public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<Set<String>> listener) {
public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<SortedSet<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens));
sourceBuilder.sort(DatafeedConfig.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName());
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
@@ -357,7 +365,7 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<SortedSet<String>> listener) {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
Set<String> datafeedIds = new HashSet<>();
SortedSet<String> datafeedIds = new TreeSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());