diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index a82670a52a0c4..3b4e4584a1060 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -96,21 +96,32 @@ public static ShardFollowTask readFrom(StreamInput in) throws IOException { String remoteCluster = in.readString(); ShardId followShardId = ShardId.readShardId(in); ShardId leaderShardId = ShardId.readShardId(in); - // TODO: use ImmutableFollowParameters(StreamInput) constructor - int maxReadRequestOperationCount = in.readVInt(); - ByteSizeValue maxReadRequestSize = new ByteSizeValue(in); - int maxOutstandingReadRequests = in.readVInt(); - int maxWriteRequestOperationCount = in.readVInt(); - ByteSizeValue maxWriteRequestSize = new ByteSizeValue(in); - int maxOutstandingWriteRequests = in.readVInt(); - int maxWriteBufferCount = in.readVInt(); - ByteSizeValue maxWriteBufferSize = new ByteSizeValue(in); - TimeValue maxRetryDelay = in.readTimeValue(); - TimeValue readPollTimeout = in.readTimeValue(); - Map headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); - return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, maxReadRequestOperationCount, - maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, - maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout, headers); + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, in); + } else { + int maxReadRequestOperationCount = in.readVInt(); + ByteSizeValue maxReadRequestSize = new ByteSizeValue(in); + int maxOutstandingReadRequests = in.readVInt(); + int maxWriteRequestOperationCount = in.readVInt(); + ByteSizeValue maxWriteRequestSize = new ByteSizeValue(in); + int maxOutstandingWriteRequests = in.readVInt(); + int maxWriteBufferCount = in.readVInt(); + ByteSizeValue maxWriteBufferSize = new ByteSizeValue(in); + TimeValue maxRetryDelay = in.readTimeValue(); + TimeValue readPollTimeout = in.readTimeValue(); + Map headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); + return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, maxReadRequestOperationCount, + maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, + maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout, headers); + } + } + + private ShardFollowTask(String remoteCluster, ShardId followShardId, ShardId leaderShardId, StreamInput in) throws IOException { + super(in); + this.remoteCluster = remoteCluster; + this.followShardId = followShardId; + this.leaderShardId = leaderShardId; + this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public String getRemoteCluster() { @@ -139,17 +150,20 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(remoteCluster); followShardId.writeTo(out); leaderShardId.writeTo(out); - // TODO: use super.writeTo() - out.writeVLong(getMaxReadRequestOperationCount()); - getMaxReadRequestSize().writeTo(out); - out.writeVInt(getMaxOutstandingReadRequests()); - out.writeVLong(getMaxWriteRequestOperationCount()); - getMaxWriteRequestSize().writeTo(out); - out.writeVInt(getMaxOutstandingWriteRequests()); - out.writeVInt(getMaxWriteBufferCount()); - getMaxWriteBufferSize().writeTo(out); - out.writeTimeValue(getMaxRetryDelay()); - out.writeTimeValue(getReadPollTimeout()); + if (out.getVersion().onOrAfter(Version.V_6_7_0)) { + super.writeTo(out); + } else { + out.writeVLong(getMaxReadRequestOperationCount()); + getMaxReadRequestSize().writeTo(out); + out.writeVInt(getMaxOutstandingReadRequests()); + out.writeVLong(getMaxWriteRequestOperationCount()); + getMaxWriteRequestSize().writeTo(out); + out.writeVInt(getMaxOutstandingWriteRequests()); + out.writeVInt(getMaxWriteBufferCount()); + getMaxWriteBufferSize().writeTo(out); + out.writeTimeValue(getMaxRetryDelay()); + out.writeTimeValue(getReadPollTimeout()); + } out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); }