Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,46 +35,69 @@ public class TransformCheckpointingInfo {
public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time");

private final TransformCheckpointStats last;
private final TransformCheckpointStats next;
private final long operationsBehind;
private final Instant changesLastDetectedAt;

private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER =
new ConstructingObjectParser<>(
"transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];
Instant changesLastDetectedAt = (Instant)a[3];
return new TransformCheckpointingInfo(
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
behind,
changesLastDetectedAt);
});
private final Instant lastSearchTime;

private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];
Instant changesLastDetectedAt = (Instant) a[3];
Instant lastSearchTime = (Instant) a[4];
return new TransformCheckpointingInfo(
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
behind,
changesLastDetectedAt,
lastSearchTime
);
}
);

static {
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TransformCheckpointStats.fromXContent(p), LAST_CHECKPOINT);
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT);
LENIENT_PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TransformCheckpointStats.fromXContent(p),
LAST_CHECKPOINT
);
LENIENT_PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TransformCheckpointStats.fromXContent(p),
NEXT_CHECKPOINT
);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
LENIENT_PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE);
ObjectParser.ValueType.VALUE
);
LENIENT_PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()),
LAST_SEARCH_TIME,
ObjectParser.ValueType.VALUE
);
}

public TransformCheckpointingInfo(TransformCheckpointStats last,
TransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt) {
public TransformCheckpointingInfo(
TransformCheckpointStats last,
TransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt,
Instant lastSearchTime
) {
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
this.changesLastDetectedAt = changesLastDetectedAt;
this.lastSearchTime = lastSearchTime;
}

public TransformCheckpointStats getLast() {
Expand All @@ -94,13 +117,18 @@ public Instant getChangesLastDetectedAt() {
return changesLastDetectedAt;
}

@Nullable
public Instant getLastSearchTime() {
return lastSearchTime;
}

public static TransformCheckpointingInfo fromXContent(XContentParser p) {
return LENIENT_PARSER.apply(p, null);
}

@Override
public int hashCode() {
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime);
}

@Override
Expand All @@ -115,10 +143,11 @@ public boolean equals(Object other) {

TransformCheckpointingInfo that = (TransformCheckpointingInfo) other;

return Objects.equals(this.last, that.last) &&
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind &&
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
return Objects.equals(this.last, that.last)
&& Objects.equals(this.next, that.next)
&& this.operationsBehind == that.operationsBehind
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt)
&& Objects.equals(this.lastSearchTime, that.lastSearchTime);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,22 @@
public class TransformCheckpointingInfoTests extends ESTestCase {

public void testFromXContent() throws IOException {
xContentTester(this::createParser,
xContentTester(
this::createParser,
TransformCheckpointingInfoTests::randomTransformCheckpointingInfo,
TransformCheckpointingInfoTests::toXContent,
TransformCheckpointingInfo::fromXContent)
.supportsUnknownFields(false)
.test();
TransformCheckpointingInfo::fromXContent
).supportsUnknownFields(false).test();
}

public static TransformCheckpointingInfo randomTransformCheckpointingInfo() {
return new TransformCheckpointingInfo(
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
randomLongBetween(0, 10000),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())
);
}

public static void toXContent(TransformCheckpointingInfo info, XContentBuilder builder) throws IOException {
Expand All @@ -60,6 +62,9 @@ public static void toXContent(TransformCheckpointingInfo info, XContentBuilder b
if (info.getChangesLastDetectedAt() != null) {
builder.field(TransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt());
}
if (info.getLastSearchTime() != null) {
builder.field(TransformCheckpointingInfo.LAST_SEARCH_TIME.getPreferredName(), info.getLastSearchTime());
}
builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,22 @@

public class TransformCheckpointingInfoTests extends AbstractResponseTestCase<
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo,
TransformCheckpointingInfo> {
TransformCheckpointingInfo> {

public static org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo randomTransformCheckpointingInfo() {
return new org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo(
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
randomNonNegativeLong(),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())
);
}

@Override
protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo
createServerTestInstance(XContentType xContentType) {
protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo createServerTestInstance(
XContentType xContentType
) {
return randomTransformCheckpointingInfo();
}

Expand All @@ -53,8 +56,10 @@ protected TransformCheckpointingInfo doParseToClientInstance(XContentParser pars
}

@Override
protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
TransformCheckpointingInfo clientInstance) {
protected void assertInstances(
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
TransformCheckpointingInfo clientInstance
) {
assertTransformCheckpointInfo(serverTestInstance, clientInstance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static class TransformCheckpointingInfoBuilder {
private TransformCheckpoint nextCheckpoint;
private TransformCheckpoint sourceCheckpoint;
private Instant changesLastDetectedAt;
private Instant lastSearchTime;
private long operationsBehind;

public TransformCheckpointingInfoBuilder() {}
Expand Down Expand Up @@ -76,7 +77,8 @@ public TransformCheckpointingInfo build() {
nextCheckpoint.getTimeUpperBound()
),
operationsBehind,
changesLastDetectedAt
changesLastDetectedAt,
lastSearchTime
);
}

Expand Down Expand Up @@ -122,6 +124,11 @@ public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant change
return this;
}

public TransformCheckpointingInfoBuilder setLastSearchTime(Instant lastSearchTime) {
this.lastSearchTime = lastSearchTime;
return this;
}

public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) {
this.operationsBehind = operationsBehind;
return this;
Expand All @@ -133,29 +140,34 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
TransformCheckpointStats.EMPTY,
TransformCheckpointStats.EMPTY,
0L,
null,
null
);

public static final ParseField LAST_CHECKPOINT = new ParseField("last");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time");
private final TransformCheckpointStats last;
private final TransformCheckpointStats next;
private final long operationsBehind;
private final Instant changesLastDetectedAt;
private final Instant lastSearchTime;

private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];
Instant changesLastDetectedAt = (Instant) a[3];
Instant lastSearchTime = (Instant) a[4];
return new TransformCheckpointingInfo(
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
behind,
changesLastDetectedAt
changesLastDetectedAt,
lastSearchTime
);
}
);
Expand All @@ -178,6 +190,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE
);
LENIENT_PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()),
LAST_SEARCH_TIME,
ObjectParser.ValueType.VALUE
);
}

