Skip to content

Commit 2ccfaf4

Browse files
committed
[ML] adding pivot.max_search_page_size option for setting paging size (elastic#41920)
* [ML] adding pivot.size option for setting paging size * Changing field name to address PR comments * fixing ctor usage * adjust hlrc for field name change
1 parent 44c3418 commit 2ccfaf4

File tree

14 files changed

+154
-32
lines changed

14 files changed

+154
-32
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,32 +39,39 @@ public class PivotConfig implements ToXContentObject {
3939

4040
private static final ParseField GROUP_BY = new ParseField("group_by");
4141
private static final ParseField AGGREGATIONS = new ParseField("aggregations");
42+
private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
4243

4344
private final GroupConfig groups;
4445
private final AggregationConfig aggregationConfig;
46+
private final Integer maxPageSearchSize;
4547

4648
private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>("pivot_config", true,
47-
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1]));
49+
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2]));
4850

4951
static {
5052
PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY);
5153
PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS);
54+
PARSER.declareInt(optionalConstructorArg(), MAX_PAGE_SEARCH_SIZE);
5255
}
5356

5457
public static PivotConfig fromXContent(final XContentParser parser) {
5558
return PARSER.apply(parser, null);
5659
}
5760

58-
PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) {
61+
PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
5962
this.groups = groups;
6063
this.aggregationConfig = aggregationConfig;
64+
this.maxPageSearchSize = maxPageSearchSize;
6165
}
6266

6367
@Override
6468
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
6569
builder.startObject();
6670
builder.field(GROUP_BY.getPreferredName(), groups);
6771
builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig);
72+
if (maxPageSearchSize != null) {
73+
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
74+
}
6875
builder.endObject();
6976
return builder;
7077
}
@@ -77,6 +84,10 @@ public GroupConfig getGroupConfig() {
7784
return groups;
7885
}
7986

