Retain soft-deleted documents for rollback #31846

Conversation
An operation whose seqno is greater than the global checkpoint is subject to undoing when the primary fails over. If that operation updates or deletes existing documents in Lucene, those documents are also subject to undoing. Thus, we need to retain them during merges until they are no longer subject to rollback.
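For context, retention during merges is typically wired through Lucene's SoftDeletesRetentionMergePolicy: a soft-deleted document that matches the retention query survives merges. A minimal sketch of the idea (illustrative field names, not this PR's exact code):

```java
import java.util.function.LongSupplier;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;

class RetentionSketch {
    // Wrap the inner merge policy so soft-deleted docs that may still be rolled
    // back (seq# above the global checkpoint) are kept during merges.
    static MergePolicy retainUnsafeOps(MergePolicy inner, LongSupplier globalCheckpoint) {
        return new SoftDeletesRetentionMergePolicy(
            "__soft_deletes", // illustrative soft-deletes field name
            () -> LongPoint.newRangeQuery("_seq_no", globalCheckpoint.getAsLong() + 1, Long.MAX_VALUE),
            inner);
    }
}
```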
Pinging @elastic/es-distributed

I looked at this and I was a bit unhappy that we had to make the resolving methods more complicated by introducing another wrapper class. There are also some bwc complications that we agreed to address in a backport PR only.

@bleskes I've updated the resolveDocSeqNo method. Can you have a look? Thank you.
bleskes left a comment:
LGTM, but I'd love @jpountz to check the Lucene parts (I can supply context). Also, I think we're missing a test for the retention policy, no?
```diff
 } else {
-    plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
-        index.seqNo(), index.version());
+    plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version(), prevSeqNo.getAsLong());
```
can you add an assertion that loads the primary term and checks equality?
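Presumably something along these lines (a sketch; `loadPrimaryTerm` is a hypothetical helper, not code from this PR):

```java
// If Lucene already holds an op with the same seq#, the stale copy must carry
// the same primary term; loadPrimaryTerm(uid, seqNo) is a hypothetical lookup.
assert index.seqNo() != prevSeqNo.getAsLong()
    || index.primaryTerm() == loadPrimaryTerm(index.uid(), index.seqNo())
    : "op [" + index.seqNo() + "] resolved as stale but carries a different primary term";
```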
I added this assertion but it was violated. I added a TODO in the resolveDocSeqNo method.
```java
this.indexIntoLucene = indexIntoLucene;
this.addStaleOpToLucene = addStaleOpToLucene;
this.seqNoOfNewerVersion = seqNoOfNewerVersion;
assert addStaleOpToLucene == false || seqNoOfNewerVersion >= 0 : "stale op [" + seqNoForIndexing + "] with invalid newer seqno";
```
also assert it's higher than the seqNoForIndexing?
also assert that if addStaleOpToLucene is false, seqNoOfNewerVersion is unassigned? (move the assertion next to the other ones please)
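Taken together with the existing assert, the suggested checks might look like this (a sketch; per the field comment in the next hunk, -1 marks "unassigned"):

```java
assert addStaleOpToLucene == false || seqNoOfNewerVersion >= 0 :
    "stale op [" + seqNoForIndexing + "] with invalid newer seqno";
// the newer copy must be strictly newer than the stale op itself
assert addStaleOpToLucene == false || seqNoOfNewerVersion > seqNoForIndexing :
    "stale op [" + seqNoForIndexing + "] is not older than its newer copy [" + seqNoOfNewerVersion + "]";
// non-stale ops should not carry a newer-copy seqno at all
assert addStaleOpToLucene || seqNoOfNewerVersion == -1 :
    "non-stale op [" + seqNoForIndexing + "] should leave seqNoOfNewerVersion unassigned";
```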
```java
final long versionForIndexing;
final boolean indexIntoLucene;
final boolean addStaleOpToLucene;
final long seqNoOfNewerVersion; // the seqno of the newer copy of this _uid if exists; otherwise -1
```
maybe call this seqNoOfNewerDocIfStale? (trying to avoid the word "version" as it's confusing)
```diff
 } else {
-    plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
-        delete.seqNo(), delete.version());
+    plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
```
same comment about an assertion that loads the term if the seq no is equal
ping
We have to wait for the actual rollback to add the assertion here. I added a TODO in resolveDocSeqNo.
jpountz left a comment:
The parts that I understand look good; @bleskes kindly provided context so that I better understand this change. I just left a note that the new retention query might be slow. There are potential ways we could reduce the slowdown, such as adding a small cache of the max value of the UPDATED_BY_SEQNO_NAME field as a follow-up.
```java
    .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE), BooleanClause.Occur.SHOULD)
    .add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME,
        globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD)
    .build();
```
Adding a doc-values query as a SHOULD clause might mean we need to do a linear scan to find matches of this query. We don't have much of a choice, so I would still let the change in and benchmark, but we might want to add a comment here.
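Spelled out, the suggested comment on the hunk above might read something like this (a sketch reusing the names from the diff):

```java
Query retentionQuery = new BooleanQuery.Builder()
    // indexed as points, so this clause is cheap to evaluate
    .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE),
        BooleanClause.Occur.SHOULD)
    // NOTE: a doc-values-only ("slow") query; as a SHOULD clause there is no index
    // structure to skip ahead with, so evaluation may degrade to a linear scan.
    .add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME,
        globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE),
        BooleanClause.Occur.SHOULD)
    .build();
```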
@bleskes The retention is tested in Engine#testForceMergeWithSoftDeletesRetention.
```java
Map<Long, Long> seqNos = readUpdatedBySeqNos.apply(engine);
assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1
assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4
assertThat(seqNos, hasEntry(2L, 3L)); // 2 -> 3 (stale)
```
+1. Can we also have stale delete operations?
why don't we have an entry for seq# 3? Also, check that the map size is 3?
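Presumably along the lines of (a sketch, in the Hamcrest style of the surrounding test):

```java
assertThat(seqNos.size(), equalTo(3)); // exactly the three entries asserted above
```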
done
```java
engine.index(replicaIndexForDoc(createParsedDoc("1", null), 5, 4, false));
Map<Long, Long> seqNos = readUpdatedBySeqNos.apply(engine);
assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1
assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4
```
why don't we see 3 here?
I added a comment
```java
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null :
    "Delete tombstone document but _tombstone field is not set [" + doc + " ]";
doc.add(softDeleteField);
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
```
do we need UPDATED_BY_SEQNO_NAME to be updated here too?
Good question. A regular delete tombstone is soft-deleted before being indexed into Lucene, so the softUpdateDocument method won't affect that document. I think we should not add the updated_by_seqno field to stale deletes, to keep delete tombstones consistent (i.e., no delete carries an updated_by_seqno value).
However, I found a case which might be an issue if we remove the safe commit. It happens as follows:
- Index a doc (version=1) (seq#0)
- Delete that doc (seq#1)
- Index that doc again (version=2) (seq#2)
Suppose the global checkpoint is 1, seq#2 is in the danger zone, and we trigger a force merge. The problem is that seq#2 only updates the updated_by_seqno of seq#0, because the delete is invisible while seq#0 is still visible (until a refresh). A force merge will then reclaim the delete and retain seq#0 and seq#2. Rolling back seq#2 would restore seq#0, which is incorrect.
We can avoid this issue if a document becomes invisible immediately after it's soft-deleted.
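A test-style sketch of that sequence (helper names follow the test code quoted elsewhere in this thread; exact signatures and version numbers are assumed):

```java
engine.index(replicaIndexForDoc(createParsedDoc("1", null), 1, 0, false));        // index   (seq#0)
engine.delete(replicaDeleteForDoc("1", 2, 1, threadPool.relativeTimeInMillis())); // delete  (seq#1)
engine.index(replicaIndexForDoc(createParsedDoc("1", null), 3, 2, false));        // reindex (seq#2)
// With global checkpoint == 1 and no refresh, seq#2 bumps updated_by_seqno on the
// still-visible seq#0 instead of on the invisible delete; a force merge may then
// reclaim the delete tombstone, and rolling back seq#2 would resurrect seq#0.
```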
We discussed this yesterday; it has to do with the fact that Lucene treats a document that is indexed as soft-deleted (i.e., the tombstone) differently from a doc that is indexed and then later soft-deleted while it is still in the indexing buffer. As a workaround we decided to detect this case and force a refresh to maintain semantics. This allows this work to continue while we work on a solution in Lucene.
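A hedged sketch of that workaround (the condition and names are illustrative; the PR may detect this differently):

```java
// If the doc we are soft-updating may still be sitting only in the indexing
// buffer (not yet visible to our searcher), refresh first so the soft delete
// takes effect the same way an index-time tombstone would.
if (plan.addStaleOpToLucene && mayBeInIndexingBufferOnly(index.uid())) { // hypothetical check
    refresh("stale-op-tombstone-visibility");
}
```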
```java
engine.refresh("test");
Map<Long, Long> seqNos = readUpdatedBySeqNos.apply(engine);
assertThat(seqNos, hasEntry(0L, 2L)); // 0 -> 1 -> 2
assertThat(seqNos, hasEntry(1L, 2L)); // 1 -> 2
```
assert that the total size is what we expect?
Done
Sorry, I have no idea why I missed the tests. I left some comments there.
I have another approach that requires some changes:
Suppose we have three operations on a single document id, index-1 (i1), delete-2 (d2), and index-3 (i3), with the following processing orders:

- a. i1, d2, i3 -> updated_by_seqno { 2, _, _ }
- c. d2, i1, i3 -> updated_by_seqno { 2, _, _ }
- e. i3, i1, d2 -> updated_by_seqno { 3, 3, _ }

I think we are good now with these changes. WDYT?
Discussed with Boaz; we agreed not to proceed with this approach because:

We will instead go with Simon's approach, which re-adds a delete tombstone with
@bleskes It's ready for another go. Can you please have a look? Thank you!
@dnhatn and I discussed the current approach and sadly it still has some bugs. Consider the following indexing order for the same doc: index v1 (seq 2), delete (seq 4), and index (seq 10). With the current approach, which loads the current doc version from Lucene with a normal searcher, when index (seq 10) comes in it will not see the delete (seq 4), and will not add an extra tombstone doc with update_by_seq_no set to 10. We can fix this by using a reader that exposes tombstone docs, but then we have another problem: if the global checkpoint is 6, the merge policy is free to reclaim the delete tombstone (it's not marked with an update-by seq no yet) while leaving index v1 (seq 2) in the index. This would trick the rollback into thinking index v1 is the rollback doc.

We've had some ideas on other approaches, but it has become clear that our testing is lacking, as we keep finding issues during reviews rather than through failing tests. Nhat and I had an idea for a test that should cover all these edge cases, and that's going to be the first goal now (i.e., having a test that fails for this issue and also for the problems we found with previous iterations). We're going to give this a day or two, and if we fail to write a test we feel confident about, we're going to invest in a TLA+ model before proceeding. We chose to wait with the TLA+ model as it will take some time to write and will be based on our current model of Lucene (which may be flawed; we are still working on mapping it out) rather than on Lucene itself.
@bleskes I've added a test which simulates Lucene merges in f6487e1. This test is able to detect the current issue; below is an example that it found. Can you please have a look? (/cc @ywelsch)
```java
List<Engine.Operation> operations = new ArrayList<>();
int numOps = scaledRandomIntBetween(10, 200);
for (int seqNo = 0; seqNo < numOps; seqNo++) {
    String id = Integer.toString(between(1, 5));
```
why not use the random history generator? maybe we should extend it if we need to?
I adjusted the history generator method and used it here.
```java
    .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
realisticShuffleOperations(operations);
```
why be realistic here? Pure random is a better test, no?
I ran several hundred iterations with pure random shuffling but got no failure. The reason is that the operations were shuffled so much that the local checkpoint was not advanced enough to allow us to reclaim documents or tombstones. That's why I introduced this shuffle method.
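For illustration, a "realistic" shuffle could permute operations only within small adjacent windows, so the local checkpoint still advances (a hypothetical sketch, not the PR's implementation):

```java
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Shuffle within small windows: ops stay close to their seq# order, so the local
// checkpoint advances far enough for merges to reclaim docs and tombstones.
static void realisticShuffleOperations(List<Engine.Operation> ops) {
    Random random = new Random();
    int i = 0;
    while (i < ops.size()) {
        int window = Math.min(ops.size() - i, 1 + random.nextInt(4));
        Collections.shuffle(ops.subList(i, i + window), random);
        i += window;
    }
}
```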
```java
}
processedOps.put(op.seqNo(), op);
if (between(1, 20) == 1) {
    assertDocumentsForRollback(engine, globalCheckpoint, processedOps);
```
why not do it every time?
An interesting case is when the updated_by_seqno is updated twice. This method needs to refresh, and if we refreshed after every operation, that case would disappear.
```java
if (between(1, 5) == 1) {
    engine.maybePruneDeletes();
}
if (between(1, 20) == 1) {
```
why not do it every time?
Made it 50%.
@dnhatn I think the test is good. I would prefer to change it a bit to not rely on the engine at all in its expectations: it knows the series of indexing operations it has performed, and can thus figure out what the "rollback" version should be. I also left some questions about things that weren't clear to me.
Sorry, I got confused; the test already keeps track of indexed ops on its own and uses that to compute the expected rollback targets.
@dnhatn I think we can close this PR for now?
I am closing this as we are going to implement a commit-based rollback.