diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index a39d763238205..783188fbb6546 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -258,10 +258,21 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]"; } else { final BytesReference source = fields.source(); + if (source == null) { + // TODO: Callers should ask for the range for which source must be retained. Thus we should always + // check for the existence of source once we make peer-recovery send ops after the local checkpoint. + if (requiredFullRange) { + throw new IllegalStateException("source not found for seqno=" + seqNo + + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo); + } else { + skippedOperations++; + return null; + } + } // TODO: pass the latest timestamp from engine. final long autoGeneratedIdTimestamp = -1; op = new Translog.Index(type, id, seqNo, primaryTerm, version, - source == null ? 
null : source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp); + source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp); } } assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " + diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 36a4b7cf0b170..4d53a74e8e1a1 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1061,7 +1061,7 @@ public Index(String type, String id, long seqNo, long primaryTerm, long version, byte[] source, String routing, long autoGeneratedIdTimestamp) { this.type = type; this.id = id; - this.source = source == null ? null : new BytesArray(source); + this.source = new BytesArray(source); this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; @@ -1111,7 +1111,7 @@ public long version() { @Override public Source getSource() { - return source == null ? 
null : new Source(source, routing); + return new Source(source, routing); } private void write(final StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f7742b64c1942..1520b84f0dec0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1418,26 +1418,36 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc final MapperService mapperService = createMapperService("test"); final boolean omitSourceAllTheTime = randomBoolean(); final Set liveDocs = new HashSet<>(); + final Set liveDocsWithSource = new HashSet<>(); try (Store store = createStore(); InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) { int numDocs = scaledRandomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean() - || omitSourceAllTheTime); + boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime; + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource); engine.index(indexForDoc(doc)); liveDocs.add(doc.id()); + if (useRecoverySource == false) { + liveDocsWithSource.add(Integer.toString(i)); + } } for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean() - || omitSourceAllTheTime); + boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime; + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource); if (randomBoolean()) { engine.delete(new Engine.Delete(doc.type(), doc.id(), 
newUid(doc.id()), primaryTerm.get())); liveDocs.remove(doc.id()); + liveDocsWithSource.remove(doc.id()); } if (randomBoolean()) { engine.index(indexForDoc(doc)); liveDocs.add(doc.id()); + if (useRecoverySource == false) { + liveDocsWithSource.add(doc.id()); + } else { + liveDocsWithSource.remove(doc.id()); + } } if (randomBoolean()) { engine.flush(randomBoolean(), true); @@ -1453,12 +1463,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1); } engine.forceMerge(true, 1, false, false, false); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> { - if (luceneOp.seqNo() >= minSeqNoToRetain) { - assertNotNull(luceneOp.getSource()); - assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)); - } - }); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); Map ops = readAllOperationsInLucene(engine, mapperService) .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); for (long seqno = 0; seqno <= engine.getLocalCheckpoint(); seqno++) { @@ -1483,10 +1488,8 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc globalCheckpoint.set(engine.getLocalCheckpoint()); engine.syncTranslog(); engine.forceMerge(true, 1, false, false, false); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> { - assertEquals(translogOp.getSource().source, B_1); - }); - assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size())); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); + assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size())); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java 
b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 9a8480fcb635d..7e72dcf2aedf6 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -104,7 +104,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -822,14 +821,6 @@ public static List readAllOperationsInLucene(Engine engine, * Asserts the provided engine has a consistent document history between translog and Lucene index. */ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException { - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapper, (luceneOp, translogOp) -> - assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source))); - } - - public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper, - BiConsumer assertSource) - throws IOException { - if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) { return; } @@ -867,7 +858,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); assertThat(luceneOp.opType(), equalTo(translogOp.opType())); if (luceneOp.opType() == Translog.Operation.Type.INDEX) { - assertSource.accept(luceneOp, translogOp); + assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)); } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 23954bb0c802a..46f5423ba4cf1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -283,10 +283,6 @@ static Translog.Operation[] getOperations(IndexShard indexShard, long globalChec try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) { Translog.Operation op; while ((op = snapshot.next()) != null) { - if (op.getSource() == null) { - throw new IllegalStateException("source not found for operation: " + op + " fromSeqNo: " + fromSeqNo + - " maxOperationCount: " + maxOperationCount); - } operations.add(op); seenBytes += op.estimateSize(); if (seenBytes > maxOperationSizeInBytes) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 8b7f01f38853d..1c973d11d207a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -7,6 +7,9 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; @@ -30,6 +33,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.function.BiConsumer; import java.util.function.Consumer; @@ -50,12 +54,22 @@ public void testSimpleCcrReplication() throws Exception { shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint()); docCount += leaderGroup.appendDocs(randomInt(128)); leaderGroup.syncGlobalCheckpoint(); - leaderGroup.assertAllEqual(docCount); - int expectedCount = docCount; + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + // Deletes should be replicated to the follower + List deleteDocIds = randomSubsetOf(indexedDocIds); + for (String deleteId : deleteDocIds) { + BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId)); + assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED)); + } + leaderGroup.syncGlobalCheckpoint(); assertBusy(() -> { assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); - followerGroup.assertAllEqual(expectedCount); + followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size()); }); shardFollowTask.markAsCompleted(); }