Skip to content

Commit 708aff9

Browse files
committed
[ML] Removing old per-partition normalization code (#32816)
[ML] Removing old per-partition normalization code. Per-partition normalization is an old, undocumented feature that was never used by clients. It has been superseded by per-partition maximum scoring. To maintain communication compatibility with nodes prior to 6.5, it is necessary to maintain/cope with the old wire format.
1 parent e9622ab commit 708aff9

29 files changed

+99
-596
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java

Lines changed: 15 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
6464
private static final ParseField OVERLAPPING_BUCKETS = new ParseField("overlapping_buckets");
6565
private static final ParseField RESULT_FINALIZATION_WINDOW = new ParseField("result_finalization_window");
6666
private static final ParseField MULTIVARIATE_BY_FIELDS = new ParseField("multivariate_by_fields");
67-
private static final ParseField USER_PER_PARTITION_NORMALIZATION = new ParseField("use_per_partition_normalization");
6867

6968
public static final String ML_CATEGORY_FIELD = "mlcategory";
7069
public static final Set<String> AUTO_CREATED_FIELDS = new HashSet<>(Collections.singletonList(ML_CATEGORY_FIELD));
@@ -98,7 +97,6 @@ private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createPars
9897
parser.declareBoolean(Builder::setOverlappingBuckets, OVERLAPPING_BUCKETS);
9998
parser.declareLong(Builder::setResultFinalizationWindow, RESULT_FINALIZATION_WINDOW);
10099
parser.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
101-
parser.declareBoolean(Builder::setUsePerPartitionNormalization, USER_PER_PARTITION_NORMALIZATION);
102100

103101
return parser;
104102
}
@@ -117,12 +115,11 @@ private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createPars
117115
private final Boolean overlappingBuckets;
118116
private final Long resultFinalizationWindow;
119117
private final Boolean multivariateByFields;
120-
private final boolean usePerPartitionNormalization;
121118

122119
private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List<String> categorizationFilters,
123120
CategorizationAnalyzerConfig categorizationAnalyzerConfig, TimeValue latency, String summaryCountFieldName,
124121
List<Detector> detectors, List<String> influencers, Boolean overlappingBuckets, Long resultFinalizationWindow,
125-
Boolean multivariateByFields, boolean usePerPartitionNormalization) {
122+
Boolean multivariateByFields) {
126123
this.detectors = detectors;
127124
this.bucketSpan = bucketSpan;
128125
this.latency = latency;
@@ -134,7 +131,6 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis
134131
this.overlappingBuckets = overlappingBuckets;
135132
this.resultFinalizationWindow = resultFinalizationWindow;
136133
this.multivariateByFields = multivariateByFields;
137-
this.usePerPartitionNormalization = usePerPartitionNormalization;
138134
}
139135

140136
public AnalysisConfig(StreamInput in) throws IOException {
@@ -165,7 +161,12 @@ public AnalysisConfig(StreamInput in) throws IOException {
165161
}
166162
}
167163

168-
usePerPartitionNormalization = in.readBoolean();
164+
// BWC for removed per-partition normalization
165+
// Version check is temporarily against the latest to satisfy CI tests
166+
// TODO change to V_6_5_0 after successful backport to 6.x
167+
if (in.getVersion().before(Version.V_7_0_0_alpha1)) {
168+
in.readBoolean();
169+
}
169170
}
170171

171172
@Override
@@ -195,7 +196,12 @@ public void writeTo(StreamOutput out) throws IOException {
195196
out.writeBoolean(false);
196197
}
197198

198-
out.writeBoolean(usePerPartitionNormalization);
199+
// BWC for removed per-partition normalization
200+
// Version check is temporarily against the latest to satisfy CI tests
201+
// TODO change to V_6_5_0 after successful backport to 6.x
202+
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
203+
out.writeBoolean(false);
204+
}
199205
}
200206

