@@ -136,6 +136,7 @@ public class InternalEngine extends Engine {
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
+private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
/**
@@ -186,7 +187,7 @@ public InternalEngine(EngineConfig engineConfig) {
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(startingCommit);
-updateMaxUnsafeAutoIdTimestampFromWriter(writer);
+bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer);
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
@@ -345,15 +346,20 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}

-private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
-long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
+private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
-if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
-commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
-break;
+final String key = entry.getKey();
+if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
+assert maxUnsafeAutoIdTimestamp.get() == -1 :
+"max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
+maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
}
+if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
+assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
+"max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
+maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
+}
}
-maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}
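
[Note] For context, a simplified sketch of the writing side that bootstrapAppendOnlyInfoFromWriter reads back. This is an illustration only; the actual commit logic lives in InternalEngine#commitIndexWriter, and the exact shape of that code is assumed here:

    final Map<String, String> commitData = new HashMap<>();
    // persist both markers as commit user data so they survive restarts
    commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
    commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
    writer.setLiveCommitData(commitData.entrySet());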

@Override
@@ -803,11 +809,24 @@ public IndexResult index(Index index) throws IOException {

private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
final IndexingStrategy plan;
-if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
-// no need to deal with out of order delivery - we never saw this one
+final boolean appendOnlyRequest = canOptimizeAddDocument(index);
+if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
+/*
+ * As soon as an append-only request has been indexed into the primary, it can be exposed to a search and users can then
+ * issue a follow-up operation on it. In rare cases, the follow-up operation can arrive and be processed on a replica
+ * before the original append-only request. In that case we cannot simply proceed with the append-only request without
+ * consulting the version map. If a replica has seen a non-append-only operation with a seqno higher than that of an
+ * append-only request, it may have seen the document of that request. However, if the seqno of an append-only request is
+ * higher than the seqno of every non-append-only operation, the replica cannot have seen its document, so we can safely
+ * apply the optimization. (For example: the primary indexes an append-only document at seq#=1 and then deletes it at
+ * seq#=2; a replica that processed the delete first must handle the late append-only request as a regular index operation.)
+ */
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
} else {
+if (appendOnlyRequest == false) {
[Review comment] Can we introduce a method similar to mayHaveBeenIndexedBefore that does the check and also updates the maxSeqNoOfNonAppendOnlyOperations? I think it's good to have both markers handled consistently.

[Review comment] Apparently @s1monw preferred the reverse. I'm fine with leaving it as is.
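
[Review note] For illustration, a helper along the lines of the first comment might look like this (hypothetical name and shape, not part of this change); it would centralize the update that the index and delete paths below both perform:

    private void advanceMaxSeqNoOfNonAppendOnlyOps(final Operation op) {
        // record the seq# of every non-append-only operation so that a late
        // append-only request with a lower or equal seq# skips the optimized path
        maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(op.seqNo(), curr));
    }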

+maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
+assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated; " +
+"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
+}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
@@ -942,6 +961,11 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
return mayHaveBeenIndexBefore;
}

+// for testing
+long getMaxSeqNoOfNonAppendOnlyOperations() {
+return maxSeqNoOfNonAppendOnlyOperations.get();
+}

private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
@@ -1097,6 +1121,9 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
+maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
+assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated; " +
+"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
// unlike the primary, replicas don't really care about the found status of documents
// this allows us to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
@@ -4530,4 +4530,96 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception {
}
}

public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
final CountDownLatch latch = new CountDownLatch(1);
final Thread appendOnlyIndexer = new Thread(() -> {
try {
latch.countDown();
final int numDocs = scaledRandomIntBetween(100, 1000);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null);
if (randomBoolean()) {
engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo()));
} else {
engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong()));
}
}
} catch (Exception ex) {
throw new RuntimeException("Failed to index", ex);
}
});
appendOnlyIndexer.setName("append-only indexer");
appendOnlyIndexer.start();
latch.await();
long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED;
final int numOps = scaledRandomIntBetween(100, 1000);
for (int i = 0; i < numOps; i++) {
ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean());
if (randomBoolean()) {
engine.index(doc);
} else {
engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
}
maxSeqNoOfNonAppendOnly = seqno;
} else { // On primary - do not update max_seqno for non-append-only operations
if (randomBoolean()) {
engine.index(indexForDoc(parsedDocument));
} else {
engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id())));
}
}
}
appendOnlyIndexer.join(120_000);
assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.syncTranslog();
engine.flush();
}
try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
assertThat("max_seqno from non-append-only was not bootstrap from the safe commit",
engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get()));
}
}

public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception {
long lookupTimes = 0L;
final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker();
final int initDocs = between(0, 10);
for (int i = 0; i < initDocs; i++) {
index(engine, i);
lookupTimes++;
}
// the append-only doc (append-only-1) is delayed and arrives after a non-append-only op with a higher seq#.
final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo();
final long seqnoNormalOp = localCheckpointTracker.generateSeqNo();
if (randomBoolean()) {
engine.index(replicaIndexForDoc(
testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false));
} else {
engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong()));
}
lookupTimes++;
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp));

// should not optimize the late append-only-1; it is processed as a regular doc (i.e. looked up in the version map)
engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), seqNoAppendOnly1));
lookupTimes++;
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));

// optimize append-only-2 (its seqno > max_seqno of non-append-only ops) - no version map lookup needed.
engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
}
}
@@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
@@ -419,6 +420,27 @@ private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) {
shard.onSettingsChanged();
}

/**
* This test ensures consistency between the primary and replicas when a non-append-only operation (e.g. an index request
* with an id, or a delete) on a document is processed on a replica before the original append-only request for that
* document. The append-only document can be exposed and deleted on the primary before it is added to the replica.
* Replicas should treat a late append-only request as a regular index request.
*/
public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
try (ReplicationGroup shards = createGroup(1)) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
// Append-only request - without id
final BulkShardRequest indexRequest = indexOnPrimary(
new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary);
final String docId = Iterables.get(getShardDocUIDs(primary), 0);
final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary);
deleteOnReplica(deleteRequest, shards, replica);
indexOnReplica(indexRequest, shards, replica);
shards.assertAllEqual(0);
}
}

/** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage;