Skip to content

Commit 8db30eb

Browse files
author
Hendrik Muhs
authored
[7.x][Transform] report last search time in transform stats (#66718) (#67779)
transforms reports the last time changes where detected with changes_last_detected_at, however that doesn't tell a user it searched for changes, this PR adds a field last_search_time to report when transform searched for changes the last time. fixes #66410 relates #66367 backport #66718
1 parent 2420359 commit 8db30eb

File tree

13 files changed

+185
-81
lines changed

13 files changed

+185
-81
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,46 +35,69 @@ public class TransformCheckpointingInfo {
3535
public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress");
3636
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
3737
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
38+
public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time");
3839

3940
private final TransformCheckpointStats last;
4041
private final TransformCheckpointStats next;
4142
private final long operationsBehind;
4243
private final Instant changesLastDetectedAt;
43-
44-
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER =
45-
new ConstructingObjectParser<>(
46-
"transform_checkpointing_info",
47-
true,
48-
a -> {
49-
long behind = a[2] == null ? 0L : (Long) a[2];
50-
Instant changesLastDetectedAt = (Instant)a[3];
51-
return new TransformCheckpointingInfo(
52-
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
53-
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
54-
behind,
55-
changesLastDetectedAt);
56-
});
44+
private final Instant lastSearchTime;
45+
46+
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
47+
"transform_checkpointing_info",
48+
true,
49+
a -> {
50+
long behind = a[2] == null ? 0L : (Long) a[2];
51+
Instant changesLastDetectedAt = (Instant) a[3];
52+
Instant lastSearchTime = (Instant) a[4];
53+
return new TransformCheckpointingInfo(
54+
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
55+
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
56+
behind,
57+
changesLastDetectedAt,
58+
lastSearchTime
59+
);
60+
}
61+
);
5762

5863
static {
59-
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
60-
(p, c) -> TransformCheckpointStats.fromXContent(p), LAST_CHECKPOINT);
61-
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
62-
(p, c) -> TransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT);
64+
LENIENT_PARSER.declareObject(
65+
ConstructingObjectParser.optionalConstructorArg(),
66+
(p, c) -> TransformCheckpointStats.fromXContent(p),
67+
LAST_CHECKPOINT
68+
);
69+
LENIENT_PARSER.declareObject(
70+
ConstructingObjectParser.optionalConstructorArg(),
71+
(p, c) -> TransformCheckpointStats.fromXContent(p),
72+
NEXT_CHECKPOINT
73+
);
6374
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
64-
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
75+
LENIENT_PARSER.declareField(
76+
ConstructingObjectParser.optionalConstructorArg(),
6577
p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
6678
CHANGES_LAST_DETECTED_AT,
67-
ObjectParser.ValueType.VALUE);
79+
ObjectParser.ValueType.VALUE
80+
);
81+
LENIENT_PARSER.declareField(
82+
ConstructingObjectParser.optionalConstructorArg(),
83+
p -> TimeUtil.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()),
84+
LAST_SEARCH_TIME,
85+
ObjectParser.ValueType.VALUE
86+
);
6887
}
6988

70-
public TransformCheckpointingInfo(TransformCheckpointStats last,
71-
TransformCheckpointStats next,
72-
long operationsBehind,
73-
Instant changesLastDetectedAt) {
89+
public TransformCheckpointingInfo(
90+
TransformCheckpointStats last,
91+
TransformCheckpointStats next,
92+
long operationsBehind,
93+
Instant changesLastDetectedAt,
94+
Instant lastSearchTime
95+
) {
7496
this.last = Objects.requireNonNull(last);
7597
this.next = Objects.requireNonNull(next);
7698
this.operationsBehind = operationsBehind;
7799
this.changesLastDetectedAt = changesLastDetectedAt;
100+
this.lastSearchTime = lastSearchTime;
78101
}
79102

80103
public TransformCheckpointStats getLast() {
@@ -94,13 +117,18 @@ public Instant getChangesLastDetectedAt() {
94117
return changesLastDetectedAt;
95118
}
96119

120+
@Nullable
121+
public Instant getLastSearchTime() {
122+
return lastSearchTime;
123+
}
124+
97125
public static TransformCheckpointingInfo fromXContent(XContentParser p) {
98126
return LENIENT_PARSER.apply(p, null);
99127
}
100128

101129
@Override
102130
public int hashCode() {
103-
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
131+
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime);
104132
}
105133

106134
@Override
@@ -115,10 +143,11 @@ public boolean equals(Object other) {
115143

116144
TransformCheckpointingInfo that = (TransformCheckpointingInfo) other;
117145

118-
return Objects.equals(this.last, that.last) &&
119-
Objects.equals(this.next, that.next) &&
120-
this.operationsBehind == that.operationsBehind &&
121-
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
146+
return Objects.equals(this.last, that.last)
147+
&& Objects.equals(this.next, that.next)
148+
&& this.operationsBehind == that.operationsBehind
149+
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt)
150+
&& Objects.equals(this.lastSearchTime, that.lastSearchTime);
122151
}
123152