201207
/**
@@ -299,10 +305,6 @@ public Boolean getMultivariateByFields() {
299305
return multivariateByFields;
300306
}
301307

302-
public boolean getUsePerPartitionNormalization() {
303-
return usePerPartitionNormalization;
304-
}
305-
306308
/**
307309
* Return the set of fields required by the analysis.
308310
* These are the influencer fields, metric field, partition field,
@@ -403,9 +405,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
403405
if (multivariateByFields != null) {
404406
builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
405407
}
406-
if (usePerPartitionNormalization) {
407-
builder.field(USER_PER_PARTITION_NORMALIZATION.getPreferredName(), usePerPartitionNormalization);
408-
}
409408
builder.endObject();
410409
return builder;
411410
}
@@ -416,7 +415,6 @@ public boolean equals(Object o) {
416415
if (o == null || getClass() != o.getClass()) return false;
417416
AnalysisConfig that = (AnalysisConfig) o;
418417
return Objects.equals(latency, that.latency) &&
419-
usePerPartitionNormalization == that.usePerPartitionNormalization &&
420418
Objects.equals(bucketSpan, that.bucketSpan) &&
421419
Objects.equals(categorizationFieldName, that.categorizationFieldName) &&
422420
Objects.equals(categorizationFilters, that.categorizationFilters) &&
@@ -434,7 +432,7 @@ public int hashCode() {
434432
return Objects.hash(
435433
bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency,
436434
summaryCountFieldName, detectors, influencers, overlappingBuckets, resultFinalizationWindow,
437-
multivariateByFields, usePerPartitionNormalization
435+
multivariateByFields
438436
);
439437
}
440438

@@ -453,7 +451,6 @@ public static class Builder {
453451
private Boolean overlappingBuckets;
454452
private Long resultFinalizationWindow;
455453
private Boolean multivariateByFields;
456-
private boolean usePerPartitionNormalization = false;
457454

458455
public Builder(List<Detector> detectors) {
459456
setDetectors(detectors);
@@ -472,7 +469,6 @@ public Builder(AnalysisConfig analysisConfig) {
472469
this.overlappingBuckets = analysisConfig.overlappingBuckets;
473470
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
474471
this.multivariateByFields = analysisConfig.multivariateByFields;
475-
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
476472
}
477473

478474
public void setDetectors(List<Detector> detectors) {
@@ -535,10 +531,6 @@ public void setMultivariateByFields(Boolean multivariateByFields) {
535531
this.multivariateByFields = multivariateByFields;
536532
}
537533

538-
public void setUsePerPartitionNormalization(boolean usePerPartitionNormalization) {
539-
this.usePerPartitionNormalization = usePerPartitionNormalization;
540-
}
541-
542534
/**
543535
* Checks the configuration is valid
544536
* <ol>
@@ -571,16 +563,11 @@ public AnalysisConfig build() {
571563

572564
overlappingBuckets = verifyOverlappingBucketsConfig(overlappingBuckets, detectors);
573565

574-
if (usePerPartitionNormalization) {
575-
checkDetectorsHavePartitionFields(detectors);
576-
checkNoInfluencersAreSet(influencers);
577-
}
578-
579566
verifyNoInconsistentNestedFieldNames();
580567

581568
return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
582569
latency, summaryCountFieldName, detectors, influencers, overlappingBuckets,
583-
resultFinalizationWindow, multivariateByFields, usePerPartitionNormalization);
570+
resultFinalizationWindow, multivariateByFields);
584571
}
585572

586573
private void verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet() {
@@ -704,23 +691,6 @@ private void verifyCategorizationFiltersAreValidRegex() {
704691
}
705692
}
706693

707-
private static void checkDetectorsHavePartitionFields(List<Detector> detectors) {
708-
for (Detector detector : detectors) {
709-
if (!Strings.isNullOrEmpty(detector.getPartitionFieldName())) {
710-
return;
711-
}
712-
}
713-
throw ExceptionsHelper.badRequestException(Messages.getMessage(
714-
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD));
715-
}
716-
717-
private static void checkNoInfluencersAreSet(List<String> influencers) {
718-
if (!influencers.isEmpty()) {
719-
throw ExceptionsHelper.badRequestException(Messages.getMessage(
720-
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS));
721-
}
722-
}
723-
724694
private static boolean isValidRegex(String exp) {
725695
try {
726696
Pattern.compile(exp);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,6 @@ public final class Messages {
130130
"over_field_name cannot be used with function ''{0}''";
131131
public static final String JOB_CONFIG_OVERLAPPING_BUCKETS_INCOMPATIBLE_FUNCTION =
132132
"Overlapping buckets cannot be used with function ''{0}''";
133-
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS =
134-
"A job configured with Per-Partition Normalization cannot use influencers";
135-
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD =
136-
"If the job is configured with Per-Partition Normalization enabled a detector must have a partition field";
137133
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "Unknown function ''{0}''";
138134
public static final String JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED =
139135
"Invalid update value for analysis_limits: model_memory_limit cannot be decreased below current usage; " +

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -227,23 +227,6 @@ private static void addResultsMapping(XContentBuilder builder) throws IOExceptio
227227
.startObject(Bucket.SCHEDULED_EVENTS.getPreferredName())
228228
.field(TYPE, KEYWORD)
229229
.endObject()
230-
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
231-
.field(TYPE, NESTED)
232-
.startObject(PROPERTIES)
233-
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
234-
.field(TYPE, KEYWORD)
235-
.endObject()
236-
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
237-
.field(TYPE, KEYWORD)
238-
.endObject()
239-
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
240-
.field(TYPE, DOUBLE)
241-
.endObject()
242-
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
243-
.field(TYPE, DOUBLE)
244-
.endObject()
245-
.endObject()
246-
.endObject()
247230

248231
.startObject(Bucket.BUCKET_INFLUENCERS.getPreferredName())
249232
.field(TYPE, NESTED)
@@ -328,7 +311,7 @@ private static void addTermFields(XContentBuilder builder, Collection<String> te
328311
}
329312

330313
private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException {
331-
314+
332315
// Forecast Output
333316
builder.startObject(Forecast.FORECAST_LOWER.getPreferredName())
334317
.field(TYPE, DOUBLE)
@@ -370,7 +353,7 @@ private static void addForecastFieldsToMapping(XContentBuilder builder) throws I
370353
.field(TYPE, LONG)
371354
.endObject();
372355
}
373-
356+
374357
/**
375358
* AnomalyRecord fields to be added under the 'properties' section of the mapping
376359
* @param builder Add properties to this builder

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Bucket.java

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Date;
2626
import java.util.List;
2727
import java.util.Objects;
28-
import java.util.Optional;
2928

3029
/**
3130
* Bucket Result POJO
@@ -43,7 +42,6 @@ public class Bucket implements ToXContentObject, Writeable {
4342
public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucket_influencers");
4443
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
4544
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
46-
public static final ParseField PARTITION_SCORES = new ParseField("partition_scores");
4745
public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events");
4846

4947
// Used for QueryPage
@@ -58,6 +56,19 @@ public class Bucket implements ToXContentObject, Writeable {
5856
public static final ConstructingObjectParser<Bucket, Void> STRICT_PARSER = createParser(false);
5957
public static final ConstructingObjectParser<Bucket, Void> LENIENT_PARSER = createParser(true);
6058

59+
/* *
60+
* Read and discard the old (prior to 6.5) perPartitionNormalization values
61+
*/
62+
public static Bucket readOldPerPartitionNormalization(StreamInput in) throws IOException {
63+
in.readString();
64+
in.readString();
65+
in.readDouble();
66+
in.readDouble();
67+
in.readDouble();
68+
69+
return null;
70+
}
71+
6172
private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignoreUnknownFields) {
6273
ConstructingObjectParser<Bucket, Void> parser = new ConstructingObjectParser<>(RESULT_TYPE_VALUE, ignoreUnknownFields,
6374
a -> new Bucket((String) a[0], (Date) a[1], (long) a[2]));
@@ -82,8 +93,6 @@ private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignor
8293
parser.declareObjectArray(Bucket::setBucketInfluencers, ignoreUnknownFields ?
8394
BucketInfluencer.LENIENT_PARSER : BucketInfluencer.STRICT_PARSER, BUCKET_INFLUENCERS);
8495
parser.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS);
85-
parser.declareObjectArray(Bucket::setPartitionScores, ignoreUnknownFields ?
86-
PartitionScore.LENIENT_PARSER : PartitionScore.STRICT_PARSER, PARTITION_SCORES);
8796
parser.declareString((bucket, s) -> {}, Result.RESULT_TYPE);
8897
parser.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS);
8998

