Skip to content

Commit 54ed2e3

Browse files
author
Hendrik Muhs
authored
[Transform] implement retention policy to delete data from a transform (#67832)
add a retention policy to transform to delete data that is considered outdated as part of a transform checkpoint. fixes #67916
1 parent 2096050 commit 54ed2e3

File tree

72 files changed

+2395
-799
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+2395
-799
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/transform/TransformNamedXContentProvider.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
package org.elasticsearch.client.transform;
1010

11+
import org.elasticsearch.client.transform.transforms.RetentionPolicyConfig;
1112
import org.elasticsearch.client.transform.transforms.SyncConfig;
13+
import org.elasticsearch.client.transform.transforms.TimeRetentionPolicyConfig;
1214
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
1315
import org.elasticsearch.common.ParseField;
1416
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -22,9 +24,13 @@ public class TransformNamedXContentProvider implements NamedXContentProvider {
2224
@Override
2325
public List<NamedXContentRegistry.Entry> getNamedXContentParsers() {
2426
return Arrays.asList(
25-
new NamedXContentRegistry.Entry(SyncConfig.class,
26-
new ParseField(TimeSyncConfig.NAME),
27-
TimeSyncConfig::fromXContent));
27+
new NamedXContentRegistry.Entry(SyncConfig.class, new ParseField(TimeSyncConfig.NAME), TimeSyncConfig::fromXContent),
28+
new NamedXContentRegistry.Entry(
29+
RetentionPolicyConfig.class,
30+
new ParseField(TimeRetentionPolicyConfig.NAME),
31+
TimeRetentionPolicyConfig::fromXContent
32+
)
33+
);
2834
}
2935

3036
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.client.transform.transforms;
10+
11+
import org.elasticsearch.common.xcontent.ToXContentObject;
12+
13+
public interface RetentionPolicyConfig extends ToXContentObject {
14+
15+
/**
16+
* Returns the name of the writeable object
17+
*/
18+
String getName();
19+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.client.transform.transforms;
10+
11+
import org.elasticsearch.common.ParseField;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
14+
import org.elasticsearch.common.xcontent.ObjectParser;
15+
import org.elasticsearch.common.xcontent.XContentBuilder;
16+
import org.elasticsearch.common.xcontent.XContentParser;
17+
18+
import java.io.IOException;
19+
import java.util.Objects;
20+
21+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
22+
23+
public class TimeRetentionPolicyConfig implements RetentionPolicyConfig {
24+
25+
public static final String NAME = "time";
26+
27+
private static final ParseField FIELD = new ParseField("field");
28+
private static final ParseField MAX_AGE = new ParseField("max_age");
29+
30+
private final String field;
31+
private final TimeValue maxAge;
32+
33+
private static final ConstructingObjectParser<TimeRetentionPolicyConfig, Void> PARSER = new ConstructingObjectParser<>(
34+
"time_retention_policy_config",
35+
true,
36+
args -> new TimeRetentionPolicyConfig((String) args[0], args[1] != null ? (TimeValue) args[1] : TimeValue.ZERO)
37+
);
38+
39+
static {
40+
PARSER.declareString(constructorArg(), FIELD);
41+
PARSER.declareField(
42+
constructorArg(),
43+
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_AGE.getPreferredName()),
44+
MAX_AGE,
45+
ObjectParser.ValueType.STRING
46+
);
47+
}
48+
49+
public static TimeRetentionPolicyConfig fromXContent(XContentParser parser) {
50+
return PARSER.apply(parser, null);
51+
}
52+
53+
public TimeRetentionPolicyConfig(String field, TimeValue maxAge) {
54+
this.field = field;
55+
this.maxAge = maxAge;
56+
}
57+
58+
@Override
59+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
60+
builder.startObject();
61+
builder.field(FIELD.getPreferredName(), field);
62+
builder.field(MAX_AGE.getPreferredName(), maxAge.getStringRep());
63+
builder.endObject();
64+
return builder;
65+
}
66+
67+
public String getField() {
68+
return field;
69+
}
70+
71+
public TimeValue getMaxAge() {
72+
return maxAge;
73+
}
74+
75+
@Override
76+
public boolean equals(Object other) {
77+
if (this == other) {
78+
return true;
79+
}
80+
81+
if (other == null || getClass() != other.getClass()) {
82+
return false;
83+
}
84+
85+
final TimeRetentionPolicyConfig that = (TimeRetentionPolicyConfig) other;
86+
87+
return Objects.equals(this.field, that.field) && Objects.equals(this.maxAge, that.maxAge);
88+
}
89+
90+
@Override
91+
public int hashCode() {
92+
return Objects.hash(field, maxAge);
93+
}
94+
95+
@Override
96+
public String getName() {
97+
return NAME;
98+
}
99+
}

