Skip to content

Commit 40c7ae6

Browse files
authored
Rewrite SourceToParse with resolved docType (#36921)
We introduce a typeless API in #35790 where we translate the default docType "_doc" to the user-defined docType. However, we do not rewrite the SourceToParse with the resolved docType. This leads to a situation where we have two translog operations for the same document with different types: - prvOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='_doc', seqNo=1, primaryTerm=1, version=1, autoGeneratedIdTimestamp=1545125562123}] - newOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='not_doc', seqNo=1, primaryTerm=1, version=1, autoGeneratedIdTimestamp=-1}] Closes #36769
1 parent 9137d92 commit 40c7ae6

File tree

3 files changed

+50
-3
lines changed

3 files changed

+50
-3
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -710,9 +710,17 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
710710
ensureWriteAllowed(origin);
711711
Engine.Index operation;
712712
try {
713-
operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo,
714-
opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry,
715-
ifSeqNo, ifPrimaryTerm);
713+
final String resolvedType = resolveType(sourceToParse.type());
714+
final SourceToParse sourceWithResolvedType;
715+
if (resolvedType.equals(sourceToParse.type())) {
716+
sourceWithResolvedType = sourceToParse;
717+
} else {
718+
sourceWithResolvedType = SourceToParse.source(sourceToParse.index(), resolvedType, sourceToParse.id(),
719+
sourceToParse.source(), sourceToParse.getXContentType());
720+
sourceWithResolvedType.routing(sourceToParse.routing());
721+
}
722+
operation = prepareIndex(docMapper(resolvedType), indexSettings.getIndexVersionCreated(), sourceWithResolvedType,
723+
seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);
716724
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
717725
if (update != null) {
718726
return new Engine.IndexResult(update);

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.elasticsearch.index.flush.FlushStats;
6565
import org.elasticsearch.index.mapper.SourceToParse;
6666
import org.elasticsearch.index.seqno.SequenceNumbers;
67+
import org.elasticsearch.index.translog.TestTranslog;
6768
import org.elasticsearch.index.translog.Translog;
6869
import org.elasticsearch.indices.IndicesService;
6970
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@@ -862,4 +863,28 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti
862863
client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs + moreDocs));
863864
}
864865

866+
public void testShardChangesWithDefaultDocType() throws Exception {
867+
Settings settings = Settings.builder()
868+
.put("index.number_of_shards", 1)
869+
.put("index.number_of_replicas", 0)
870+
.put("index.translog.flush_threshold_size", "512mb") // do not flush
871+
.put("index.soft_deletes.enabled", true).build();
872+
IndexService indexService = createIndex("index", settings, "user_doc", "title", "type=keyword");
873+
int numOps = between(1, 10);
874+
for (int i = 0; i < numOps; i++) {
875+
if (randomBoolean()) {
876+
client().prepareIndex("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2"))
877+
.setSource("{}", XContentType.JSON).get();
878+
} else {
879+
client().prepareDelete("index", randomFrom("_doc", "user_doc"), randomFrom("1", "2")).get();
880+
}
881+
}
882+
IndexShard shard = indexService.getShard(0);
883+
try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
884+
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()) {
885+
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
886+
List<Translog.Operation> opsFromTranslog = TestTranslog.drainSnapshot(translogSnapshot, true);
887+
assertThat(opsFromLucene, equalTo(opsFromTranslog));
888+
}
889+
}
865890
}

server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import java.nio.file.Files;
3535
import java.nio.file.Path;
3636
import java.nio.file.StandardOpenOption;
37+
import java.util.ArrayList;
3738
import java.util.Collection;
39+
import java.util.Comparator;
3840
import java.util.List;
3941
import java.util.Random;
4042
import java.util.Set;
@@ -128,4 +130,16 @@ public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExce
128130
public static long getCurrentTerm(Translog translog) {
129131
return translog.getCurrent().getPrimaryTerm();
130132
}
133+
134+
public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot, boolean sortBySeqNo) throws IOException {
135+
final List<Translog.Operation> ops = new ArrayList<>(snapshot.totalOperations());
136+
Translog.Operation op;
137+
while ((op = snapshot.next()) != null) {
138+
ops.add(op);
139+
}
140+
if (sortBySeqNo) {
141+
ops.sort(Comparator.comparing(Translog.Operation::seqNo));
142+
}
143+
return ops;
144+
}
131145
}

0 commit comments

Comments
 (0)