From eed37173bbae6c085600ec53e2edb780c8a0bf42 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Dec 2018 19:28:48 -0500 Subject: [PATCH 1/7] Use type of mapper instead of source when parse doc We introduce a typeless API in #35790 where we translate the default docType "_doc" to the user-defined docType. However, we still use docType from the source rather the translated type (i.e, type of the Mapper) when parsing a document. This leads to a situation where we have two translog operations for the same document with different types: - prvOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='_doc', seqNo=1, primaryTerm=1, version=1, autoGeneratedIdTimestamp=1545125562123}] - newOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='not_doc', seqNo=1, primaryTerm=1, version=1, autoGeneratedIdTimestamp=-1}] Closes #36769 --- .../index/mapper/DocumentParser.java | 6 +++++- .../index/shard/IndexShardIT.java | 20 +++++++++++++++++++ .../index/translog/TestTranslog.java | 14 +++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 54e59691f80d5..ed02062959e71 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -147,11 +147,15 @@ private static boolean isEmptyDoc(Mapping mapping, XContentParser parser) throws private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) { + // use the docType of the mapper rather than of the source as we may have translated _doc to the user-defined type. + final String docType = context.docMapper().type(); + assert source.type().equals(context.docMapper().type()) || source.type().equals(MapperService.SINGLE_MAPPING_NAME) : + "unexpected docType; mapper type [" + docType + " source type [" + source.type() + "]"; return new ParsedDocument( context.version(), context.seqID(), context.sourceToParse().id(), - context.sourceToParse().type(), + docType, source.routing(), context.docs(), context.sourceToParse().source(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 6035a81a1b99f..2d18c2518b56e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -64,6 +64,7 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -862,4 +863,23 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs + moreDocs)); } + public void testShardChangesWithDefaultDocType() throws Exception { + Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.translog.flush_threshold_size", "512mb") // do not trim translog + .put("index.soft_deletes.enabled", true).build(); + IndexService indexService = createIndex("index", settings, "user_doc", "title", "type=keyword"); + client().prepareIndex("index", "_doc", "1").setSource("{}", XContentType.JSON).get(); + client().prepareDelete("index", "_doc", "1").get(); + client().prepareIndex("index", "user_doc", "2").setSource("{}", XContentType.JSON).get(); + client().prepareDelete("index", "user_doc", "2").get(); + IndexShard shard = indexService.getShard(0); + try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, 3, true); + Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()) { + List opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true); + List opsFromTranslog = TestTranslog.drainSnapshot(translogSnapshot, true); + assertThat(opsFromLucene, equalTo(opsFromTranslog)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index 0e114233856c0..003054fc71550 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -34,7 +34,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Random; import java.util.Set; @@ -128,4 +130,16 @@ public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExce public static long getCurrentTerm(Translog translog) { return translog.getCurrent().getPrimaryTerm(); } + + public static List drainSnapshot(Translog.Snapshot snapshot, boolean sortBySeqNo) throws IOException { + final List ops = new ArrayList<>(snapshot.totalOperations()); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + ops.add(op); + } + if (sortBySeqNo) { + ops.sort(Comparator.comparing(Translog.Operation::seqNo)); + } + return ops; + } } From 62f764c51e77e57d714aed2235c763f2d7734234 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Dec 2018 19:30:53 -0500 Subject: [PATCH 2/7] assertion --- .../java/org/elasticsearch/index/mapper/DocumentParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index ed02062959e71..05eeba860100c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -150,7 +150,7 @@ private static ParsedDocument parsedDocument(SourceToParse source, ParseContext. // use the docType of the mapper rather than of the source as we may have translated _doc to the user-defined type. final String docType = context.docMapper().type(); assert source.type().equals(context.docMapper().type()) || source.type().equals(MapperService.SINGLE_MAPPING_NAME) : - "unexpected docType; mapper type [" + docType + " source type [" + source.type() + "]"; + "unexpected docType; mapper type [" + docType + "] source type [" + source.type() + "]"; return new ParsedDocument( context.version(), context.seqID(), From ffed38e154c2482cdce0396358c94996b08b13ae Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Dec 2018 00:27:18 -0500 Subject: [PATCH 3/7] rewrite source --- .../elasticsearch/index/mapper/DocumentParser.java | 8 +++----- .../org/elasticsearch/index/shard/IndexShard.java | 13 ++++++++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 05eeba860100c..e3a7fd75e5f53 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -147,15 +147,13 @@ private static boolean isEmptyDoc(Mapping mapping, XContentParser parser) throws private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) { - // use the docType of the mapper rather than of the source as we may have translated _doc to the user-defined type. - final String docType = context.docMapper().type(); - assert source.type().equals(context.docMapper().type()) || source.type().equals(MapperService.SINGLE_MAPPING_NAME) : - "unexpected docType; mapper type [" + docType + "] source type [" + source.type() + "]"; + assert source.type().equals(context.docMapper().type()): + "mapper type [" + context.docMapper().type() + "] != source type [" + source.type() + "]"; return new ParsedDocument( context.version(), context.seqID(), context.sourceToParse().id(), - docType, + source.type(), source.routing(), context.docs(), context.sourceToParse().source(), diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d5537ececc385..c8681f2c0eca7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -710,9 +710,16 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o ensureWriteAllowed(origin); Engine.Index operation; try { - operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo, - opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, - ifSeqNo, ifPrimaryTerm); + final String resolvedType = resolveType(sourceToParse.type()); + final SourceToParse sourceWithResolvedType; + if (resolvedType.equals(sourceToParse.type())) { + sourceWithResolvedType = sourceToParse; + } else { + sourceWithResolvedType = SourceToParse.source(sourceToParse.index(), resolvedType, sourceToParse.id(), + sourceToParse.source(), sourceToParse.getXContentType()); + } + operation = prepareIndex(docMapper(resolvedType), indexSettings.getIndexVersionCreated(), sourceWithResolvedType, + seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { return new Engine.IndexResult(update); From 3d604200987c3376959d5916df87236379f73423 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Dec 2018 00:37:48 -0500 Subject: [PATCH 4/7] random test --- .../index/mapper/DocumentParser.java | 2 +- .../elasticsearch/index/shard/IndexShardIT.java | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index e3a7fd75e5f53..4b0ceca5d766d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -147,7 +147,7 @@ private static boolean isEmptyDoc(Mapping mapping, XContentParser parser) throws private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) { - assert source.type().equals(context.docMapper().type()): + assert source.type().equals(context.docMapper().type()) : "mapper type [" + context.docMapper().type() + "] != source type [" + source.type() + "]"; return new ParsedDocument( context.version(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2d18c2518b56e..0ca449b3ca7d6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -867,15 +867,20 @@ public void testShardChangesWithDefaultDocType() throws Exception { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) - .put("index.translog.flush_threshold_size", "512mb") // do not trim translog + .put("index.translog.flush_threshold_size", "512mb") // do not flush .put("index.soft_deletes.enabled", true).build(); IndexService indexService = createIndex("index", settings, "user_doc", "title", "type=keyword"); - client().prepareIndex("index", "_doc", "1").setSource("{}", XContentType.JSON).get(); - client().prepareDelete("index", "_doc", "1").get(); - client().prepareIndex("index", "user_doc", "2").setSource("{}", XContentType.JSON).get(); - client().prepareDelete("index", "user_doc", "2").get(); + int numOps = between(1, 10); + for (int i = 0; i < numOps; i++) { + if (randomBoolean()) { + client().prepareIndex("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2")) + .setSource("{}", XContentType.JSON).get(); + } else { + client().prepareDelete("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2")).get(); + } + } IndexShard shard = indexService.getShard(0); - try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, 3, true); + try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true); Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()) { List opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true); List opsFromTranslog = TestTranslog.drainSnapshot(translogSnapshot, true); From 83f5dc984a90f811fcbe412fca297765545b4964 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Dec 2018 00:39:38 -0500 Subject: [PATCH 5/7] restore -> context.sourceToParse().type() --- .../java/org/elasticsearch/index/mapper/DocumentParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 4b0ceca5d766d..e183385302f43 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -153,7 +153,7 @@ private static ParsedDocument parsedDocument(SourceToParse source, ParseContext. context.version(), context.seqID(), context.sourceToParse().id(), - source.type(), + context.sourceToParse().type(), source.routing(), context.docs(), context.sourceToParse().source(), From c3460847a4c96875e13b110886b125711e955de4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Dec 2018 01:41:25 -0500 Subject: [PATCH 6/7] remove assertion --- .../java/org/elasticsearch/index/mapper/DocumentParser.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index e183385302f43..54e59691f80d5 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -147,8 +147,6 @@ private static boolean isEmptyDoc(Mapping mapping, XContentParser parser) throws private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) { - assert source.type().equals(context.docMapper().type()) : - "mapper type [" + context.docMapper().type() + "] != source type [" + source.type() + "]"; return new ParsedDocument( context.version(), context.seqID(), From dbedec6d42e0fc75599fd6fce15b4b5e1bf39cc9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 23 Dec 2018 06:02:44 -0500 Subject: [PATCH 7/7] copy routing --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c8681f2c0eca7..37c7ace855cf2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -717,6 +717,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o } else { sourceWithResolvedType = SourceToParse.source(sourceToParse.index(), resolvedType, sourceToParse.id(), sourceToParse.source(), sourceToParse.getXContentType()); + sourceWithResolvedType.routing(sourceToParse.routing()); } operation = prepareIndex(docMapper(resolvedType), indexSettings.getIndexVersionCreated(), sourceWithResolvedType, seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);