From 76c2165d97a7cc20b9d831eda77e10f9780667b8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Dec 2018 17:36:54 -0500 Subject: [PATCH 1/2] Fix translog bwc serialization --- .../elasticsearch/backwards/IndexingIT.java | 79 +++++++++++++++++++ .../index/translog/Translog.java | 3 + 2 files changed, 82 insertions(+) diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 9da2dafc40b81..8bc876d1a0f83 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.rest.action.document.RestGetAction; @@ -37,6 +38,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -275,6 +279,81 @@ public void testUpdateSnapshotStatus() throws Exception { request.setJsonEntity("{\"indices\": \"" + index + "\"}"); } + public void testResyncFromNewNodeToOldNode() throws Exception { + Nodes nodes = buildNodeAndVersions(); + logger.info("cluster discovered: {}", nodes.toString()); + assumeTrue("require at least one new node", nodes.getNewNodes().size() >= 1); + assumeTrue("require at least two old nodes", nodes.getNewNodes().size() >= 2); + List oldNodes = nodes.getBWCNodes(); + Node oldPrimaryOnOldNode = randomFrom(oldNodes); + oldNodes.remove(oldPrimaryOnOldNode); + Node replicaOnOldNode = randomFrom(oldNodes); + Node replicaOnNewNode = randomFrom(nodes.getNewNodes()); + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put("index.routing.allocation.include._name", oldPrimaryOnOldNode.getNodeName()); + String index = "resync_from_new_node_to_old_node"; + createIndex(index, settings.build()); + ensureGreen(index); + AtomicBoolean done = new AtomicBoolean(); + AtomicInteger totalDocs = new AtomicInteger(); + logger.info("--> allocations: old-primary={}, replica-on-old-node={} replica-on-new-node={}", + oldPrimaryOnOldNode.nodeName, replicaOnOldNode.nodeName, replicaOnNewNode.nodeName); + updateIndexSettings(index, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) + .put("index.routing.allocation.include._name", + String.join(",", oldPrimaryOnOldNode.getNodeName(), replicaOnNewNode.getNodeName(), replicaOnOldNode.getNodeName()))); + ensureGreen(index); + Thread[] indexers = new Thread[between(2, 4)]; // need a background indexing so primary-replica resync could happen. + for (int i = 0; i < indexers.length; i++) { + indexers[i] = new Thread(() -> { + while (done.get() == false) { + String id = Integer.toString(totalDocs.incrementAndGet()); + logger.info("--> indexing {}", id); + Request request = new Request("PUT", index + "/doc/" + id); + request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(100) + "\"}"); + try { + client().performRequest(request); + } catch (Exception e) { + throw new AssertionError("failed to index", e); + } + } + }); + indexers[i].start(); + } + XContentBuilder cancelAllocationCommand = JsonXContent.contentBuilder() + .startObject() + .startArray("commands") + .startObject() + .startObject("cancel") + .field("index", index) + .field("shard", 0) + .field("node", oldPrimaryOnOldNode.getNodeName()) + .field("allow_primary", true) + .endObject() + .endObject() + .endArray() + .endObject(); + Request cancelAllocationRequest = new Request("POST", "/_cluster/reroute"); + cancelAllocationRequest.setJsonEntity(Strings.toString(cancelAllocationCommand)); + logger.info("--> cancel allocation on the primary {}", oldPrimaryOnOldNode.nodeName); + adminClient().performRequest(cancelAllocationRequest); + done.set(true); + for (Thread indexer : indexers) { + indexer.join(); + } + updateIndexSettings(index, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put("index.routing.allocation.include._name", + String.join(",", replicaOnNewNode.getNodeName(), replicaOnOldNode.getNodeName()))); + assertBusy(() -> ensureGreen(index), 2, TimeUnit.MINUTES); + assertBusy(() -> { + assertCount(index, "_only_nodes:" + replicaOnNewNode.getNodeName(), totalDocs.get()); + assertCount(index, "_only_nodes:" + replicaOnOldNode.getNodeName(), totalDocs.get()); + }); + } + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { Request request = new Request("GET", index + "/_count"); request.addParameter("preference", preference); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 83d81222bf58f..d8acba635f822 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1227,6 +1227,9 @@ private void write(final StreamOutput out) throws IOException { out.writeString(type); out.writeBytesReference(source); out.writeOptionalString(routing); + if (format < FORMAT_NO_PARENT) { + out.writeOptionalString(null); // _parent + } out.writeLong(version); if (format < FORMAT_NO_VERSION_TYPE) { out.writeByte(VersionType.EXTERNAL.getValue()); From 4b800e673c7059d6a50573076d2ac25e7447dd4f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 17 Dec 2018 15:19:04 -0500 Subject: [PATCH 2/2] replace mixed-cluster test with unit test --- .../elasticsearch/backwards/IndexingIT.java | 79 ------------------- .../index/translog/TranslogTests.java | 7 ++ 2 files changed, 7 insertions(+), 79 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 8bc876d1a0f83..9da2dafc40b81 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.rest.action.document.RestGetAction; @@ -38,9 +37,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -279,81 +275,6 @@ public void testUpdateSnapshotStatus() throws Exception { request.setJsonEntity("{\"indices\": \"" + index + "\"}"); } - public void testResyncFromNewNodeToOldNode() throws Exception { - Nodes nodes = buildNodeAndVersions(); - logger.info("cluster discovered: {}", nodes.toString()); - assumeTrue("require at least one new node", nodes.getNewNodes().size() >= 1); - assumeTrue("require at least two old nodes", nodes.getNewNodes().size() >= 2); - List oldNodes = nodes.getBWCNodes(); - Node oldPrimaryOnOldNode = randomFrom(oldNodes); - oldNodes.remove(oldPrimaryOnOldNode); - Node replicaOnOldNode = randomFrom(oldNodes); - Node replicaOnNewNode = randomFrom(nodes.getNewNodes()); - Settings.Builder settings = Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) - .put("index.routing.allocation.include._name", oldPrimaryOnOldNode.getNodeName()); - String index = "resync_from_new_node_to_old_node"; - createIndex(index, settings.build()); - ensureGreen(index); - AtomicBoolean done = new AtomicBoolean(); - AtomicInteger totalDocs = new AtomicInteger(); - logger.info("--> allocations: old-primary={}, replica-on-old-node={} replica-on-new-node={}", - oldPrimaryOnOldNode.nodeName, replicaOnOldNode.nodeName, replicaOnNewNode.nodeName); - updateIndexSettings(index, Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) - .put("index.routing.allocation.include._name", - String.join(",", oldPrimaryOnOldNode.getNodeName(), replicaOnNewNode.getNodeName(), replicaOnOldNode.getNodeName()))); - ensureGreen(index); - Thread[] indexers = new Thread[between(2, 4)]; // need a background indexing so primary-replica resync could happen. - for (int i = 0; i < indexers.length; i++) { - indexers[i] = new Thread(() -> { - while (done.get() == false) { - String id = Integer.toString(totalDocs.incrementAndGet()); - logger.info("--> indexing {}", id); - Request request = new Request("PUT", index + "/doc/" + id); - request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(100) + "\"}"); - try { - client().performRequest(request); - } catch (Exception e) { - throw new AssertionError("failed to index", e); - } - } - }); - indexers[i].start(); - } - XContentBuilder cancelAllocationCommand = JsonXContent.contentBuilder() - .startObject() - .startArray("commands") - .startObject() - .startObject("cancel") - .field("index", index) - .field("shard", 0) - .field("node", oldPrimaryOnOldNode.getNodeName()) - .field("allow_primary", true) - .endObject() - .endObject() - .endArray() - .endObject(); - Request cancelAllocationRequest = new Request("POST", "/_cluster/reroute"); - cancelAllocationRequest.setJsonEntity(Strings.toString(cancelAllocationCommand)); - logger.info("--> cancel allocation on the primary {}", oldPrimaryOnOldNode.nodeName); - adminClient().performRequest(cancelAllocationRequest); - done.set(true); - for (Thread indexer : indexers) { - indexer.join(); - } - updateIndexSettings(index, Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) - .put("index.routing.allocation.include._name", - String.join(",", replicaOnNewNode.getNodeName(), replicaOnOldNode.getNodeName()))); - assertBusy(() -> ensureGreen(index), 2, TimeUnit.MINUTES); - assertBusy(() -> { - assertCount(index, "_only_nodes:" + replicaOnNewNode.getNodeName(), totalDocs.get()); - assertCount(index, "_only_nodes:" + replicaOnOldNode.getNodeName(), totalDocs.get()); - }); - } - private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { Request request = new Request("GET", index + "/_count"); request.addParameter("preference", preference); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 7e73f9ef5175a..3eddeea2f2a8a 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Assertions; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -72,6 +73,7 @@ import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.VersionUtils; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -2814,9 +2816,12 @@ public void testTranslogOpSerialization() throws Exception { Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomPrimaryTerm, randomSeqNum, true); Translog.Index index = new Translog.Index(eIndex, eIndexResult); + Version wireVersion = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(wireVersion); Translog.Operation.writeOperation(out, index); StreamInput in = out.bytes().streamInput(); + in.setVersion(wireVersion); Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in); assertEquals(index, serializedIndex); @@ -2826,8 +2831,10 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult); out = new BytesStreamOutput(); + out.setVersion(wireVersion); Translog.Operation.writeOperation(out, delete); in = out.bytes().streamInput(); + in.setVersion(wireVersion); Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in); assertEquals(delete, serializedDelete); }