Skip to content

Commit 5be22df

Browse files
committed
Force execution of finish shard bulk request (#51957) (#52484)
Currently, the shard bulk request can be rejected by the write threadpool after a mapping update. When that happens, the mapping listener thread attempts to finish the request and fsync, and that thread may be a transport thread. This commit fixes the issue by forcing the finish action to execute on the write threadpool. Fixes #51904.
1 parent 0f5d505 commit 5be22df

File tree

2 files changed

+128
-10
lines changed

2 files changed

+128
-10
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -168,16 +168,29 @@ protected void doRun() throws Exception {
168168

169169
@Override
170170
public void onRejection(Exception e) {
171-
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
172-
while (context.hasMoreOperationsToExecute()) {
173-
context.setRequestToExecute(context.getCurrent());
174-
final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
175-
onComplete(
176-
exceptionToResult(
177-
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
178-
context, null);
179-
}
180-
finishRequest();
171+
// We must finish the outstanding request. Finishing the outstanding request can include
172+
// refreshing and fsyncing. Therefore, we must force execution on the WRITE thread.
173+
executor.execute(new ActionRunnable<PrimaryResult<BulkShardRequest, BulkShardResponse>>(listener) {
174+
175+
@Override
176+
protected void doRun() {
177+
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
178+
while (context.hasMoreOperationsToExecute()) {
179+
context.setRequestToExecute(context.getCurrent());
180+
final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
181+
onComplete(
182+
exceptionToResult(
183+
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
184+
context, null);
185+
}
186+
finishRequest();
187+
}
188+
189+
@Override
190+
public boolean isForceExecution() {
191+
return true;
192+
}
193+
});
181194
}
182195

183196
private void finishRequest() {

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.cluster.metadata.IndexMetaData;
4141
import org.elasticsearch.common.lucene.uid.Versions;
4242
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4344
import org.elasticsearch.index.IndexSettings;
4445
import org.elasticsearch.index.VersionType;
4546
import org.elasticsearch.index.engine.Engine;
@@ -53,13 +54,18 @@
5354
import org.elasticsearch.index.shard.ShardId;
5455
import org.elasticsearch.index.translog.Translog;
5556
import org.elasticsearch.rest.RestStatus;
57+
import org.elasticsearch.threadpool.TestThreadPool;
58+
import org.elasticsearch.threadpool.ThreadPool;
5659

5760
import java.io.IOException;
5861
import java.util.Collections;
62+
import java.util.concurrent.BrokenBarrierException;
5963
import java.util.concurrent.CountDownLatch;
64+
import java.util.concurrent.CyclicBarrier;
6065
import java.util.concurrent.atomic.AtomicInteger;
6166

6267
import static org.hamcrest.CoreMatchers.equalTo;
68+
import static org.hamcrest.CoreMatchers.instanceOf;
6369
import static org.hamcrest.CoreMatchers.not;
6470
import static org.hamcrest.CoreMatchers.notNullValue;
6571
import static org.hamcrest.Matchers.arrayWithSize;
@@ -818,6 +824,105 @@ public void testRetries() throws Exception {
818824
latch.await();
819825
}
820826

827+
public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
828+
TestThreadPool rejectingThreadPool = new TestThreadPool(
829+
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
830+
Settings.builder()
831+
.put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1)
832+
.put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1)
833+
.build());
834+
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
835+
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
836+
try {
837+
cyclicBarrier.await();
838+
logger.info("blocking the write executor");
839+
cyclicBarrier.await();
840+
logger.info("unblocked the write executor");
841+
} catch (Exception e) {
842+
throw new RuntimeException(e);
843+
}
844+
});
845+
try {
846+
cyclicBarrier.await();
847+
// Place a task in the queue to block next enqueue
848+
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});
849+
850+
BulkItemRequest[] items = new BulkItemRequest[2];
851+
DocWriteRequest<IndexRequest> writeRequest1 = new IndexRequest("index").id("id")
852+
.source(Requests.INDEX_CONTENT_TYPE, "foo", 1);
853+
DocWriteRequest<IndexRequest> writeRequest2 = new IndexRequest("index").id("id")
854+
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
855+
items[0] = new BulkItemRequest(0, writeRequest1);
856+
items[1] = new BulkItemRequest(1, writeRequest2);
857+
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
858+
859+
Engine.IndexResult mappingUpdate =
860+
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
861+
Translog.Location resultLocation1 = new Translog.Location(42, 36, 36);
862+
Translog.Location resultLocation2 = new Translog.Location(42, 42, 42);
863+
Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1);
864+
Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2);
865+
866+
IndexShard shard = mock(IndexShard.class);
867+
when(shard.shardId()).thenReturn(shardId);
868+
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
869+
.thenReturn(success1, mappingUpdate, success2);
870+
when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod();
871+
when(shard.mapperService()).thenReturn(mock(MapperService.class));
872+
873+
randomlySetIgnoredPrimaryResponse(items[0]);
874+
875+
AtomicInteger updateCalled = new AtomicInteger();
876+
877+
final CountDownLatch latch = new CountDownLatch(1);
878+
TransportShardBulkAction.performOnPrimary(
879+
bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, type, listener) -> {
880+
// There should indeed be a mapping update
881+
assertNotNull(update);
882+
updateCalled.incrementAndGet();
883+
listener.onResponse(null);
884+
try {
885+
// Release the blocking task now that the continuation of the write execution has been rejected and
886+
// the finishRequest execution has been force-enqueued
887+
cyclicBarrier.await();
888+
} catch (InterruptedException | BrokenBarrierException e) {
889+
throw new IllegalStateException(e);
890+
}
891+
}, listener -> listener.onResponse(null), new LatchedActionListener<>(
892+
ActionTestUtils.assertNoFailureListener(result ->
893+
// Assert that we still need to fsync the location that was successfully written
894+
assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location,
895+
equalTo(resultLocation1))), latch),
896+
rejectingThreadPool);
897+
latch.await();
898+
899+
assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
900+
901+
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
902+
903+
BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse();
904+
assertThat(primaryResponse1.getItemId(), equalTo(0));
905+
assertThat(primaryResponse1.getId(), equalTo("id"));
906+
assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
907+
assertFalse(primaryResponse1.isFailed());
908+
assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED));
909+
assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L));
910+
911+
BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse();
912+
assertThat(primaryResponse2.getItemId(), equalTo(1));
913+
assertThat(primaryResponse2.getId(), equalTo("id"));
914+
assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
915+
assertTrue(primaryResponse2.isFailed());
916+
assertNull(primaryResponse2.getResponse());
917+
assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS);
918+
assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class));
919+
920+
closeShards(shard);
921+
} finally {
922+
rejectingThreadPool.shutdownNow();
923+
}
924+
}
925+
821926
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
822927
if (randomBoolean()) {
823928
// add a response to the request and thereby check that it is ignored for the primary.

0 commit comments

Comments
 (0)