From 0a98db64f9f3c9e5540cc6da89ea862856945206 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 19:04:22 -0500 Subject: [PATCH 01/13] Fix resync request serialization This commit addresses a subtle bug in the serialization routine for resync requests. The problem here is that Translog.Operation#readType is not compatible with the implementations of Translog.Operation#writeTo. Unfortunately, this issue prevents primary-replica from succeeding, issues which we will address in follow-ups. --- .../resync/ResyncReplicationRequest.java | 26 ++++++++- .../resync/ResyncReplicationRequestTests.java | 55 +++++++++++++++++++ 2 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 6f6382d717437..547a586eebbab 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -25,7 +25,9 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; public final class ResyncReplicationRequest extends ReplicatedWriteRequest { @@ -47,13 +49,33 @@ public List getOperations() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operations = in.readList(Translog.Operation::readType); + final int size = in.readVInt(); + operations = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + operations.add(Translog.Operation.readType(in)); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeList(operations); + out.writeVInt(operations.size()); + for (int i = 0; i < operations.size(); i++) { + Translog.Operation.writeType(operations.get(i), out); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ResyncReplicationRequest that = (ResyncReplicationRequest) o; + return Objects.equals(getOperations(), that.getOperations()); + } + + @Override + public int hashCode() { + return Objects.hash(getOperations()); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java new file mode 100644 index 0000000000000..e92bcb01a8401 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.resync; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class ResyncReplicationRequestTests extends ESTestCase { + + public void testSerialization() throws IOException { + final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); + final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1); + final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); + final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, Collections.singletonList(index)); + + final BytesStreamOutput out = new BytesStreamOutput(); + before.writeTo(out); + + final StreamInput in = out.bytes().streamInput(); + final ResyncReplicationRequest after = new ResyncReplicationRequest(); + after.readFrom(in); + + assertThat(after, equalTo(before)); + } + +} From 04fe32c549ea105bc54bca6a54df94bed9cf4a3c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:13:25 -0500 Subject: [PATCH 02/13] Remove writable --- .../index/translog/Translog.java | 33 ++++++++++++------- .../index/translog/TranslogTests.java | 11 ++++--- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 20c428960f747..fb74ac560d712 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; @@ -847,7 +846,7 @@ public interface Snapshot extends Closeable { * A generic interface representing an operation performed on the transaction log. * Each is associated with a type. */ - public interface Operation extends Writeable { + public interface Operation { enum Type { @Deprecated CREATE((byte) 1), @@ -917,7 +916,20 @@ static Operation readType(StreamInput input) throws IOException { */ static void writeType(Translog.Operation operation, StreamOutput output) throws IOException { output.writeByte(operation.opType().id()); - operation.writeTo(output); + switch(operation.opType()) { + case CREATE: + case INDEX: + ((Index) operation).write(output); + break; + case DELETE: + ((Delete) operation).write(output); + break; + case NO_OP: + ((NoOp) operation).write(output); + break; + default: + throw new IOException("No type for [" + operation.opType() + "]"); + } } } @@ -954,7 +966,7 @@ public static class Index implements Operation { private final String routing; private final String parent; - public Index(StreamInput in) throws IOException { + private Index(StreamInput in) throws IOException { final int format = in.readVInt(); // SERIALIZATION_FORMAT assert format >= FORMAT_2_X : "format was: " + format; id = in.readString(); @@ -1067,8 +1079,7 @@ public Source getSource() { return new Source(source, routing, parent); } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(id); out.writeString(type); @@ -1156,7 +1167,7 @@ public static class Delete implements Operation { private final long version; private final VersionType versionType; - public Delete(StreamInput in) throws IOException { + private Delete(StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT assert format >= FORMAT_5_0 : "format was: " + format; if (format >= FORMAT_SINGLE_TYPE) { @@ -1251,8 +1262,7 @@ public Source getSource() { throw new IllegalStateException("trying to read doc source from delete operation"); } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(type); out.writeString(id); @@ -1322,7 +1332,7 @@ public String reason() { return reason; } - NoOp(final StreamInput in) throws IOException { + private NoOp(final StreamInput in) throws IOException { seqNo = in.readLong(); primaryTerm = in.readLong(); reason = in.readString(); @@ -1337,8 +1347,7 @@ public NoOp(final long seqNo, final long primaryTerm, final String reason) { this.reason = reason; } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(StreamOutput out) throws IOException { out.writeLong(seqNo); out.writeLong(primaryTerm); out.writeString(reason); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 78ed6697b22b4..8ad57270069a9 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2343,9 +2343,9 @@ public void testTranslogOpSerialization() throws Exception { Translog.Index index = new Translog.Index(eIndex, eIndexResult); BytesStreamOutput out = new BytesStreamOutput(); - index.writeTo(out); + Translog.Operation.writeType(index, out); StreamInput in = out.bytes().streamInput(); - Translog.Index serializedIndex = new Translog.Index(in); + Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readType(in); assertEquals(index, serializedIndex); Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm, @@ -2354,13 +2354,14 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult); out = new BytesStreamOutput(); - delete.writeTo(out); + Translog.Operation.writeType(delete, out); in = out.bytes().streamInput(); - Translog.Delete serializedDelete = new Translog.Delete(in); + Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readType(in); assertEquals(delete, serializedDelete); // simulate legacy delete serialization out = new BytesStreamOutput(); + out.writeByte(Translog.Operation.Type.DELETE.id()); out.writeVInt(Translog.Delete.FORMAT_5_0); out.writeString(UidFieldMapper.NAME); out.writeString("my_type#my_id"); @@ -2369,7 +2370,7 @@ public void testTranslogOpSerialization() throws Exception { out.writeLong(2); // seq no out.writeLong(0); // primary term in = out.bytes().streamInput(); - serializedDelete = new Translog.Delete(in); + serializedDelete = (Translog.Delete) Translog.Operation.readType(in); assertEquals("my_type", serializedDelete.type()); assertEquals("my_id", serializedDelete.id()); } From 72964b2bd07da788ccd1062ecc519ffab9d0f302 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:17:53 -0500 Subject: [PATCH 03/13] Rename methods --- .../action/resync/ResyncReplicationRequest.java | 4 ++-- .../org/elasticsearch/index/translog/Translog.java | 10 +++++----- .../elasticsearch/index/translog/TranslogTests.java | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 547a586eebbab..9883f58fd938a 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -52,7 +52,7 @@ public void readFrom(StreamInput in) throws IOException { final int size = in.readVInt(); operations = new ArrayList<>(size); for (int i = 0; i < size; i++) { - operations.add(Translog.Operation.readType(in)); + operations.add(Translog.Operation.readOperation(in)); } } @@ -61,7 +61,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVInt(operations.size()); for (int i = 0; i < operations.size(); i++) { - Translog.Operation.writeType(operations.get(i), out); + Translog.Operation.writeOperation(operations.get(i), out); } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index fb74ac560d712..970b463e15059 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -892,9 +892,9 @@ public static Type fromId(byte id) { /** * Reads the type and the operation from the given stream. The operation must be written with - * {@link Operation#writeType(Operation, StreamOutput)} + * {@link Operation#writeOperation(Operation, StreamOutput)} */ - static Operation readType(StreamInput input) throws IOException { + static Operation readOperation(StreamInput input) throws IOException { Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); switch (type) { case CREATE: @@ -914,7 +914,7 @@ static Operation readType(StreamInput input) throws IOException { /** * Writes the type and translog operation to the given stream */ - static void writeType(Translog.Operation operation, StreamOutput output) throws IOException { + static void writeOperation(Translog.Operation operation, StreamOutput output) throws IOException { output.writeByte(operation.opType().id()); switch(operation.opType()) { case CREATE: @@ -1449,7 +1449,7 @@ static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws I verifyChecksum(in); in.reset(); } - operation = Translog.Operation.readType(in); + operation = Translog.Operation.readOperation(in); verifyChecksum(in); } catch (TranslogCorruptedException e) { throw e; @@ -1492,7 +1492,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl // because closing it closes the underlying stream, which we don't // want to do here. out.resetDigest(); - Translog.Operation.writeType(op, out); + Translog.Operation.writeOperation(op, out); long checksum = out.getChecksum(); out.writeInt((int) checksum); } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 8ad57270069a9..477a7ae849748 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2343,9 +2343,9 @@ public void testTranslogOpSerialization() throws Exception { Translog.Index index = new Translog.Index(eIndex, eIndexResult); BytesStreamOutput out = new BytesStreamOutput(); - Translog.Operation.writeType(index, out); + Translog.Operation.writeOperation(index, out); StreamInput in = out.bytes().streamInput(); - Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readType(in); + Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in); assertEquals(index, serializedIndex); Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm, @@ -2354,9 +2354,9 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult); out = new BytesStreamOutput(); - Translog.Operation.writeType(delete, out); + Translog.Operation.writeOperation(delete, out); in = out.bytes().streamInput(); - Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readType(in); + Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in); assertEquals(delete, serializedDelete); // simulate legacy delete serialization @@ -2370,7 +2370,7 @@ public void testTranslogOpSerialization() throws Exception { out.writeLong(2); // seq no out.writeLong(0); // primary term in = out.bytes().streamInput(); - serializedDelete = (Translog.Delete) Translog.Operation.readType(in); + serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in); assertEquals("my_type", serializedDelete.type()); assertEquals("my_id", serializedDelete.id()); } From e6b51e4dad6a9f8b48d085bb5227e9f77e113b8e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:19:29 -0500 Subject: [PATCH 04/13] Nit --- .../java/org/elasticsearch/index/translog/Translog.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 970b463e15059..e2632c951f1b0 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -875,7 +875,7 @@ public static Type fromId(byte id) { case 4: return NO_OP; default: - throw new IllegalArgumentException("No type mapped for [" + id + "]"); + throw new IllegalArgumentException("no type mapped for [" + id + "]"); } } } @@ -907,7 +907,7 @@ static Operation readOperation(StreamInput input) throws IOException { case NO_OP: return new NoOp(input); default: - throw new IOException("No type for [" + type + "]"); + throw new IOException("no type for [" + type + "]"); } } @@ -928,7 +928,7 @@ static void writeOperation(Translog.Operation operation, StreamOutput output) th ((NoOp) operation).write(output); break; default: - throw new IOException("No type for [" + operation.opType() + "]"); + throw new IOException("no type for [" + operation.opType() + "]"); } } From 9e5e810ba879097125ad24042c20f78624fb9238 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:20:32 -0500 Subject: [PATCH 05/13] Fix indentation --- .../main/java/org/elasticsearch/index/translog/Translog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index e2632c951f1b0..359990df19fc7 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -927,8 +927,8 @@ static void writeOperation(Translog.Operation operation, StreamOutput output) th case NO_OP: ((NoOp) operation).write(output); break; - default: - throw new IOException("no type for [" + operation.opType() + "]"); + default: + throw new IOException("no type for [" + operation.opType() + "]"); } } From 7333e04782572fc7fcfc40ad97f5c13953f80157 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:21:21 -0500 Subject: [PATCH 06/13] Assert! --- .../main/java/org/elasticsearch/index/translog/Translog.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 359990df19fc7..f73916ecf1f62 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.translog; +import com.vividsolutions.jts.util.Assert; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.Term; @@ -907,7 +908,7 @@ static Operation readOperation(StreamInput input) throws IOException { case NO_OP: return new NoOp(input); default: - throw new IOException("no type for [" + type + "]"); + throw new AssertionError("no case for [" + type + "]"); } } @@ -928,7 +929,7 @@ static void writeOperation(Translog.Operation operation, StreamOutput output) th ((NoOp) operation).write(output); break; default: - throw new IOException("no type for [" + operation.opType() + "]"); + throw new AssertionError("no case for [" + operation.opType() + "]"); } } From 4113e60040dc0492a2673b438bd52bfb4b3b59eb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:22:21 -0500 Subject: [PATCH 07/13] Symmetry --- .../java/org/elasticsearch/index/translog/Translog.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index f73916ecf1f62..8bbdc49ccf7c5 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -899,12 +899,11 @@ static Operation readOperation(StreamInput input) throws IOException { Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); switch (type) { case CREATE: - // the deserialization logic in Index was identical to that of Create when create was deprecated + // the de-serialization logic in Index was identical to that of Create when create was deprecated + case INDEX: return new Index(input); case DELETE: return new Delete(input); - case INDEX: - return new Index(input); case NO_OP: return new NoOp(input); default: @@ -919,6 +918,7 @@ static void writeOperation(Translog.Operation operation, StreamOutput output) th output.writeByte(operation.opType().id()); switch(operation.opType()) { case CREATE: + // the serialization logic in Index was identical to that of Create when create was deprecated case INDEX: ((Index) operation).write(output); break; From abe6285dec7e5f800e280e1fc9a683c8e7298af5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Nov 2017 20:23:22 -0500 Subject: [PATCH 08/13] More symmetry --- .../elasticsearch/index/translog/Translog.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 8bbdc49ccf7c5..4942d8135cc6d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -895,8 +895,8 @@ public static Type fromId(byte id) { * Reads the type and the operation from the given stream. The operation must be written with * {@link Operation#writeOperation(Operation, StreamOutput)} */ - static Operation readOperation(StreamInput input) throws IOException { - Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); + static Operation readOperation(final StreamInput input) throws IOException { + final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); switch (type) { case CREATE: // the de-serialization logic in Index was identical to that of Create when create was deprecated @@ -914,7 +914,7 @@ static Operation readOperation(StreamInput input) throws IOException { /** * Writes the type and translog operation to the given stream */ - static void writeOperation(Translog.Operation operation, StreamOutput output) throws IOException { + static void writeOperation(final Translog.Operation operation, final StreamOutput output) throws IOException { output.writeByte(operation.opType().id()); switch(operation.opType()) { case CREATE: @@ -967,7 +967,7 @@ public static class Index implements Operation { private final String routing; private final String parent; - private Index(StreamInput in) throws IOException { + private Index(final StreamInput in) throws IOException { final int format = in.readVInt(); // SERIALIZATION_FORMAT assert format >= FORMAT_2_X : "format was: " + format; id = in.readString(); @@ -1080,7 +1080,7 @@ public Source getSource() { return new Source(source, routing, parent); } - private void write(StreamOutput out) throws IOException { + private void write(final StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(id); out.writeString(type); @@ -1168,7 +1168,7 @@ public static class Delete implements Operation { private final long version; private final VersionType versionType; - private Delete(StreamInput in) throws IOException { + private Delete(final StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT assert format >= FORMAT_5_0 : "format was: " + format; if (format >= FORMAT_SINGLE_TYPE) { @@ -1263,7 +1263,7 @@ public Source getSource() { throw new IllegalStateException("trying to read doc source from delete operation"); } - private void write(StreamOutput out) throws IOException { + private void write(final StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(type); out.writeString(id); @@ -1348,7 +1348,7 @@ public NoOp(final long seqNo, final long primaryTerm, final String reason) { this.reason = reason; } - private void write(StreamOutput out) throws IOException { + private void write(final StreamOutput out) throws IOException { out.writeLong(seqNo); out.writeLong(primaryTerm); out.writeString(reason); From 29dd62a10b42775cd838c022133f521055d94c77 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Nov 2017 13:14:33 -0500 Subject: [PATCH 09/13] Iteration --- .../resync/ResyncReplicationRequest.java | 30 ++++++++----------- .../common/io/stream/StreamOutput.java | 7 +++++ .../index/shard/PrimaryReplicaSyncer.java | 4 ++- .../index/translog/Translog.java | 7 ++--- .../resync/ResyncReplicationRequestTests.java | 3 +- .../shard/PrimaryReplicaSyncerTests.java | 4 +-- .../index/translog/TranslogTests.java | 4 +-- 7 files changed, 31 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 9883f58fd938a..614dc880265c3 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.resync; +import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,44 +26,38 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.Arrays; public final class ResyncReplicationRequest extends ReplicatedWriteRequest { - private List operations; + private Translog.Operation[] operations; ResyncReplicationRequest() { super(); } - public ResyncReplicationRequest(ShardId shardId, List operations) { + public ResyncReplicationRequest(ShardId shardId, Translog.Operation[] operations) { super(shardId); this.operations = operations; } - public List getOperations() { + public Translog.Operation[] getOperations() { return operations; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - final int size = in.readVInt(); - operations = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - operations.add(Translog.Operation.readOperation(in)); + if (in.getVersion().equals(Version.V_6_0_0)) { + throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); } + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(operations.size()); - for (int i = 0; i < operations.size(); i++) { - Translog.Operation.writeOperation(operations.get(i), out); - } + out.writeArray(Translog.Operation::writeOperation, operations); } @Override @@ -70,12 +65,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ResyncReplicationRequest that = (ResyncReplicationRequest) o; - return Objects.equals(getOperations(), that.getOperations()); + return Arrays.equals(operations, that.operations); } @Override public int hashCode() { - return Objects.hash(getOperations()); + return Arrays.hashCode(operations); } @Override @@ -84,7 +79,8 @@ public String toString() { "shardId=" + shardId + ", timeout=" + timeout + ", index='" + index + '\'' + - ", ops=" + operations.size() + + ", ops=" + operations.length + "}"; } + } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index eb2bbdc35788c..8cb0ee3a9b59f 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -706,6 +706,13 @@ public void writeDoubleArray(double[] values) throws IOException { } } + public void writeArray(Writer writer, T[] array) throws IOException { + writeVInt(array.length); + for (T value : array) { + writer.write(this, value); + } + } + public void writeArray(T[] array) throws IOException { writeVInt(array.length); for (T value: array) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 08d64cb82bc72..b1bd1c5b3138e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -218,6 +218,8 @@ public void onFailure(Exception e) { } } + private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0]; + @Override protected void doRun() throws Exception { long size = 0; @@ -247,7 +249,7 @@ protected void doRun() throws Exception { if (!operations.isEmpty()) { task.setPhase("sending_ops"); - ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations); + ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY)); logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(), new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get()); syncAction.sync(request, task, primaryAllocationId, primaryTerm, this); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4942d8135cc6d..4373c8d05398b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.translog; -import com.vividsolutions.jts.util.Assert; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.Term; @@ -893,7 +892,7 @@ public static Type fromId(byte id) { /** * Reads the type and the operation from the given stream. The operation must be written with - * {@link Operation#writeOperation(Operation, StreamOutput)} + * {@link Operation#writeOperation(StreamOutput, Operation)} */ static Operation readOperation(final StreamInput input) throws IOException { final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); @@ -914,7 +913,7 @@ static Operation readOperation(final StreamInput input) throws IOException { /** * Writes the type and translog operation to the given stream */ - static void writeOperation(final Translog.Operation operation, final StreamOutput output) throws IOException { + static void writeOperation(final StreamOutput output, final Operation operation) throws IOException { output.writeByte(operation.opType().id()); switch(operation.opType()) { case CREATE: @@ -1493,7 +1492,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl // because closing it closes the underlying stream, which we don't // want to do here. out.resetDigest(); - Translog.Operation.writeOperation(op, out); + Translog.Operation.writeOperation(out, op); long checksum = out.getChecksum(); out.writeInt((int) checksum); } diff --git a/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index e92bcb01a8401..f1f9fec34de59 100644 --- a/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.nio.charset.Charset; -import java.util.Collections; import static org.hamcrest.Matchers.equalTo; @@ -40,7 +39,7 @@ public void testSerialization() throws IOException { final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); - final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, Collections.singletonList(index)); + final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index}); final BytesStreamOutput out = new BytesStreamOutput(); before.writeTo(out); diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 725d39279d27c..9c01cab57ddf5 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -48,7 +48,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { AtomicBoolean syncActionCalled = new AtomicBoolean(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { - logger.info("Sending off {} operations", request.getOperations().size()); + logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class)); listener.onResponse(new ResyncReplicationResponse()); @@ -98,7 +98,7 @@ public void testSyncerOnClosingShard() throws Exception { CountDownLatch syncCalledLatch = new CountDownLatch(1); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { - logger.info("Sending off {} operations", request.getOperations().size()); + logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 477a7ae849748..1a17e0dc6a06f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2343,7 +2343,7 @@ public void testTranslogOpSerialization() throws Exception { Translog.Index index = new Translog.Index(eIndex, eIndexResult); BytesStreamOutput out = new BytesStreamOutput(); - Translog.Operation.writeOperation(index, out); + Translog.Operation.writeOperation(out, index); StreamInput in = out.bytes().streamInput(); Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in); assertEquals(index, serializedIndex); @@ -2354,7 +2354,7 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult); out = new BytesStreamOutput(); - Translog.Operation.writeOperation(delete, out); + Translog.Operation.writeOperation(out, delete); in = out.bytes().streamInput(); Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in); assertEquals(delete, serializedDelete); From 42ee8af9742e98ee1cda4b3018cfa3e1135bbfb1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Nov 2017 18:38:01 -0500 Subject: [PATCH 10/13] Javadocs --- .../common/io/stream/StreamInput.java | 18 +++++++++++++++--- .../common/io/stream/StreamOutput.java | 13 ++++++++++++- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 31f53874f1949..274fdc5926998 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -688,9 +688,21 @@ public byte[] readByteArray() throws IOException { return bytes; } - public T[] readArray(Writeable.Reader reader, IntFunction arraySupplier) throws IOException { - int length = readArraySize(); - T[] values = arraySupplier.apply(length); + /** + * Reads an array from the stream using the specified {@link org.elasticsearch.common.io.stream.Writeable.Reader} to read array elements + * from the stream. This method can be seen as the reader version of {@link StreamOutput#writeArray(Writeable.Writer, Object[])}. It is + * assumed that the stream first contains a variable-length integer representing the size of the array, and then contains that many + * elements that can be read from the stream. + * + * @param reader the reader used to read individual elements + * @param arraySupplier a supplier used to construct a new array + * @param the type of the elements of the array + * @return an array read from the stream + * @throws IOException if an I/O exception occurs while reading the array + */ + public T[] readArray(final Writeable.Reader reader, final IntFunction arraySupplier) throws IOException { + final int length = readArraySize(); + final T[] values = arraySupplier.apply(length); for (int i = 0; i < length; i++) { values[i] = reader.read(this); } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 8cb0ee3a9b59f..0242d71bbdfa7 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -58,6 +58,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.IntFunction; /** * A stream from another node to this node. Technically, it can also be streamed from a byte array but that is mostly for testing. @@ -706,7 +707,17 @@ public void writeDoubleArray(double[] values) throws IOException { } } - public void writeArray(Writer writer, T[] array) throws IOException { + /** + * Writes the specified array to the stream using the specified {@link Writer} for each element in the array. This method can be seen as + * writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length + * integer is first written to the stream, and then the elements of the array are written to the stream. + * + * @param writer the writer used to write individual elements + * @param array the array + * @param the type of the elements of the array + * @throws IOException if an I/O exception occurs while writing the array + */ + public void writeArray(final Writer writer, final T[] array) throws IOException { writeVInt(array.length); for (T value : array) { writer.write(this, value); From 22a223b99f3115ad8ac5c920743f91e6f0725a55 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Nov 2017 18:38:30 -0500 Subject: [PATCH 11/13] Bail ASAP --- .../elasticsearch/action/resync/ResyncReplicationRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 614dc880265c3..52052a945d01e 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -47,10 +47,10 @@ public Translog.Operation[] getOperations() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); if (in.getVersion().equals(Version.V_6_0_0)) { throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); } + super.readFrom(in); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } From a9cfe3673ad46547669ed4426597638146b3469e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Nov 2017 18:40:43 -0500 Subject: [PATCH 12/13] Explain --- .../action/resync/ResyncReplicationRequest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 52052a945d01e..94e1634d3a216 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -48,6 +48,12 @@ public Translog.Operation[] getOperations() { @Override public void readFrom(StreamInput in) throws IOException { if (in.getVersion().equals(Version.V_6_0_0)) { + /* + * Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a + * byte indicating the type of the operation. + */ + // TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x. + assert Version.CURRENT.major <= 7; throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); } super.readFrom(in); From 918aa79fde7aceba73468b158b93ca5d7d6632c6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Nov 2017 18:41:55 -0500 Subject: [PATCH 13/13] More Javadocs --- .../action/resync/ResyncReplicationRequest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 94e1634d3a216..99a0f73605102 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -28,6 +28,9 @@ import java.io.IOException; import java.util.Arrays; +/** + * Represents a batch of operations sent from the primary to its replicas during the primary-replica resync. + */ public final class ResyncReplicationRequest extends ReplicatedWriteRequest { private Translog.Operation[] operations; @@ -36,7 +39,7 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest