From b518bb451f867303dbb63a7bcddc64aae3d9ef14 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 19 Feb 2019 09:53:35 +0100 Subject: [PATCH] Change ShardFollowTask to reuse common serialization logic Initially in #38910, ShardFollowTask was reusing ImmutableFollowParameters' serialization logic. After merging, bwc tests failed sometimes and the binary serialization that ShardFollowTask was originally was using was added back. ImmutableFollowParameters is using optional fields (optional vint) while ShardFollowTask was not (vint). This PR is against 6.7 branch. Prior to merging this pr, bwc tests in 7.0, 7.x and master branch will need to be disabled. This change then will need to be forwardported to 7.0, 7.x and master branches (but without the version check and pre 6.7 serialization). --- .../xpack/ccr/action/ShardFollowTask.java | 66 +++++++++++-------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index a82670a52a0c4..3b4e4584a1060 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -96,21 +96,32 @@ public static ShardFollowTask readFrom(StreamInput in) throws IOException { String remoteCluster = in.readString(); ShardId followShardId = ShardId.readShardId(in); ShardId leaderShardId = ShardId.readShardId(in); - // TODO: use ImmutableFollowParameters(StreamInput) constructor - int maxReadRequestOperationCount = in.readVInt(); - ByteSizeValue maxReadRequestSize = new ByteSizeValue(in); - int maxOutstandingReadRequests = in.readVInt(); - int maxWriteRequestOperationCount = in.readVInt(); - ByteSizeValue maxWriteRequestSize = new ByteSizeValue(in); - int maxOutstandingWriteRequests = in.readVInt(); - int maxWriteBufferCount = in.readVInt(); - ByteSizeValue maxWriteBufferSize = new ByteSizeValue(in); - TimeValue maxRetryDelay = in.readTimeValue(); - TimeValue readPollTimeout = in.readTimeValue(); - Map headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); - return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, maxReadRequestOperationCount, - maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, - maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout, headers); + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, in); + } else { + int maxReadRequestOperationCount = in.readVInt(); + ByteSizeValue maxReadRequestSize = new ByteSizeValue(in); + int maxOutstandingReadRequests = in.readVInt(); + int maxWriteRequestOperationCount = in.readVInt(); + ByteSizeValue maxWriteRequestSize = new ByteSizeValue(in); + int maxOutstandingWriteRequests = in.readVInt(); + int maxWriteBufferCount = in.readVInt(); + ByteSizeValue maxWriteBufferSize = new ByteSizeValue(in); + TimeValue maxRetryDelay = in.readTimeValue(); + TimeValue readPollTimeout = in.readTimeValue(); + Map headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); + return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, maxReadRequestOperationCount, + maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, + maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout, headers); + } + } + + private ShardFollowTask(String remoteCluster, ShardId followShardId, ShardId leaderShardId, StreamInput in) throws IOException { + super(in); + this.remoteCluster = remoteCluster; + this.followShardId = followShardId; + this.leaderShardId = leaderShardId; + this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public String getRemoteCluster() { @@ -139,17 +150,20 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(remoteCluster); followShardId.writeTo(out); leaderShardId.writeTo(out); - // TODO: use super.writeTo() - out.writeVLong(getMaxReadRequestOperationCount()); - getMaxReadRequestSize().writeTo(out); - out.writeVInt(getMaxOutstandingReadRequests()); - out.writeVLong(getMaxWriteRequestOperationCount()); - getMaxWriteRequestSize().writeTo(out); - out.writeVInt(getMaxOutstandingWriteRequests()); - out.writeVInt(getMaxWriteBufferCount()); - getMaxWriteBufferSize().writeTo(out); - out.writeTimeValue(getMaxRetryDelay()); - out.writeTimeValue(getReadPollTimeout()); + if (out.getVersion().onOrAfter(Version.V_6_7_0)) { + super.writeTo(out); + } else { + out.writeVLong(getMaxReadRequestOperationCount()); + getMaxReadRequestSize().writeTo(out); + out.writeVInt(getMaxOutstandingReadRequests()); + out.writeVLong(getMaxWriteRequestOperationCount()); + getMaxWriteRequestSize().writeTo(out); + out.writeVInt(getMaxOutstandingWriteRequests()); + out.writeVInt(getMaxWriteBufferCount()); + getMaxWriteBufferSize().writeTo(out); + out.writeTimeValue(getMaxRetryDelay()); + out.writeTimeValue(getReadPollTimeout()); + } out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); }