@@ -100,7 +109,6 @@ private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignor
100109
private boolean isInterim;
101110
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
102111
private long processingTimeMs;
103-
private List<PartitionScore> partitionScores = Collections.emptyList();
104112
private List<String> scheduledEvents = Collections.emptyList();
105113

106114
public Bucket(String jobId, Date timestamp, long bucketSpan) {
@@ -120,7 +128,6 @@ public Bucket(Bucket other) {
120128
this.isInterim = other.isInterim;
121129
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
122130
this.processingTimeMs = other.processingTimeMs;
123-
this.partitionScores = new ArrayList<>(other.partitionScores);
124131
this.scheduledEvents = new ArrayList<>(other.scheduledEvents);
125132
}
126133

@@ -143,7 +150,10 @@ public Bucket(StreamInput in) throws IOException {
143150
if (in.getVersion().before(Version.V_5_5_0)) {
144151
in.readGenericValue();
145152
}
146-
partitionScores = in.readList(PartitionScore::new);
153+
// bwc for perPartitionNormalization
154+
if (in.getVersion().before(Version.V_6_5_0)) {
155+
in.readList(Bucket::readOldPerPartitionNormalization);
156+
}
147157
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
148158
scheduledEvents = in.readList(StreamInput::readString);
149159
if (scheduledEvents.isEmpty()) {
@@ -174,7 +184,10 @@ public void writeTo(StreamOutput out) throws IOException {
174184
if (out.getVersion().before(Version.V_5_5_0)) {
175185
out.writeGenericValue(Collections.emptyMap());
176186
}
177-
out.writeList(partitionScores);
187+
// bwc for perPartitionNormalization
188+
if (out.getVersion().before(Version.V_6_5_0)) {
189+
out.writeList(Collections.emptyList());
190+
}
178191
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
179192
out.writeStringList(scheduledEvents);
180193
}
@@ -195,9 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
195208
builder.field(Result.IS_INTERIM.getPreferredName(), isInterim);
196209
builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers);
197210
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs);
198-
if (partitionScores.isEmpty() == false) {
199-
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
200-
}
211+
201212
if (scheduledEvents.isEmpty() == false) {
202213
builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents);
203214
}
@@ -304,14 +315,6 @@ public void addBucketInfluencer(BucketInfluencer bucketInfluencer) {
304315
bucketInfluencers.add(bucketInfluencer);
305316
}
306317

