From 2859ee8221991e9fddac6615eccc65fc8662716b Mon Sep 17 00:00:00 2001 From: liketic Date: Sun, 13 Jan 2019 11:39:30 +0800 Subject: [PATCH 1/7] Migrate Streamble to Writeable for recovery package --- .../indices/recovery/RecoveryRequest.java | 10 +- .../recovery/TransportRecoveryAction.java | 4 +- .../indices/recovery/RecoveryState.java | 136 +++++++++++------- .../indices/recovery/RecoveryTargetTests.java | 38 +++-- 4 files changed, 108 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java index 8878713765ba0..e28c8229217bb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java @@ -41,6 +41,12 @@ public RecoveryRequest() { this(Strings.EMPTY_ARRAY); } + public RecoveryRequest(StreamInput in) throws IOException { + super(in); + detailed = in.readBoolean(); + activeOnly = in.readBoolean(); + } + /** * Constructs a request for recovery information for all shards for the given indices * @@ -97,8 +103,6 @@ public void writeTo(StreamOutput out) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - detailed = in.readBoolean(); - activeOnly = in.readBoolean(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index f7356bd242d06..5f9670f9a2c14 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -93,9 +93,7 @@ protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, @Override protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { - final RecoveryRequest recoveryRequest = new RecoveryRequest(); - recoveryRequest.readFrom(in); - return recoveryRequest; + return new RecoveryRequest(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 9013cfa202d83..87bd89e416f02 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentFragment; @@ -46,7 +47,7 @@ /** * Keeps track of state related to shard recovery. */ -public class RecoveryState implements ToXContentFragment, Streamable { +public class RecoveryState implements ToXContentFragment, Streamable, Writeable { public enum Stage { INIT((byte) 0), @@ -102,19 +103,29 @@ public static Stage fromId(byte id) { private Stage stage; - private final Index index = new Index(); - private final Translog translog = new Translog(); - private final VerifyIndex verifyIndex = new VerifyIndex(); - private final Timer timer = new Timer(); + private final Index index; + private final Translog translog; + private final VerifyIndex verifyIndex; + private final Timer timer; private RecoverySource recoverySource; private ShardId shardId; @Nullable private DiscoveryNode sourceNode; private DiscoveryNode targetNode; - private boolean primary = false; + private boolean primary; - private RecoveryState() { + public RecoveryState(StreamInput in) throws IOException { + timer = new Timer(in); + stage = Stage.fromId(in.readByte()); + shardId = ShardId.readShardId(in); + recoverySource = RecoverySource.readFrom(in); + targetNode = new DiscoveryNode(in); + sourceNode = in.readOptionalWriteable(DiscoveryNode::new); + index = new Index(in); + translog = new Translog(in); + verifyIndex = new VerifyIndex(in); + primary = in.readBoolean(); } public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) { @@ -128,6 +139,10 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla this.sourceNode = sourceNode; this.targetNode = targetNode; stage = Stage.INIT; + index = new Index(); + translog = new Translog(); + verifyIndex = new VerifyIndex(); + timer = new Timer(); timer.start(); } @@ -223,23 +238,12 @@ public boolean getPrimary() { } public static RecoveryState readRecoveryState(StreamInput in) throws IOException { - RecoveryState recoveryState = new RecoveryState(); - recoveryState.readFrom(in); - return recoveryState; + return new RecoveryState(in); } @Override public synchronized void readFrom(StreamInput in) throws IOException { - timer.readFrom(in); - stage = Stage.fromId(in.readByte()); - shardId = ShardId.readShardId(in); - recoverySource = RecoverySource.readFrom(in); - targetNode = new DiscoveryNode(in); - sourceNode = in.readOptionalWriteable(DiscoveryNode::new); - index.readFrom(in); - translog.readFrom(in); - verifyIndex.readFrom(in); - primary = in.readBoolean(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -347,12 +351,22 @@ static final class Fields { static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis"; } - public static class Timer implements Streamable { + public static class Timer implements Streamable, Writeable { protected long startTime = 0; protected long startNanoTime = 0; protected long time = -1; protected long stopTime = 0; + public Timer() { + } + + public Timer(StreamInput in) throws IOException { + startTime = in.readVLong(); + startNanoTime = in.readVLong(); + stopTime = in.readVLong(); + time = in.readVLong(); + } + public synchronized void start() { assert startTime == 0 : "already started"; startTime = System.currentTimeMillis(); @@ -394,13 +408,9 @@ public synchronized void reset() { stopTime = 0; } - @Override public synchronized void readFrom(StreamInput in) throws IOException { - startTime = in.readVLong(); - startNanoTime = in.readVLong(); - stopTime = in.readVLong(); - time = in.readVLong(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -414,9 +424,16 @@ public synchronized void writeTo(StreamOutput out) throws IOException { } - public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable { + public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable, Writeable { private volatile long checkIndexTime; + public VerifyIndex() { + } + + public VerifyIndex(StreamInput in) throws IOException { + super(in); + checkIndexTime = in.readVLong(); + } public void reset() { super.reset(); @@ -433,8 +450,7 @@ public void checkIndexTime(long checkIndexTime) { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - checkIndexTime = in.readVLong(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -451,13 +467,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class Translog extends Timer implements ToXContentFragment, Streamable { + public static class Translog extends Timer implements ToXContentFragment, Streamable, Writeable { public static final int UNKNOWN = -1; private int recovered; private int total = UNKNOWN; private int totalOnStart = UNKNOWN; + public Translog() { + } + + public Translog(StreamInput in) throws IOException { + super(in); + recovered = in.readVInt(); + total = in.readVInt(); + totalOnStart = in.readVInt(); + } + public synchronized void reset() { super.reset(); recovered = 0; @@ -535,10 +561,7 @@ public synchronized float recoveredPercent() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - recovered = in.readVInt(); - total = in.readVInt(); - totalOnStart = in.readVInt(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -560,7 +583,7 @@ public synchronized XContentBuilder toXContent(XContentBuilder builder, Params p } } - public static class File implements ToXContentObject, Streamable { + public static class File implements ToXContentObject, Streamable, Writeable { private String name; private long length; private long recovered; @@ -576,6 +599,13 @@ public File(String name, long length, boolean reused) { this.reused = reused; } + public File(StreamInput in) throws IOException { + name = in.readString(); + length = in.readVLong(); + recovered = in.readVLong(); + reused = in.readBoolean(); + } + void addRecoveredBytes(long bytes) { assert reused == false : "file is marked as reused, can't update recovered bytes"; assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; @@ -614,18 +644,9 @@ boolean fullyRecovered() { return reused == false && length == recovered; } - public static File readFile(StreamInput in) throws IOException { - File file = new File(); - file.readFrom(in); - return file; - } - @Override public void readFrom(StreamInput in) throws IOException { - name = in.readString(); - length = in.readVLong(); - recovered = in.readVLong(); - reused = in.readBoolean(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -671,7 +692,7 @@ public String toString() { } } - public static class Index extends Timer implements ToXContentFragment, Streamable { + public static class Index extends Timer implements ToXContentFragment, Streamable, Writeable { private Map fileDetails = new HashMap<>(); @@ -681,6 +702,20 @@ public static class Index extends Timer implements ToXContentFragment, Streamabl private long sourceThrottlingInNanos = UNKNOWN; private long targetThrottleTimeInNanos = UNKNOWN; + public Index() { + } + + public Index(StreamInput in) throws IOException { + super(in); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + File file = new File(in); + fileDetails.put(file.name, file); + } + sourceThrottlingInNanos = in.readLong(); + targetThrottleTimeInNanos = in.readLong(); + } + public synchronized List fileDetails() { return Collections.unmodifiableList(new ArrayList<>(fileDetails.values())); } @@ -885,14 +920,7 @@ public synchronized void updateVersion(long version) { @Override public synchronized void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - File file = File.readFile(in); - fileDetails.put(file.name, file); - } - sourceThrottlingInNanos = in.readLong(); - targetThrottleTimeInNanos = in.readLong(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index 7a65541cb5eaf..c906f1e416a9f 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -93,12 +93,10 @@ public T serializeDeserialize() throws IOException { } protected T deserialize(StreamInput in) throws IOException { - T obj = createObj(); - obj.readFrom(in); - return obj; + return createObj(in); } - abstract T createObj(); + abstract T createObj(StreamInput in) throws IOException; @Override public void run() { @@ -121,32 +119,32 @@ public void testTimers() throws Throwable { timer = new Timer(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new Timer(); + Timer createObj(StreamInput in) throws IOException { + return new Timer(in); } }; } else if (randomBoolean()) { timer = new Index(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new Index(); + Timer createObj(StreamInput in) throws IOException { + return new Index(in); } }; } else if (randomBoolean()) { timer = new VerifyIndex(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new VerifyIndex(); + Timer createObj(StreamInput in) throws IOException { + return new VerifyIndex(in); } }; } else { timer = new Translog(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new Translog(); + Timer createObj(StreamInput in) throws IOException { + return new Translog(in); } }; } @@ -256,8 +254,8 @@ public void testIndex() throws Throwable { Streamer backgroundReader = new Streamer(streamShouldStop, index) { @Override - Index createObj() { - return new Index(); + Index createObj(StreamInput in) throws IOException { + return new Index(in); } }; @@ -381,8 +379,8 @@ public void testTranslog() throws Throwable { AtomicBoolean stop = new AtomicBoolean(); Streamer streamer = new Streamer(stop, translog) { @Override - Translog createObj() { - return new Translog(); + Translog createObj(StreamInput in) throws IOException { + return new Translog(in); } }; @@ -458,8 +456,8 @@ public void testStart() throws IOException { AtomicBoolean stop = new AtomicBoolean(); Streamer streamer = new Streamer(stop, verifyIndex) { @Override - VerifyIndex createObj() { - return new VerifyIndex(); + VerifyIndex createObj(StreamInput in) throws IOException { + return new VerifyIndex(in); } }; @@ -508,8 +506,8 @@ public void testConcurrentModificationIndexFileDetailsMap() throws InterruptedEx final AtomicBoolean stop = new AtomicBoolean(false); Streamer readWriteIndex = new Streamer(stop, index) { @Override - Index createObj() { - return new Index(); + Index createObj(StreamInput in) throws IOException { + return new Index(in); } }; Thread modifyThread = new Thread() { From ee20bda0e33d55da457d27479006c9681dadc903 Mon Sep 17 00:00:00 2001 From: liketic Date: Sun, 13 Jan 2019 16:38:42 +0800 Subject: [PATCH 2/7] Fix test failure --- .../action/support/broadcast/BroadcastRequest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java index a04d2edc8dc63..164bc42830d98 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java @@ -36,6 +36,12 @@ public class BroadcastRequest> extends public BroadcastRequest() { } + public BroadcastRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } + protected BroadcastRequest(String[] indices) { this.indices = indices; } From 009eec9444f185b23b07420c95b29b9822969c07 Mon Sep 17 00:00:00 2001 From: liketic Date: Sat, 19 Jan 2019 16:45:28 +0800 Subject: [PATCH 3/7] Revert RecoveryRequest --- .../action/admin/indices/recovery/RecoveryRequest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java index e28c8229217bb..8c8ad4da6a138 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java @@ -103,6 +103,8 @@ public void writeTo(StreamOutput out) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + super.readFrom(in); + detailed = in.readBoolean(); + activeOnly = in.readBoolean(); } } From 0bd175770a16c1bdd23b67186d0daadfd7accaef Mon Sep 17 00:00:00 2001 From: liketic Date: Fri, 25 Jan 2019 09:00:48 +0800 Subject: [PATCH 4/7] Address comments --- .../indices/recovery/RecoveryRequest.java | 6 ---- .../recovery/TransportRecoveryAction.java | 4 ++- .../support/broadcast/BroadcastRequest.java | 6 ---- .../indices/recovery/RecoveryState.java | 35 +++---------------- .../indices/recovery/RecoveryTargetTests.java | 4 +-- 5 files changed, 10 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java index 8c8ad4da6a138..8878713765ba0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryRequest.java @@ -41,12 +41,6 @@ public RecoveryRequest() { this(Strings.EMPTY_ARRAY); } - public RecoveryRequest(StreamInput in) throws IOException { - super(in); - detailed = in.readBoolean(); - activeOnly = in.readBoolean(); - } - /** * Constructs a request for recovery information for all shards for the given indices * diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index 5f9670f9a2c14..f7356bd242d06 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -93,7 +93,9 @@ protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, @Override protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { - return new RecoveryRequest(in); + final RecoveryRequest recoveryRequest = new RecoveryRequest(); + recoveryRequest.readFrom(in); + return recoveryRequest; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java index 164bc42830d98..a04d2edc8dc63 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java @@ -36,12 +36,6 @@ public class BroadcastRequest> extends public BroadcastRequest() { } - public BroadcastRequest(StreamInput in) throws IOException { - super(in); - indices = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - } - protected BroadcastRequest(String[] indices) { this.indices = indices; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 87bd89e416f02..f4370830bcd0b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -351,7 +351,7 @@ static final class Fields { static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis"; } - public static class Timer implements Streamable, Writeable { + public static class Timer implements Writeable { protected long startTime = 0; protected long startNanoTime = 0; protected long time = -1; @@ -408,11 +408,6 @@ public synchronized void reset() { stopTime = 0; } - @Override - public synchronized void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override public synchronized void writeTo(StreamOutput out) throws IOException { out.writeVLong(startTime); @@ -424,7 +419,7 @@ public synchronized void writeTo(StreamOutput out) throws IOException { } - public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable, Writeable { + public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable { private volatile long checkIndexTime; public VerifyIndex() { @@ -448,11 +443,6 @@ public void checkIndexTime(long checkIndexTime) { this.checkIndexTime = checkIndexTime; } - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -467,7 +457,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class Translog extends Timer implements ToXContentFragment, Streamable, Writeable { + public static class Translog extends Timer implements ToXContentFragment, Writeable { public static final int UNKNOWN = -1; private int recovered; @@ -559,11 +549,6 @@ public synchronized float recoveredPercent() { return recovered * 100.0f / total; } - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -583,7 +568,7 @@ public synchronized XContentBuilder toXContent(XContentBuilder builder, Params p } } - public static class File implements ToXContentObject, Streamable, Writeable { + public static class File implements ToXContentObject, Writeable { private String name; private long length; private long recovered; @@ -644,11 +629,6 @@ boolean fullyRecovered() { return reused == false && length == recovered; } - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); @@ -692,7 +672,7 @@ public String toString() { } } - public static class Index extends Timer implements ToXContentFragment, Streamable, Writeable { + public static class Index extends Timer implements ToXContentFragment, Writeable { private Map fileDetails = new HashMap<>(); @@ -918,11 +898,6 @@ public synchronized void updateVersion(long version) { this.version = version; } - @Override - public synchronized void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override public synchronized void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index c906f1e416a9f..1c2b5331fef30 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.RecoveryState.File; import org.elasticsearch.indices.recovery.RecoveryState.Index; @@ -57,7 +57,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; public class RecoveryTargetTests extends ESTestCase { - abstract class Streamer extends Thread { + abstract class Streamer extends Thread { private T lastRead; private final AtomicBoolean shouldStop; private final T source; From b01f640bd97fb97101b6b6139e9da6b14273b907 Mon Sep 17 00:00:00 2001 From: liketic Date: Fri, 25 Jan 2019 20:30:41 +0800 Subject: [PATCH 5/7] Move writeTo to next constructor --- .../indices/recovery/RecoveryState.java | 96 +++++++++---------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index f4370830bcd0b..32a4ebaf656e6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -146,6 +146,20 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla timer.start(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + timer.writeTo(out); + out.writeByte(stage.id()); + shardId.writeTo(out); + recoverySource.writeTo(out); + targetNode.writeTo(out); + out.writeOptionalWriteable(sourceNode); + index.writeTo(out); + translog.writeTo(out); + verifyIndex.writeTo(out); + out.writeBoolean(primary); + } + public ShardId getShardId() { return shardId; } @@ -246,20 +260,6 @@ public synchronized void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public void writeTo(StreamOutput out) throws IOException { - timer.writeTo(out); - out.writeByte(stage.id()); - shardId.writeTo(out); - recoverySource.writeTo(out); - targetNode.writeTo(out); - out.writeOptionalWriteable(sourceNode); - index.writeTo(out); - translog.writeTo(out); - verifyIndex.writeTo(out); - out.writeBoolean(primary); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -430,6 +430,12 @@ public VerifyIndex(StreamInput in) throws IOException { checkIndexTime = in.readVLong(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(checkIndexTime); + } + public void reset() { super.reset(); checkIndexTime = 0; @@ -443,12 +449,6 @@ public void checkIndexTime(long checkIndexTime) { this.checkIndexTime = checkIndexTime; } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVLong(checkIndexTime); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.humanReadableField(Fields.CHECK_INDEX_TIME_IN_MILLIS, Fields.CHECK_INDEX_TIME, new TimeValue(checkIndexTime)); @@ -474,6 +474,14 @@ public Translog(StreamInput in) throws IOException { totalOnStart = in.readVInt(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(recovered); + out.writeVInt(total); + out.writeVInt(totalOnStart); + } + public synchronized void reset() { super.reset(); recovered = 0; @@ -549,14 +557,6 @@ public synchronized float recoveredPercent() { return recovered * 100.0f / total; } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(recovered); - out.writeVInt(total); - out.writeVInt(totalOnStart); - } - @Override public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.RECOVERED, recovered); @@ -591,6 +591,14 @@ public File(StreamInput in) throws IOException { reused = in.readBoolean(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(length); + out.writeVLong(recovered); + out.writeBoolean(reused); + } + void addRecoveredBytes(long bytes) { assert reused == false : "file is marked as reused, can't update recovered bytes"; assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; @@ -629,14 +637,6 @@ boolean fullyRecovered() { return reused == false && length == recovered; } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(name); - out.writeVLong(length); - out.writeVLong(recovered); - out.writeBoolean(reused); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -696,6 +696,18 @@ public Index(StreamInput in) throws IOException { targetThrottleTimeInNanos = in.readLong(); } + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + final File[] files = fileDetails.values().toArray(new File[0]); + out.writeVInt(files.length); + for (File file : files) { + file.writeTo(out); + } + out.writeLong(sourceThrottlingInNanos); + out.writeLong(targetThrottleTimeInNanos); + } + public synchronized List fileDetails() { return Collections.unmodifiableList(new ArrayList<>(fileDetails.values())); } @@ -898,18 +910,6 @@ public synchronized void updateVersion(long version) { this.version = version; } - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - final File[] files = fileDetails.values().toArray(new File[0]); - out.writeVInt(files.length); - for (File file : files) { - file.writeTo(out); - } - out.writeLong(sourceThrottlingInNanos); - out.writeLong(targetThrottleTimeInNanos); - } - @Override public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // stream size first, as it matters more and the files section can be long From 3584702ec2644861a8c4c3202b7084fd6f651045 Mon Sep 17 00:00:00 2001 From: liketic Date: Fri, 25 Jan 2019 20:32:35 +0800 Subject: [PATCH 6/7] Move writeTo to next constructor --- .../indices/recovery/RecoveryState.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 32a4ebaf656e6..5be9890a85edd 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -367,6 +367,15 @@ public Timer(StreamInput in) throws IOException { time = in.readVLong(); } + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + out.writeVLong(startTime); + out.writeVLong(startNanoTime); + out.writeVLong(stopTime); + // write a snapshot of current time, which is not per se the time field + out.writeVLong(time()); + } + public synchronized void start() { assert startTime == 0 : "already started"; startTime = System.currentTimeMillis(); @@ -408,15 +417,6 @@ public synchronized void reset() { stopTime = 0; } - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - out.writeVLong(startTime); - out.writeVLong(startNanoTime); - out.writeVLong(stopTime); - // write a snapshot of current time, which is not per se the time field - out.writeVLong(time()); - } - } public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable { From f08269fac991c4a44821e73c3f729778dad1ff59 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 25 Jan 2019 10:52:10 -0500 Subject: [PATCH 7/7] Move RecoveryState writeTo next ctor(in) --- .../indices/recovery/RecoveryState.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 5be9890a85edd..1fed238f8ddf6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -115,19 +115,6 @@ public static Stage fromId(byte id) { private DiscoveryNode targetNode; private boolean primary; - public RecoveryState(StreamInput in) throws IOException { - timer = new Timer(in); - stage = Stage.fromId(in.readByte()); - shardId = ShardId.readShardId(in); - recoverySource = RecoverySource.readFrom(in); - targetNode = new DiscoveryNode(in); - sourceNode = in.readOptionalWriteable(DiscoveryNode::new); - index = new Index(in); - translog = new Translog(in); - verifyIndex = new VerifyIndex(in); - primary = in.readBoolean(); - } - public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) { assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting; RecoverySource recoverySource = shardRouting.recoverySource(); @@ -146,6 +133,19 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla timer.start(); } + public RecoveryState(StreamInput in) throws IOException { + timer = new Timer(in); + stage = Stage.fromId(in.readByte()); + shardId = ShardId.readShardId(in); + recoverySource = RecoverySource.readFrom(in); + targetNode = new DiscoveryNode(in); + sourceNode = in.readOptionalWriteable(DiscoveryNode::new); + index = new Index(in); + translog = new Translog(in); + verifyIndex = new VerifyIndex(in); + primary = in.readBoolean(); + } + @Override public void writeTo(StreamOutput out) throws IOException { timer.writeTo(out);