/**
Expand All @@ -187,18 +205,21 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
* @param last stats of the last checkpoint
* @param next stats of the next checkpoint
* @param operationsBehind counter of operations the current checkpoint is behind source
* @param changesLastDetectedAt the last time the source indices were checked for changes
* @param changesLastDetectedAt the last time the source indices changes have been found
* @param lastSearchTime the last time the source indices were searched
*/
public TransformCheckpointingInfo(
TransformCheckpointStats last,
TransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt
Instant changesLastDetectedAt,
Instant lastSearchTime
) {
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli());
this.changesLastDetectedAt = changesLastDetectedAt;
this.lastSearchTime = lastSearchTime;
}

public TransformCheckpointingInfo(StreamInput in) throws IOException {
Expand All @@ -210,6 +231,11 @@ public TransformCheckpointingInfo(StreamInput in) throws IOException {
} else {
changesLastDetectedAt = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0
lastSearchTime = in.readOptionalInstant();
} else {
lastSearchTime = null;
}
}

public TransformCheckpointStats getLast() {
Expand All @@ -228,6 +254,10 @@ public Instant getChangesLastDetectedAt() {
return changesLastDetectedAt;
}

public Instant getLastSearchTime() {
return lastSearchTime;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -245,6 +275,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
changesLastDetectedAt.toEpochMilli()
);
}
if (lastSearchTime != null) {
builder.timeField(
LAST_SEARCH_TIME.getPreferredName(),
LAST_SEARCH_TIME.getPreferredName() + "_string",
lastSearchTime.toEpochMilli()
);
}
builder.endObject();
return builder;
}
Expand All @@ -257,6 +294,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeOptionalInstant(changesLastDetectedAt);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0
out.writeOptionalInstant(lastSearchTime);
}
}

public static TransformCheckpointingInfo fromXContent(XContentParser p) {
Expand All @@ -265,7 +305,7 @@ public static TransformCheckpointingInfo fromXContent(XContentParser p) {

@Override
public int hashCode() {
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime);
}

@Override
Expand All @@ -283,7 +323,8 @@ public boolean equals(Object other) {
return Objects.equals(this.last, that.last)
&& Objects.equals(this.next, that.next)
&& this.operationsBehind == that.operationsBehind
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt)
&& Objects.equals(this.lastSearchTime, that.lastSearchTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() {
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
randomNonNegativeLong(),
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)),
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000))
);
}
Expand All @@ -46,7 +47,8 @@ public void testBackwardsSerialization() throws IOException {
TransformCheckpointStats.EMPTY,
TransformCheckpointStats.EMPTY,
randomNonNegativeLong(),
// changesLastDetectedAt is not serialized to past values, so when it is pulled back in, it will be null
// changesLastDetectedAt, lastSearchTime is not serialized to past values, so when it is pulled back in, it will be null
null,
null
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
Expand Down
Loading