307-
public List<PartitionScore> getPartitionScores() {
308-
return partitionScores;
309-
}
310-
311-
public void setPartitionScores(List<PartitionScore> scores) {
312-
partitionScores = Objects.requireNonNull(scores);
313-
}
314-
315318
public List<String> getScheduledEvents() {
316319
return scheduledEvents;
317320
}
@@ -320,24 +323,10 @@ public void setScheduledEvents(List<String> scheduledEvents) {
320323
this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName());
321324
}
322325

323-
public double partitionInitialAnomalyScore(String partitionValue) {
324-
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
325-
.findFirst();
326-
327-
return first.isPresent() ? first.get().getInitialRecordScore() : 0.0;
328-
}
329-
330-
public double partitionAnomalyScore(String partitionValue) {
331-
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
332-
.findFirst();
333-
334-
return first.isPresent() ? first.get().getRecordScore() : 0.0;
335-
}
336-
337326
@Override
338327
public int hashCode() {
339328
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records,
340-
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents);
329+
isInterim, bucketSpan, bucketInfluencers, processingTimeMs, scheduledEvents);
341330
}
342331

343332
/**
@@ -360,7 +349,6 @@ public boolean equals(Object other) {
360349
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
361350
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
362351
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers)
363-
&& Objects.equals(this.partitionScores, that.partitionScores)
364352
&& (this.processingTimeMs == that.processingTimeMs)
365353
&& Objects.equals(this.scheduledEvents, that.scheduledEvents);
366354
}
@@ -374,6 +362,6 @@ public boolean equals(Object other) {
374362
* @return true if the bucket should be normalized or false otherwise
375363
*/
376364
public boolean isNormalizable() {
377-
return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0);
365+
return anomalyScore > 0.0;
378366
}
379367
}

0 commit comments

Comments (0)