Skip to content

Commit eaec994

Browse files
committed
Synchronize WriteReplicaResult callbacks (#36770)
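TransportWriteAction.WriteReplicaResult is not properly synchronized, which can lead to a data race between the thread that calls respond and the AsyncAfterWriteAction that calls either onSuccess or onFailure. Each side updates its own piece of state (respond stores the listener; the completion callback, e.g. onFailure, sets finishedAsyncActions) and then calls respondIfPossible, which only responds when both are set. Without synchronization neither thread is guaranteed to observe the other's write, so both checks can fail. When that happens the response listener is never called, which ultimately results in a stuck replication task on the replica.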
1 parent 59f30c4 commit eaec994

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public synchronized void respond(ActionListener<Response> listener) {
163163
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
164164
*/
165165
protected void respondIfPossible(Exception ex) {
166+
assert Thread.holdsLock(this);
166167
if (finishedAsyncActions && listener != null) {
167168
if (ex == null) {
168169
super.respond(listener);
@@ -206,7 +207,7 @@ public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
206207
}
207208

208209
@Override
209-
public void respond(ActionListener<TransportResponse.Empty> listener) {
210+
public synchronized void respond(ActionListener<TransportResponse.Empty> listener) {
210211
this.listener = listener;
211212
respondIfPossible(null);
212213
}
@@ -215,6 +216,7 @@ public void respond(ActionListener<TransportResponse.Empty> listener) {
215216
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
216217
*/
217218
protected void respondIfPossible(Exception ex) {
219+
assert Thread.holdsLock(this);
218220
if (finishedAsyncActions && listener != null) {
219221
if (ex == null) {
220222
super.respond(listener);
@@ -225,7 +227,7 @@ protected void respondIfPossible(Exception ex) {
225227
}
226228

227229
@Override
228-
public void onFailure(Exception ex) {
230+
public synchronized void onFailure(Exception ex) {
229231
finishedAsyncActions = true;
230232
respondIfPossible(ex);
231233
}

server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@
6565
import java.util.Collections;
6666
import java.util.HashSet;
6767
import java.util.Locale;
68+
import java.util.concurrent.BrokenBarrierException;
69+
import java.util.concurrent.CountDownLatch;
70+
import java.util.concurrent.CyclicBarrier;
6871
import java.util.concurrent.ExecutionException;
6972
import java.util.concurrent.TimeUnit;
7073
import java.util.concurrent.atomic.AtomicBoolean;
@@ -346,6 +349,51 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
346349
}
347350
}
348351

352+
public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException {
353+
IndexShard replica = mock(IndexShard.class);
354+
when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC);
355+
TestRequest request = new TestRequest();
356+
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
357+
TransportWriteAction.WriteReplicaResult<TestRequest> replicaResult = new TransportWriteAction.WriteReplicaResult<>(
358+
request, new Translog.Location(0, 0, 0), null, replica, logger);
359+
CyclicBarrier barrier = new CyclicBarrier(2);
360+
Runnable waitForBarrier = () -> {
361+
try {
362+
barrier.await();
363+
} catch (InterruptedException | BrokenBarrierException e) {
364+
throw new AssertionError(e);
365+
}
366+
};
367+
CountDownLatch completionLatch = new CountDownLatch(1);
368+
threadPool.generic().execute(() -> {
369+
waitForBarrier.run();
370+
replicaResult.respond(new ActionListener<TransportResponse.Empty>() {
371+
@Override
372+
public void onResponse(TransportResponse.Empty empty) {
373+
completionLatch.countDown();
374+
}
375+
376+
@Override
377+
public void onFailure(Exception e) {
378+
completionLatch.countDown();
379+
}
380+
});
381+
});
382+
if (randomBoolean()) {
383+
threadPool.generic().execute(() -> {
384+
waitForBarrier.run();
385+
replicaResult.onFailure(null);
386+
});
387+
} else {
388+
threadPool.generic().execute(() -> {
389+
waitForBarrier.run();
390+
replicaResult.onSuccess(false);
391+
});
392+
}
393+
394+
assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
395+
}
396+
349397
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
350398

351399
private final boolean withDocumentFailureOnPrimary;

0 commit comments

Comments
 (0)