diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d5537ececc385..37c7ace855cf2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -710,9 +710,17 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o ensureWriteAllowed(origin); Engine.Index operation; try { - operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo, - opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, - ifSeqNo, ifPrimaryTerm); + final String resolvedType = resolveType(sourceToParse.type()); + final SourceToParse sourceWithResolvedType; + if (resolvedType.equals(sourceToParse.type())) { + sourceWithResolvedType = sourceToParse; + } else { + sourceWithResolvedType = SourceToParse.source(sourceToParse.index(), resolvedType, sourceToParse.id(), + sourceToParse.source(), sourceToParse.getXContentType()); + sourceWithResolvedType.routing(sourceToParse.routing()); + } + operation = prepareIndex(docMapper(resolvedType), indexSettings.getIndexVersionCreated(), sourceWithResolvedType, + seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { return new Engine.IndexResult(update); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 6035a81a1b99f..0ca449b3ca7d6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -64,6 +64,7 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -862,4 +863,28 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs + moreDocs)); } + public void testShardChangesWithDefaultDocType() throws Exception { + Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.translog.flush_threshold_size", "512mb") // do not flush + .put("index.soft_deletes.enabled", true).build(); + IndexService indexService = createIndex("index", settings, "user_doc", "title", "type=keyword"); + int numOps = between(1, 10); + for (int i = 0; i < numOps; i++) { + if (randomBoolean()) { + client().prepareIndex("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2")) + .setSource("{}", XContentType.JSON).get(); + } else { + client().prepareDelete("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2")).get(); + } + } + IndexShard shard = indexService.getShard(0); + try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true); + Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()) { + List opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true); + List opsFromTranslog = TestTranslog.drainSnapshot(translogSnapshot, true); + assertThat(opsFromLucene, equalTo(opsFromTranslog)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index 0e114233856c0..003054fc71550 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -34,7 +34,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Random; import java.util.Set; @@ -128,4 +130,16 @@ public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExce public static long getCurrentTerm(Translog translog) { return translog.getCurrent().getPrimaryTerm(); } + + public static List drainSnapshot(Translog.Snapshot snapshot, boolean sortBySeqNo) throws IOException { + final List ops = new ArrayList<>(snapshot.totalOperations()); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + ops.add(op); + } + if (sortBySeqNo) { + ops.sort(Comparator.comparing(Translog.Operation::seqNo)); + } + return ops; + } }