87+
public Integer getMaxPageSearchSize() {
88+
return maxPageSearchSize;
89+
}
90+
8091
@Override
8192
public boolean equals(Object other) {
8293
if (this == other) {
@@ -89,12 +100,14 @@ public boolean equals(Object other) {
89100

90101
final PivotConfig that = (PivotConfig) other;
91102

92-
return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
103+
return Objects.equals(this.groups, that.groups)
104+
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
105+
&& Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
93106
}
94107

95108
@Override
96109
public int hashCode() {
97-
return Objects.hash(groups, aggregationConfig);
110+
return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
98111
}
99112

100113
public static Builder builder() {
@@ -104,6 +117,7 @@ public static Builder builder() {
104117
public static class Builder {
105118
private GroupConfig groups;
106119
private AggregationConfig aggregationConfig;
120+
private Integer maxPageSearchSize;
107121

108122
/**
109123
* Set how to group the source data
@@ -135,8 +149,22 @@ public Builder setAggregations(AggregatorFactories.Builder aggregations) {
135149
return this;
136150
}
137151

152+
/**
153+
* Sets the paging maximum paging maxPageSearchSize that date frame transform can use when
154+
* pulling the data from the source index.
155+
*
156+
* If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data.
157+
*
158+
* @param maxPageSearchSize Integer value between 10 and 10_000
159+
* @return the {@link Builder} with the paging maxPageSearchSize set.
160+
*/
161+
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
162+
this.maxPageSearchSize = maxPageSearchSize;
163+
return this;
164+
}
165+
138166
public PivotConfig build() {
139-
return new PivotConfig(groups, aggregationConfig);
167+
return new PivotConfig(groups, aggregationConfig, maxPageSearchSize);
140168
}
141169
}
142170
}

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
public class PivotConfigTests extends AbstractXContentTestCase<PivotConfig> {
3333

3434
public static PivotConfig randomPivotConfig() {
35-
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
35+
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
36+
AggregationConfigTests.randomAggregationConfig(),
37+
randomBoolean() ? null : randomIntBetween(10, 10_000));
3638
}
3739

3840
@Override

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
137137
// end::put-data-frame-transform-agg-config
138138
// tag::put-data-frame-transform-pivot-config
139139
PivotConfig pivotConfig = PivotConfig.builder()
140-
.setGroups(groupConfig)
141-
.setAggregationConfig(aggConfig)
140+
.setGroups(groupConfig) // <1>
141+
.setAggregationConfig(aggConfig) // <2>
142+
.setMaxPageSearchSize(1000) // <3>
142143
.build();
143144
// end::put-data-frame-transform-pivot-config
144145
// tag::put-data-frame-transform-config

docs/java-rest/high-level/dataframe/put_data_frame.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d
6666
--------------------------------------------------
6767
include-tagged::{doc-tests-file}[{api}-pivot-config]
6868
--------------------------------------------------
69+
<1> The `GroupConfig` to use in the pivot
70+
<2> The aggregations to use
71+
<3> The maximum paging size for the transform when pulling data
72+
from the source. The size dynamically adjusts as the transform
73+
is running to recover from and prevent OOM issues.
6974

7075
===== GroupConfig
7176
The grouping terms. Defines the group by and destination fields

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public final class DataFrameField {
2727
public static final ParseField SOURCE = new ParseField("source");
2828
public static final ParseField DESTINATION = new ParseField("dest");
2929
public static final ParseField FORCE = new ParseField("force");
30+
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
3031

3132
/**
3233
* Fields for checkpointing

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public static Request fromXContent(final XContentParser parser, final String id)
5656
@Override
5757
public ActionRequestValidationException validate() {
5858
ActionRequestValidationException validationException = null;
59+
if(config.getPivotConfig() != null
60+
&& config.getPivotConfig().getMaxPageSearchSize() != null
61+
&& (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) {
62+
validationException = addValidationError(
63+
"pivot.max_page_search_size [" +
64+
config.getPivotConfig().getMaxPageSearchSize() + "] must be greater than 10 and less than 10,000",
65+
validationException);
66+
}
5967
for(String failure : config.getPivotConfig().aggFieldValidation()) {
6068
validationException = addValidationError(failure, validationException);
6169
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.core.dataframe.transforms.pivot;
88

9+
import org.elasticsearch.common.Nullable;
910
import org.elasticsearch.common.io.stream.StreamInput;
1011
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.common.io.stream.Writeable;
@@ -35,6 +36,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
3536
private static final String NAME = "data_frame_transform_pivot";
3637
private final GroupConfig groups;
3738
private final AggregationConfig aggregationConfig;
39+
private final Integer maxPageSearchSize;
3840

3941
private static final ConstructingObjectParser<PivotConfig, Void> STRICT_PARSER = createParser(false);
4042
private static final ConstructingObjectParser<PivotConfig, Void> LENIENT_PARSER = createParser(true);
@@ -61,33 +63,39 @@ private static ConstructingObjectParser<PivotConfig, Void> createParser(boolean
6163
throw new IllegalArgumentException("Required [aggregations]");
6264
}
6365

64-
return new PivotConfig(groups, aggregationConfig);
66+
return new PivotConfig(groups, aggregationConfig, (Integer)args[3]);
6567
});
6668

6769
parser.declareObject(constructorArg(),
6870
(p, c) -> (GroupConfig.fromXContent(p, lenient)), DataFrameField.GROUP_BY);
6971

7072
parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS);
7173
parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS);
74+
parser.declareInt(optionalConstructorArg(), DataFrameField.MAX_PAGE_SEARCH_SIZE);
7275

7376
return parser;
7477
}
7578

76-
public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) {
79+
public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
7780
this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName());
7881
this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName());
82+
this.maxPageSearchSize = maxPageSearchSize;
7983
}
8084

8185
public PivotConfig(StreamInput in) throws IOException {
8286
this.groups = new GroupConfig(in);
8387
this.aggregationConfig = new AggregationConfig(in);
88+
this.maxPageSearchSize = in.readOptionalInt();
8489
}
8590

8691
@Override
8792
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8893
builder.startObject();
8994
builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups);
9095
builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig);
96+
if (maxPageSearchSize != null) {
97+
builder.field(DataFrameField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
98+
}
9199
builder.endObject();
92100
return builder;
93101
}
@@ -113,6 +121,7 @@ public void toCompositeAggXContent(XContentBuilder builder, Params params) throw
113121
public void writeTo(StreamOutput out) throws IOException {
114122
groups.writeTo(out);
115123
aggregationConfig.writeTo(out);
124+
out.writeOptionalInt(maxPageSearchSize);
116125
}
117126

118127
public AggregationConfig getAggregationConfig() {
@@ -123,6 +132,11 @@ public GroupConfig getGroupConfig() {
123132
return groups;
124133
}
125134

135+
@Nullable
136+
public Integer getMaxPageSearchSize() {
137+
return maxPageSearchSize;
138+
}
139+
126140
@Override
127141
public boolean equals(Object other) {
128142
if (this == other) {
@@ -135,12 +149,14 @@ public boolean equals(Object other) {
135149

136150
final PivotConfig that = (PivotConfig) other;
137151

138-
return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
152+
return Objects.equals(this.groups, that.groups)
153+
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
154+
&& Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
139155
}
140156

141157
@Override
142158
public int hashCode() {
143-
return Objects.hash(groups, aggregationConfig);
159+
return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
144160
}
145161

146162
public boolean isValid() {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,15 @@
2424
public class PivotConfigTests extends AbstractSerializingDataFrameTestCase<PivotConfig> {
2525

2626
public static PivotConfig randomPivotConfig() {
27-
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
27+
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
28+
AggregationConfigTests.randomAggregationConfig(),
29+
randomBoolean() ? null : randomIntBetween(10, 10_000));
2830
}
2931

3032
public static PivotConfig randomInvalidPivotConfig() {
31-
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig());
33+
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
34+
AggregationConfigTests.randomInvalidAggregationConfig(),
35+
randomBoolean() ? null : randomIntBetween(10, 10_000));
3236
}
3337

3438
@Override

x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,13 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat
172172

173173
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
174174
AggregatorFactories.Builder aggregations) throws Exception {
175-
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
175+
return createPivotConfig(groups, aggregations, null);
176+
}
177+
178+
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
179+
AggregatorFactories.Builder aggregations,
180+
Integer size) throws Exception {
181+
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size);
176182
}
177183

178184
protected DataFrameTransformConfig createTransformConfig(String id,

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void testGetProgress() throws Exception {
130130
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
131131
aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
132132
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
133-
PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
133+
PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
134134
DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform",
135135
sourceConfig,
136136
destConfig,
@@ -149,7 +149,7 @@ public void testGetProgress() throws Exception {
149149

150150

151151
QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
152-
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
152+
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
153153
sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig);
154154
config = new DataFrameTransformConfig("get_progress_transform",
155155
sourceConfig,

0 commit comments

Comments
 (0)