From 8dedbef040956c586ab1de6f35be37f790ac161c Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 15 Oct 2019 06:46:58 -0400 Subject: [PATCH] [ML][Transforms] fix bwc serialization with 7.3 (#48021) --- .../transforms/DataFrameTransformStats.java | 6 ++-- .../DataFrameTransformStatsTests.java | 30 +++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java index 6b58374925bfe..b21aae998628e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java @@ -123,7 +123,7 @@ public DataFrameTransformStats(StreamInput in) throws IOException { DataFrameTransformState transformState = new DataFrameTransformState(in); this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()); this.reason = transformState.getReason(); - this.node = null; + this.node = transformState.getNode(); this.indexerStats = new DataFrameIndexerTransformStats(in); this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in); } @@ -171,8 +171,8 @@ public void writeTo(StreamOutput out) throws IOException { checkpointingInfo.getNext().getPosition(), checkpointingInfo.getLast().getCheckpoint(), reason, - checkpointingInfo.getNext().getCheckpointProgress()).writeTo(out); - out.writeBoolean(false); + checkpointingInfo.getNext().getCheckpointProgress(), + node).writeTo(out); indexerStats.writeTo(out); checkpointingInfo.writeTo(out); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java index f438d6cfcf605..006a39b8cae53 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -13,6 +16,9 @@ import java.io.IOException; import java.util.function.Predicate; +import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats.State.STARTED; +import static org.hamcrest.Matchers.equalTo; + public class DataFrameTransformStatsTests extends AbstractSerializingTestCase { public static DataFrameTransformStats randomDataFrameTransformStats() { @@ -53,4 +59,28 @@ protected String[] getShuffleFieldsExceptions() { protected Predicate getRandomFieldsExcludeFilter() { return field -> !field.isEmpty(); } + + public void testBwcWith73() throws IOException { + for(int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { + DataFrameTransformStats stats = new DataFrameTransformStats("bwc-id", + STARTED, + randomBoolean() ? null : randomAlphaOfLength(100), + randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(), + new DataFrameIndexerTransformStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + new DataFrameTransformCheckpointingInfo( + new DataFrameTransformCheckpointStats(0, null, null, 10, 100), + new DataFrameTransformCheckpointStats(0, null, null, 100, 1000), + // changesLastDetectedAt aren't serialized back + 100, null)); + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_3_0); + stats.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_3_0); + DataFrameTransformStats statsFromOld = new DataFrameTransformStats(in); + assertThat(statsFromOld, equalTo(stats)); + } + } + } + } }