@@ -34,12 +34,13 @@ public class SettingsConfig implements Writeable, ToXContentObject {
3434 private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1 ;
3535 private static final float DEFAULT_DOCS_PER_SECOND = -1F ;
3636 private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1 ;
37+ private static final int DEFAULT_INTERIM_RESULTS = -1 ;
3738
3839 private static ConstructingObjectParser <SettingsConfig , Void > createParser (boolean lenient ) {
3940 ConstructingObjectParser <SettingsConfig , Void > parser = new ConstructingObjectParser <>(
4041 "transform_config_settings" ,
4142 lenient ,
42- args -> new SettingsConfig ((Integer ) args [0 ], (Float ) args [1 ], (Integer ) args [2 ])
43+ args -> new SettingsConfig ((Integer ) args [0 ], (Float ) args [1 ], (Integer ) args [2 ], ( Integer ) args [ 3 ] )
4344 );
4445 parser .declareIntOrNull (optionalConstructorArg (), DEFAULT_MAX_PAGE_SEARCH_SIZE , TransformField .MAX_PAGE_SEARCH_SIZE );
4546 parser .declareFloatOrNull (optionalConstructorArg (), DEFAULT_DOCS_PER_SECOND , TransformField .DOCS_PER_SECOND );
@@ -50,25 +51,39 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
5051 TransformField .DATES_AS_EPOCH_MILLIS ,
5152 ValueType .BOOLEAN_OR_NULL
5253 );
54+ // this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
55+ parser .declareField (
56+ optionalConstructorArg (),
57+ p -> p .currentToken () == XContentParser .Token .VALUE_NULL ? DEFAULT_INTERIM_RESULTS : p .booleanValue () ? 1 : 0 ,
58+ TransformField .INTERIM_RESULTS ,
59+ ValueType .BOOLEAN_OR_NULL
60+ );
5361 return parser ;
5462 }
5563
5664 private final Integer maxPageSearchSize ;
5765 private final Float docsPerSecond ;
5866 private final Integer datesAsEpochMillis ;
67+ private final Integer interimResults ;
5968
6069 public SettingsConfig () {
61- this (null , null , (Integer ) null );
70+ this (null , null , (Integer ) null , ( Integer ) null );
6271 }
6372
64- public SettingsConfig (Integer maxPageSearchSize , Float docsPerSecond , Boolean datesAsEpochMillis ) {
65- this (maxPageSearchSize , docsPerSecond , datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0 );
73+ public SettingsConfig (Integer maxPageSearchSize , Float docsPerSecond , Boolean datesAsEpochMillis , Boolean interimResults ) {
74+ this (
75+ maxPageSearchSize ,
76+ docsPerSecond ,
77+ datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0 ,
78+ interimResults == null ? null : interimResults ? 1 : 0
79+ );
6680 }
6781
68- public SettingsConfig (Integer maxPageSearchSize , Float docsPerSecond , Integer datesAsEpochMillis ) {
82+ public SettingsConfig (Integer maxPageSearchSize , Float docsPerSecond , Integer datesAsEpochMillis , Integer interimResults ) {
6983 this .maxPageSearchSize = maxPageSearchSize ;
7084 this .docsPerSecond = docsPerSecond ;
7185 this .datesAsEpochMillis = datesAsEpochMillis ;
86+ this .interimResults = interimResults ;
7287 }
7388
7489 public SettingsConfig (final StreamInput in ) throws IOException {
@@ -79,6 +94,11 @@ public SettingsConfig(final StreamInput in) throws IOException {
7994 } else {
8095 this .datesAsEpochMillis = DEFAULT_DATES_AS_EPOCH_MILLIS ;
8196 }
97+ if (in .getVersion ().onOrAfter (Version .CURRENT )) { // TODO: 7.15
98+ this .interimResults = in .readOptionalInt ();
99+ } else {
100+ this .interimResults = DEFAULT_INTERIM_RESULTS ;
101+ }
82102 }
83103
84104 public Integer getMaxPageSearchSize () {
@@ -97,6 +117,14 @@ public Integer getDatesAsEpochMillisForUpdate() {
97117 return datesAsEpochMillis ;
98118 }
99119
120+ public Boolean getInterimResults () {
121+ return interimResults != null ? interimResults > 0 : null ;
122+ }
123+
124+ public Integer getInterimResultsForUpdate () {
125+ return interimResults ;
126+ }
127+
100128 public ActionRequestValidationException validate (ActionRequestValidationException validationException ) {
101129 if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService .DEFAULT_MAX_BUCKETS )) {
102130 validationException = addValidationError (
@@ -118,6 +146,9 @@ public void writeTo(StreamOutput out) throws IOException {
118146 if (out .getVersion ().onOrAfter (Version .V_7_11_0 )) {
119147 out .writeOptionalInt (datesAsEpochMillis );
120148 }
149+ if (out .getVersion ().onOrAfter (Version .CURRENT )) { // TODO: 7.15
150+ out .writeOptionalInt (interimResults );
151+ }
121152 }
122153
123154 @ Override
@@ -133,6 +164,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
133164 if (datesAsEpochMillis != null && (datesAsEpochMillis .equals (DEFAULT_DATES_AS_EPOCH_MILLIS ) == false )) {
134165 builder .field (TransformField .DATES_AS_EPOCH_MILLIS .getPreferredName (), datesAsEpochMillis > 0 ? true : false );
135166 }
167+ if (interimResults != null && (interimResults .equals (DEFAULT_INTERIM_RESULTS ) == false )) {
168+ builder .field (TransformField .INTERIM_RESULTS .getPreferredName (), interimResults > 0 ? true : false );
169+ }
136170 builder .endObject ();
137171 return builder ;
138172 }
@@ -149,12 +183,13 @@ public boolean equals(Object other) {
149183 SettingsConfig that = (SettingsConfig ) other ;
150184 return Objects .equals (maxPageSearchSize , that .maxPageSearchSize )
151185 && Objects .equals (docsPerSecond , that .docsPerSecond )
152- && Objects .equals (datesAsEpochMillis , that .datesAsEpochMillis );
186+ && Objects .equals (datesAsEpochMillis , that .datesAsEpochMillis )
187+ && Objects .equals (interimResults , that .interimResults );
153188 }
154189
155190 @ Override
156191 public int hashCode () {
157- return Objects .hash (maxPageSearchSize , docsPerSecond , datesAsEpochMillis );
192+ return Objects .hash (maxPageSearchSize , docsPerSecond , datesAsEpochMillis , interimResults );
158193 }
159194
160195 @ Override
@@ -170,6 +205,7 @@ public static class Builder {
170205 private Integer maxPageSearchSize ;
171206 private Float docsPerSecond ;
172207 private Integer datesAsEpochMillis ;
208+ private Integer interimResults ;
173209
174210 /**
175211 * Default builder
@@ -185,6 +221,7 @@ public Builder(SettingsConfig base) {
185221 this .maxPageSearchSize = base .maxPageSearchSize ;
186222 this .docsPerSecond = base .docsPerSecond ;
187223 this .datesAsEpochMillis = base .datesAsEpochMillis ;
224+ this .interimResults = base .interimResults ;
188225 }
189226
190227 /**
@@ -231,6 +268,19 @@ public Builder setDatesAsEpochMillis(Boolean datesAsEpochMillis) {
231268 return this ;
232269 }
233270
271+ /**
272+ * Whether to write interim results in transform checkpoints.
273+ *
274+ * An explicit `null` resets to default.
275+ *
276+ * @param interimResults true if interim results should be written.
277+ * @return the {@link Builder} with interimResults set.
278+ */
279+ public Builder setInterimResults (Boolean interimResults ) {
280+ this .interimResults = interimResults == null ? DEFAULT_INTERIM_RESULTS : interimResults ? 1 : 0 ;
281+ return this ;
282+ }
283+
234284 /**
235285 * Update settings according to given settings config.
236286 *
@@ -253,12 +303,17 @@ public Builder update(SettingsConfig update) {
253303 ? null
254304 : update .getDatesAsEpochMillisForUpdate ();
255305 }
306+ if (update .getInterimResultsForUpdate () != null ) {
307+ this .interimResults = update .getInterimResultsForUpdate ().equals (DEFAULT_INTERIM_RESULTS )
308+ ? null
309+ : update .getInterimResultsForUpdate ();
310+ }
256311
257312 return this ;
258313 }
259314
260315 public SettingsConfig build () {
261- return new SettingsConfig (maxPageSearchSize , docsPerSecond , datesAsEpochMillis );
316+ return new SettingsConfig (maxPageSearchSize , docsPerSecond , datesAsEpochMillis , interimResults );
262317 }
263318 }
264319}
0 commit comments