Commit 81cbbcb

Validate source of an index in LuceneChangesSnapshot (#32288)
Today it's possible to encounter an Index operation in Lucene whose _source is disabled and whose _recovery_source was pruned by the MergePolicy. When that happens, we create a Translog#Index without source and let the caller validate it later. However, this approach is challenging for the caller: Deletes and No-Ops don't allow invoking the source() method, so the caller has to make sure to call source() only on index operations. The current implementation in CCR does not follow this and fails to replicate deletes or no-ops. Moreover, it's easier to reason about the code if a Translog#Index always has a source.
1 parent 3016654 commit 81cbbcb
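
Not part of the commit: a minimal caller-side sketch of the contract this change establishes, based on the Translog APIs that appear in the diffs below (Translog.Snapshot#next, Translog.Operation#opType, Translog.Operation#getSource). The SnapshotConsumer class and its drain method are hypothetical names used only for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.index.translog.Translog;

// Hypothetical helper, for illustration only. Before this change an INDEX operation read
// from LuceneChangesSnapshot could carry a null source (when _source is disabled and
// _recovery_source was pruned), and Deletes and No-Ops do not allow reading a source at
// all, so callers had to special-case operation types. After this change the snapshot
// itself either throws (requiredFullRange) or skips such operations, so every INDEX
// operation that reaches the caller is guaranteed to have a source.
final class SnapshotConsumer {

    static List<Translog.Operation> drain(Translog.Snapshot snapshot) throws IOException {
        List<Translog.Operation> operations = new ArrayList<>();
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            if (op.opType() == Translog.Operation.Type.INDEX) {
                // Source is only read for INDEX operations; it is non-null by contract now.
                assert op.getSource() != null : "index operation without source: " + op;
            }
            operations.add(op);
        }
        return operations;
    }
}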

6 files changed, +49 −34 lines changed

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 11 additions & 0 deletions
@@ -259,6 +259,17 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
             assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
         } else {
             final BytesReference source = fields.source();
+            if (source == null) {
+                // TODO: Callers should ask for the range that source should be retained. Thus we should always
+                // check for the existence source once we make peer-recovery to send ops after the local checkpoint.
+                if (requiredFullRange) {
+                    throw new IllegalStateException("source not found for seqno=" + seqNo +
+                        " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo);
+                } else {
+                    skippedOperations++;
+                    return null;
+                }
+            }
             // TODO: pass the latest timestamp from engine.
             final long autoGeneratedIdTimestamp = -1;
             op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.EXTERNAL,

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 2 additions & 2 deletions
@@ -1079,7 +1079,7 @@ public Index(String type, String id, long seqNo, long primaryTerm, long version,
                      String parent, long autoGeneratedIdTimestamp) {
            this.type = type;
            this.id = id;
-            this.source = source == null ? null : new BytesArray(source);
+            this.source = new BytesArray(source);
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
            this.version = version;
@@ -1139,7 +1139,7 @@ public VersionType versionType() {
 
        @Override
        public Source getSource() {
-            return source == null ? null : new Source(source, routing, parent);
+            return new Source(source, routing, parent);
        }
 
        private void write(final StreamOutput out) throws IOException {

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 17 additions & 14 deletions
@@ -1418,26 +1418,36 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
        final MapperService mapperService = createMapperService("test");
        final boolean omitSourceAllTheTime = randomBoolean();
        final Set<String> liveDocs = new HashSet<>();
+        final Set<String> liveDocsWithSource = new HashSet<>();
        try (Store store = createStore();
             InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null,
                 globalCheckpoint::get))) {
            int numDocs = scaledRandomIntBetween(10, 100);
            for (int i = 0; i < numDocs; i++) {
-                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean()
-                    || omitSourceAllTheTime);
+                boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
+                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource);
                engine.index(indexForDoc(doc));
                liveDocs.add(doc.id());
+                if (useRecoverySource == false) {
+                    liveDocsWithSource.add(Integer.toString(i));
+                }
            }
            for (int i = 0; i < numDocs; i++) {
-                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean()
-                    || omitSourceAllTheTime);
+                boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
+                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource);
                if (randomBoolean()) {
                    engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
                    liveDocs.remove(doc.id());
+                    liveDocsWithSource.remove(doc.id());
                }
                if (randomBoolean()) {
                    engine.index(indexForDoc(doc));
                    liveDocs.add(doc.id());
+                    if (useRecoverySource == false) {
+                        liveDocsWithSource.add(doc.id());
+                    } else {
+                        liveDocsWithSource.remove(doc.id());
+                    }
                }
                if (randomBoolean()) {
                    engine.flush(randomBoolean(), true);
@@ -1453,12 +1463,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
                minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
            }
            engine.forceMerge(true, 1, false, false, false);
-            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
-                if (luceneOp.seqNo() >= minSeqNoToRetain) {
-                    assertNotNull(luceneOp.getSource());
-                    assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
-                }
-            });
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
            Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
                .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
            for (long seqno = 0; seqno <= engine.getLocalCheckpoint(); seqno++) {
@@ -1483,10 +1488,8 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
            globalCheckpoint.set(engine.getLocalCheckpoint());
            engine.syncTranslog();
            engine.forceMerge(true, 1, false, false, false);
-            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
-                assertEquals(translogOp.getSource().source, B_1);
-            });
-            assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
+            assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size()));
        }
    }
 

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 2 additions & 11 deletions
@@ -104,7 +104,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -835,15 +834,7 @@ public static List<Translog.Operation> readAllOperationsInLucene(Engine engine,
     * Asserts the provided engine has a consistent document history between translog and Lucene index.
     */
    public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
-        assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapper, (luceneOp, translogOp) ->
-            assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)));
-    }
-
-    public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper,
-                                                                            BiConsumer<Translog.Operation, Translog.Operation> assertSource)
-        throws IOException {
-
-        if (mapper.types().isEmpty() || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) {
+        if (mapper.types().isEmpty() || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) {
            return;
        }
        final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
@@ -880,7 +871,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
                assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
                assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
                if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
-                    assertSource.accept(luceneOp, translogOp);
+                    assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
                }
            }
        }

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 0 additions & 4 deletions
@@ -297,10 +297,6 @@ static Translog.Operation[] getOperations(IndexShard indexShard, long globalChec
        try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
-                if (op.getSource() == null) {
-                    throw new IllegalStateException("source not found for operation: " + op + " fromSeqNo: " + fromSeqNo +
-                        " maxOperationCount: " + maxOperationCount);
-                }
                operations.add(op);
                seenBytes += op.estimateSize();
                if (seenBytes > maxOperationSizeInBytes) {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 17 additions & 3 deletions
@@ -7,6 +7,9 @@
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -30,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -50,12 +54,22 @@ public void testSimpleCcrReplication() throws Exception {
            shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint());
            docCount += leaderGroup.appendDocs(randomInt(128));
            leaderGroup.syncGlobalCheckpoint();
-
            leaderGroup.assertAllEqual(docCount);
-            int expectedCount = docCount;
+            Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
+            assertBusy(() -> {
+                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+                followerGroup.assertAllEqual(indexedDocIds.size());
+            });
+            // Deletes should be replicated to the follower
+            List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
+            for (String deleteId : deleteDocIds) {
+                BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId));
+                assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED));
+            }
+            leaderGroup.syncGlobalCheckpoint();
            assertBusy(() -> {
                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
-                followerGroup.assertAllEqual(expectedCount);
+                followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
            });
            shardFollowTask.markAsCompleted();
        }