client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformConfig.java

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.common.xcontent.ToXContentObject;
2222
import org.elasticsearch.common.xcontent.XContentBuilder;
2323
import org.elasticsearch.common.xcontent.XContentParser;
24-
import org.elasticsearch.common.xcontent.XContentParserUtils;
2524

2625
import java.io.IOException;
2726
import java.time.Instant;
@@ -41,6 +40,7 @@ public class TransformConfig implements ToXContentObject {
4140
public static final ParseField SETTINGS = new ParseField("settings");
4241
public static final ParseField VERSION = new ParseField("version");
4342
public static final ParseField CREATE_TIME = new ParseField("create_time");
43+
public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
4444
// types of transforms
4545
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
4646
public static final ParseField LATEST_TRANSFORM = new ParseField("latest");
@@ -54,6 +54,7 @@ public class TransformConfig implements ToXContentObject {
5454
private final PivotConfig pivotConfig;
5555
private final LatestConfig latestConfig;
5656
private final String description;
57+
private final RetentionPolicyConfig retentionPolicyConfig;
5758
private final Version transformVersion;
5859
private final Instant createTime;
5960

@@ -70,8 +71,9 @@ public class TransformConfig implements ToXContentObject {
7071
LatestConfig latestConfig = (LatestConfig) args[6];
7172
String description = (String) args[7];
7273
SettingsConfig settings = (SettingsConfig) args[8];
73-
Instant createTime = (Instant) args[9];
74-
String transformVersion = (String) args[10];
74+
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[9];
75+
Instant createTime = (Instant) args[10];
76+
String transformVersion = (String) args[11];
7577
return new TransformConfig(
7678
id,
7779
source,
@@ -82,6 +84,7 @@ public class TransformConfig implements ToXContentObject {
8284
latestConfig,
8385
description,
8486
settings,
87+
retentionPolicyConfig,
8588
createTime,
8689
transformVersion
8790
);
@@ -98,11 +101,16 @@ public class TransformConfig implements ToXContentObject {
98101
FREQUENCY,
99102
ObjectParser.ValueType.STRING
100103
);
101-
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
104+
PARSER.declareNamedObject(optionalConstructorArg(), (p, c, n) -> p.namedObject(SyncConfig.class, n, c), SYNC);
102105
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
103106
PARSER.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p), LATEST_TRANSFORM);
104107
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
105108
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), SETTINGS);
109+
PARSER.declareNamedObject(
110+
optionalConstructorArg(),
111+
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
112+
RETENTION_POLICY
113+
);
106114
PARSER.declareField(
107115
optionalConstructorArg(),
108116
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()),
@@ -112,14 +120,6 @@ public class TransformConfig implements ToXContentObject {
112120
PARSER.declareString(optionalConstructorArg(), VERSION);
113121
}
114122

115-
private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException {
116-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
117-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
118-
SyncConfig syncConfig = parser.namedObject(SyncConfig.class, parser.currentName(), true);
119-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
120-
return syncConfig;
121-
}
122-
123123
public static TransformConfig fromXContent(final XContentParser parser) {
124124
return PARSER.apply(parser, null);
125125
}
@@ -136,7 +136,7 @@ public static TransformConfig fromXContent(final XContentParser parser) {
136136
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
137137
*/
138138
public static TransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
139-
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null);
139+
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null, null);
140140
}
141141

