Skip to content

Commit 5e95b8b

Browse files
committed
Retrying replication requests on replica doesn't call onRetry (#21189)
A replication request may arrive at a replica before the replica's node has processed a required mapping update. In these cases, the TransportReplicationAction will retry the request once a new cluster state arrives. Sadly, that retry logic failed to call `ReplicationRequest#onRetry`, causing duplicates in the append-only use case. This commit fixes the missing call and also fixes the test, which had missed the check. I also added an assertion which would have helped find the source of the duplicates. This was discovered by https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-unix-compatibility/os=opensuse/174/ Relates #20211
1 parent 86eb792 commit 5e95b8b

File tree

5 files changed

+46
-4
lines changed

5 files changed

+46
-4
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ public void onFailure(Exception e) {
473473
transportReplicaAction,
474474
request),
475475
e);
476+
request.onRetry();
476477
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
477478
observer.waitForNextChange(new ClusterStateObserver.Listener() {
478479
@Override

core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,12 @@ public boolean delete() {
4848
public long ramBytesUsed() {
4949
return BASE_RAM_BYTES_USED;
5050
}
51+
52+
@Override
53+
public String toString() {
54+
return "DeleteVersionValue{" +
55+
"version=" + version() + ", " +
56+
"time=" + time +
57+
'}';
58+
}
5159
}

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.search.IndexSearcher;
3636
import org.apache.lucene.search.SearcherFactory;
3737
import org.apache.lucene.search.SearcherManager;
38+
import org.apache.lucene.search.TermQuery;
3839
import org.apache.lucene.store.AlreadyClosedException;
3940
import org.apache.lucene.store.Directory;
4041
import org.apache.lucene.store.LockObtainFailedException;
@@ -474,7 +475,8 @@ private void innerIndex(Index index) throws IOException {
474475
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
475476
// lucene index without checking the version map but we still do the version check
476477
final boolean forceUpdateDocument;
477-
if (canOptimizeAddDocument(index)) {
478+
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
479+
if (canOptimizeAddDocument) {
478480
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
479481
if (index.isRetry()) {
480482
forceUpdateDocument = true;
@@ -513,7 +515,8 @@ private void innerIndex(Index index) throws IOException {
513515
final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
514516
index.setCreated(deleted);
515517
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
516-
// document does not exists, we can optimize for create
518+
// document does not exists, we can optimize for create, but double check if assertions are running
519+
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
517520
index(index, indexWriter);
518521
} else {
519522
update(index, indexWriter);
@@ -522,6 +525,26 @@ private void innerIndex(Index index) throws IOException {
522525
}
523526
}
524527

528+
/**
529+
* Asserts that the doc in the index operation really doesn't exist
530+
*/
531+
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
532+
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
533+
if (versionValue != null) {
534+
if (versionValue.delete() == false || allowDeleted == false) {
535+
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
536+
}
537+
} else {
538+
try (final Searcher searcher = acquireSearcher("assert doc doesn't exist")) {
539+
final long docsWithId = searcher.searcher().count(new TermQuery(index.uid()));
540+
if (docsWithId > 0) {
541+
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists [" + docsWithId + "] times in index");
542+
}
543+
}
544+
}
545+
return true;
546+
}
547+
525548
private long updateVersion(Engine.Operation op, long currentVersion, long expectedVersion) {
526549
final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion);
527550
op.updateVersion(updatedVersion);

core/src/main/java/org/elasticsearch/index/engine/VersionValue.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,11 @@ public long ramBytesUsed() {
5757
public Collection<Accountable> getChildResources() {
5858
return Collections.emptyList();
5959
}
60+
61+
@Override
62+
public String toString() {
63+
return "VersionValue{" +
64+
"version=" + version +
65+
'}';
66+
}
6067
}

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -867,8 +867,11 @@ protected ReplicaResult shardOperationOnReplica(Request request) {
867867
final CapturingTransport.CapturedRequest capturedRequest = capturedRequests.get(0);
868868
assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]"));
869869
assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteShardRequest.class));
870-
assertThat(((TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest.request).getRequest(), equalTo(request));
871-
assertThat(((TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest.request).getTargetAllocationID(),
870+
final TransportReplicationAction.ConcreteShardRequest<Request> concreteShardRequest =
871+
(TransportReplicationAction.ConcreteShardRequest<Request>) capturedRequest.request;
872+
assertThat(concreteShardRequest.getRequest(), equalTo(request));
873+
assertThat(concreteShardRequest.getRequest().isRetrySet.get(), equalTo(true));
874+
assertThat(concreteShardRequest.getTargetAllocationID(),
872875
equalTo(replica.allocationId().getId()));
873876
}
874877

0 commit comments

Comments
 (0)