124153
}

client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,22 @@
3030
public class TransformCheckpointingInfoTests extends ESTestCase {
3131

3232
public void testFromXContent() throws IOException {
33-
xContentTester(this::createParser,
33+
xContentTester(
34+
this::createParser,
3435
TransformCheckpointingInfoTests::randomTransformCheckpointingInfo,
3536
TransformCheckpointingInfoTests::toXContent,
36-
TransformCheckpointingInfo::fromXContent)
37-
.supportsUnknownFields(false)
38-
.test();
37+
TransformCheckpointingInfo::fromXContent
38+
).supportsUnknownFields(false).test();
3939
}
4040

4141
public static TransformCheckpointingInfo randomTransformCheckpointingInfo() {
4242
return new TransformCheckpointingInfo(
4343
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
4444
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
4545
randomLongBetween(0, 10000),
46-
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
46+
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()),
47+
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())
48+
);
4749
}
4850

4951
public static void toXContent(TransformCheckpointingInfo info, XContentBuilder builder) throws IOException {
@@ -60,6 +62,9 @@ public static void toXContent(TransformCheckpointingInfo info, XContentBuilder b
6062
if (info.getChangesLastDetectedAt() != null) {
6163
builder.field(TransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt());
6264
}
65+
if (info.getLastSearchTime() != null) {
66+
builder.field(TransformCheckpointingInfo.LAST_SEARCH_TIME.getPreferredName(), info.getLastSearchTime());
67+
}
6368
builder.endObject();
6469
}
6570
}

client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,22 @@
3131

3232
public class TransformCheckpointingInfoTests extends AbstractResponseTestCase<
3333
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo,
34-
TransformCheckpointingInfo> {
34+
TransformCheckpointingInfo> {
3535

3636
public static org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo randomTransformCheckpointingInfo() {
3737
return new org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo(
3838
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
3939
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
4040
randomNonNegativeLong(),
41-
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
41+
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()),
42+
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())
43+
);
4244
}
4345

4446
@Override
45-
protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo
46-
createServerTestInstance(XContentType xContentType) {
47+
protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo createServerTestInstance(
48+
XContentType xContentType
49+
) {
4750
return randomTransformCheckpointingInfo();
4851
}
4952

@@ -53,8 +56,10 @@ protected TransformCheckpointingInfo doParseToClientInstance(XContentParser pars
5356
}
5457

5558
@Override
56-
protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
57-
TransformCheckpointingInfo clientInstance) {
59+
protected void assertInstances(
60+
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
61+
TransformCheckpointingInfo clientInstance
62+
) {
5863
assertTransformCheckpointInfo(serverTestInstance, clientInstance);
5964
}
6065
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public static class TransformCheckpointingInfoBuilder {
4141
private TransformCheckpoint nextCheckpoint;
4242
private TransformCheckpoint sourceCheckpoint;
4343
private Instant changesLastDetectedAt;
44+
private Instant lastSearchTime;
4445
private long operationsBehind;
4546

4647
public TransformCheckpointingInfoBuilder() {}
@@ -76,7 +77,8 @@ public TransformCheckpointingInfo build() {
7677
nextCheckpoint.getTimeUpperBound()
7778
),
7879
operationsBehind,
79-
changesLastDetectedAt
80+
changesLastDetectedAt,
81+
lastSearchTime
8082
);
8183
}
8284

@@ -122,6 +124,11 @@ public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant change
122124
return this;
123125
}
124126

127+
public TransformCheckpointingInfoBuilder setLastSearchTime(Instant lastSearchTime) {
128+
this.lastSearchTime = lastSearchTime;
129+
return this;
130+
}
131+
125132
public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) {
126133
this.operationsBehind = operationsBehind;
127134
return this;
@@ -133,29 +140,34 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
133140
TransformCheckpointStats.EMPTY,
134141
TransformCheckpointStats.EMPTY,
135142
0L,
143+
null,
136144
null
137145
);
138146

139147
public static final ParseField LAST_CHECKPOINT = new ParseField("last");
140148
public static final ParseField NEXT_CHECKPOINT = new ParseField("next");
141149
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
142150
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
151+
public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time");
143152
private final TransformCheckpointStats last;
144153
private final TransformCheckpointStats next;
145154
private final long operationsBehind;
146155
private final Instant changesLastDetectedAt;
156+
private final Instant lastSearchTime;
147157

148158
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
149159
"data_frame_transform_checkpointing_info",
150160
true,
151161
a -> {
152162
long behind = a[2] == null ? 0L : (Long) a[2];
153163
Instant changesLastDetectedAt = (Instant) a[3];
164+
Instant lastSearchTime = (Instant) a[4];
154165
return new TransformCheckpointingInfo(
155166
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
156167
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
157168
behind,
158-
changesLastDetectedAt
169+
changesLastDetectedAt,
170+
lastSearchTime
159171
);
160172
}
161173
);
@@ -178,6 +190,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
178190
CHANGES_LAST_DETECTED_AT,
179191
ObjectParser.ValueType.VALUE
180192
);
193+
LENIENT_PARSER.declareField(
194+
ConstructingObjectParser.optionalConstructorArg(),
195+
p -> TimeUtils.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()),
196+
LAST_SEARCH_TIME,
197+
ObjectParser.ValueType.VALUE
198+
);
181199
}
182200

183201
/**
@@ -187,18 +205,21 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
187205
* @param last stats of the last checkpoint
188206
* @param next stats of the next checkpoint
189207
* @param operationsBehind counter of operations the current checkpoint is behind source
190-
* @param changesLastDetectedAt the last time the source indices were checked for changes
208+
* @param changesLastDetectedAt the last time the source indices changes have been found
209+
* @param lastSearchTime the last time the source indices were searched
191210
*/
192211
public TransformCheckpointingInfo(
193212
TransformCheckpointStats last,
194213
TransformCheckpointStats next,
195214
long operationsBehind,
196-
Instant changesLastDetectedAt
215+
Instant changesLastDetectedAt,
216+
Instant lastSearchTime
197217
) {
198218
this.last = Objects.requireNonNull(last);
199219
this.next = Objects.requireNonNull(next);
200220
this.operationsBehind = operationsBehind;
201-
this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli());
221+
this.changesLastDetectedAt = changesLastDetectedAt;
222+
this.lastSearchTime = lastSearchTime;
202223
}
203224

204225
public TransformCheckpointingInfo(StreamInput in) throws IOException {
@@ -210,6 +231,11 @@ public TransformCheckpointingInfo(StreamInput in) throws IOException {
210231
} else {
211232
changesLastDetectedAt = null;
212233
}
234+
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
235+
lastSearchTime = in.readOptionalInstant();
236+
} else {
237+
lastSearchTime = null;
238+
}
213239
}
214240

215241
public TransformCheckpointStats getLast() {
@@ -228,6 +254,10 @@ public Instant getChangesLastDetectedAt() {
228254
return changesLastDetectedAt;
229255
}
230256

257+
public Instant getLastSearchTime() {
258+
return lastSearchTime;
259+
}
260+
231261
@Override
232262
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
233263
builder.startObject();
@@ -245,6 +275,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
245275
changesLastDetectedAt.toEpochMilli()
246276
);
247277
}
278+
if (lastSearchTime != null) {
279+
builder.timeField(
280+
LAST_SEARCH_TIME.getPreferredName(),
281+
LAST_SEARCH_TIME.getPreferredName() + "_string",
282+
lastSearchTime.toEpochMilli()
283+
);
284+
}
248285
builder.endObject();
249286
return builder;
250287
}
@@ -257,6 +294,9 @@ public void writeTo(StreamOutput out) throws IOException {
257294
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
258295
out.writeOptionalInstant(changesLastDetectedAt);
259296
}
297+
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
298+
out.writeOptionalInstant(lastSearchTime);
299+
}
260300
}
261301

262302
public static TransformCheckpointingInfo fromXContent(XContentParser p) {
@@ -265,7 +305,7 @@ public static TransformCheckpointingInfo fromXContent(XContentParser p) {
265305

266306
@Override
267307
public int hashCode() {
268-
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
308+
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime);
269309
}
270310

271311
@Override
@@ -283,7 +323,8 @@ public boolean equals(Object other) {
283323
return Objects.equals(this.last, that.last)
284324
&& Objects.equals(this.next, that.next)
285325
&& this.operationsBehind == that.operationsBehind
286-
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
326+
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt)
327+
&& Objects.equals(this.lastSearchTime, that.lastSearchTime);
287328
}
288329

289330
@Override

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() {
2222
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
2323
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
2424
randomNonNegativeLong(),
25+
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)),
2526
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000))
2627
);
2728
}
@@ -46,7 +47,8 @@ public void testBackwardsSerialization() throws IOException {
4647
TransformCheckpointStats.EMPTY,
4748
TransformCheckpointStats.EMPTY,
4849
randomNonNegativeLong(),
49-
// changesLastDetectedAt is not serialized to past values, so when it is pulled back in, it will be null
50+
// changesLastDetectedAt, lastSearchTime is not serialized to past values, so when it is pulled back in, it will be null
51+
null,
5052
null
5153
);
5254
try (BytesStreamOutput output = new BytesStreamOutput()) {

0 commit comments

Comments
 (0)