142142
/**
@@ -151,7 +151,7 @@ public static TransformConfig forPreview(final SourceConfig source, final PivotC
151151
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
152152
*/
153153
public static TransformConfig forPreview(final SourceConfig source, final LatestConfig latestConfig) {
154-
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null);
154+
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null, null);
155155
}
156156

157157
TransformConfig(
@@ -164,6 +164,7 @@ public static TransformConfig forPreview(final SourceConfig source, final Latest
164164
final LatestConfig latestConfig,
165165
final String description,
166166
final SettingsConfig settings,
167+
final RetentionPolicyConfig retentionPolicyConfig,
167168
final Instant createTime,
168169
final String version
169170
) {
@@ -176,6 +177,7 @@ public static TransformConfig forPreview(final SourceConfig source, final Latest
176177
this.latestConfig = latestConfig;
177178
this.description = description;
178179
this.settings = settings;
180+
this.retentionPolicyConfig = retentionPolicyConfig;
179181
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
180182
this.transformVersion = version == null ? null : Version.fromString(version);
181183
}
@@ -226,6 +228,11 @@ public SettingsConfig getSettings() {
226228
return settings;
227229
}
228230

231+
@Nullable
232+
public RetentionPolicyConfig getRetentionPolicyConfig() {
233+
return retentionPolicyConfig;
234+
}
235+
229236
@Override
230237
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
231238
builder.startObject();
@@ -258,6 +265,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
258265
if (settings != null) {
259266
builder.field(SETTINGS.getPreferredName(), settings);
260267
}
268+
if (retentionPolicyConfig != null) {
269+
builder.startObject(RETENTION_POLICY.getPreferredName());
270+
builder.field(retentionPolicyConfig.getName(), retentionPolicyConfig);
271+
builder.endObject();
272+
}
261273
if (createTime != null) {
262274
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
263275
}
@@ -290,13 +302,26 @@ public boolean equals(Object other) {
290302
&& Objects.equals(this.settings, that.settings)
291303
&& Objects.equals(this.createTime, that.createTime)
292304
&& Objects.equals(this.pivotConfig, that.pivotConfig)
293-
&& Objects.equals(this.latestConfig, that.latestConfig);
305+
&& Objects.equals(this.latestConfig, that.latestConfig)
306+
&& Objects.equals(this.retentionPolicyConfig, that.retentionPolicyConfig);
294307
}
295308

296309
@Override
297310
public int hashCode() {
298311
return Objects.hash(
299-
id, source, dest, frequency, syncConfig, settings, createTime, transformVersion, pivotConfig, latestConfig, description);
312+
id,
313+
source,
314+
dest,
315+
frequency,
316+
syncConfig,
317+
settings,
318+
createTime,
319+
transformVersion,
320+
pivotConfig,
321+
latestConfig,
322+
description,
323+
retentionPolicyConfig
324+
);
300325
}
301326

302327
@Override
@@ -319,6 +344,7 @@ public static class Builder {
319344
private LatestConfig latestConfig;
320345
private SettingsConfig settings;
321346
private String description;
347+
private RetentionPolicyConfig retentionPolicyConfig;
322348

323349
public Builder setId(String id) {
324350
this.id = id;
@@ -365,9 +391,26 @@ public Builder setSettings(SettingsConfig settings) {
365391
return this;
366392
}
367393

394+
public Builder setRetentionPolicyConfig(RetentionPolicyConfig retentionPolicyConfig) {
395+
this.retentionPolicyConfig = retentionPolicyConfig;
396+
return this;
397+
}
398+
368399
public TransformConfig build() {
369400
return new TransformConfig(
370-
id, source, dest, frequency, syncConfig, pivotConfig, latestConfig, description, settings, null, null);
401+
id,
402+
source,
403+
dest,
404+
frequency,
405+
syncConfig,
406+
pivotConfig,
407+
latestConfig,
408+
description,
409+
settings,
410+
retentionPolicyConfig,
411+
null,
412+
null
413+
);
371414
}
372415
}
373416
}

0 commit comments

Comments
 (0)