Skip to content

Commit fc287bd

Browse files
authored
Interpret ?timeout=-1 as infinite ack timeout (#107675)
APIs which perform cluster state updates typically accept the `?master_timeout=` and `?timeout=` parameters to respectively set the pending task queue timeout and the acking timeout for the cluster state update. Both of these parameters accept the value `-1`, but `?master_timeout=-1` means to wait indefinitely whereas `?timeout=-1` means the same thing as `?timeout=0`, namely that acking times out immediately on commit. There are some situations where it makes sense to wait for as long as possible for nodes to ack a cluster state update. In practice this wait is bounded by other mechanisms (e.g. the lag detector will remove the node from the cluster after a couple of minutes of failing to apply cluster state updates) but these are not really the concern of clients. Therefore with this commit we change the meaning of `?timeout=-1` to mean that the acking timeout is infinite.
1 parent b0f58ab commit fc287bd

File tree

6 files changed

+136
-8
lines changed

6 files changed

+136
-8
lines changed

docs/changelog/107675.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
pr: 107675
2+
summary: Interpret `?timeout=-1` as infinite ack timeout
3+
area: Cluster Coordination
4+
type: breaking
5+
issues: []
6+
breaking:
7+
title: Interpret `?timeout=-1` as infinite ack timeout
8+
area: REST API
9+
details: |
10+
Today {es} accepts the parameter `?timeout=-1` in many APIs, but interprets
11+
this to mean the same as `?timeout=0`. From 8.15 onwards `?timeout=-1` will
12+
mean to wait indefinitely, aligning the behaviour of this parameter with
13+
other similar parameters such as `?master_timeout`.
14+
impact: |
15+
Use `?timeout=0` to force relevant operations to time out immediately
16+
instead of `?timeout=-1`
17+
notable: false

docs/reference/rest-api/common-parms.asciidoc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,12 +1223,13 @@ the timeout expires, the request fails and returns an error. Defaults to `30s`.
12231223
Can also be set to `-1` to indicate that the request should never timeout.
12241224
end::master-timeout[]
12251225

1226-
tag::timeout[]
12271226
`timeout`::
12281227
(Optional, <<time-units, time units>>)
1229-
Period to wait for a response. If no response is received before the timeout
1230-
expires, the request fails and returns an error. Defaults to `30s`.
1231-
end::timeout[]
1228+
Period to wait for a response from all relevant nodes in the cluster after
1229+
updating the cluster metadata. If no response is received before the timeout
1230+
expires, the cluster metadata update still applies but the response will
1231+
indicate that it was not completely acknowledged. Defaults to `30s`.
1232+
Can also be set to `-1` to indicate that the request should never timeout.
12321233
end::timeoutparms[]
12331234

12341235
tag::transform-id[]

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,9 @@ public void removeGlobalRetention(
8787
List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
8888
final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
8989
) {
90-
final var ackTimeout = request.masterNodeTimeout().millis() < 0 ? TimeValue.MAX_VALUE : request.masterNodeTimeout();
91-
// NB a negative master node timeout means never to time out, but a negative ack timeout means to time out immediately.
92-
// TODO when https://github.com/elastic/elasticsearch/issues/107044 is fixed, we can just use request.masterNodeTimeout() directly
9390
taskQueue.submitTask(
9491
"remove-data-stream-global-retention",
95-
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, ackTimeout),
92+
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()),
9693
request.masterNodeTimeout()
9794
);
9895
}

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,28 @@
88

99
package org.elasticsearch.action.admin.indices.create;
1010

11+
import io.netty.handler.codec.http.HttpMethod;
12+
1113
import org.elasticsearch.action.ActionListener;
1214
import org.elasticsearch.action.ActionRequestBuilder;
1315
import org.elasticsearch.action.UnavailableShardsException;
1416
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1517
import org.elasticsearch.action.admin.indices.alias.Alias;
1618
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
19+
import org.elasticsearch.action.support.ActionTestUtils;
1720
import org.elasticsearch.action.support.ActiveShardCount;
1821
import org.elasticsearch.action.support.IndicesOptions;
22+
import org.elasticsearch.action.support.PlainActionFuture;
1923
import org.elasticsearch.action.support.master.AcknowledgedResponse;
24+
import org.elasticsearch.client.Response;
2025
import org.elasticsearch.cluster.ClusterState;
2126
import org.elasticsearch.cluster.metadata.IndexMetadata;
2227
import org.elasticsearch.cluster.metadata.MappingMetadata;
2328
import org.elasticsearch.cluster.metadata.Metadata;
29+
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.common.Priority;
2431
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.util.concurrent.FutureUtils;
2533
import org.elasticsearch.core.TimeValue;
2634
import org.elasticsearch.index.IndexNotFoundException;
2735
import org.elasticsearch.index.IndexService;
@@ -31,17 +39,24 @@
3139
import org.elasticsearch.test.ESIntegTestCase;
3240
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
3341
import org.elasticsearch.test.ESIntegTestCase.Scope;
42+
import org.elasticsearch.test.rest.ESRestTestCase;
3443
import org.elasticsearch.xcontent.XContentFactory;
3544

45+
import java.io.IOException;
3646
import java.util.Map;
3747
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.CyclicBarrier;
49+
import java.util.concurrent.TimeUnit;
3850
import java.util.concurrent.atomic.AtomicInteger;
3951
import java.util.function.BiFunction;
4052

53+
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
4154
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS;
55+
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
4256
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4357
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
4458
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
59+
import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap;
4560
import static org.hamcrest.Matchers.allOf;
4661
import static org.hamcrest.Matchers.equalTo;
4762
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -53,6 +68,11 @@
5368
@ClusterScope(scope = Scope.TEST)
5469
public class CreateIndexIT extends ESIntegTestCase {
5570

71+
@Override
72+
protected boolean addMockHttpTransport() {
73+
return false; // expose HTTP requests
74+
}
75+
5676
public void testCreationDateGivenFails() {
5777
try {
5878
prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_CREATION_DATE, 4L)).get();
@@ -370,4 +390,38 @@ public void testIndexNameInResponse() {
370390
assertEquals("Should have index name in response", "foo", response.index());
371391
}
372392

393+
public void testInfiniteAckTimeout() throws IOException {
394+
final var clusterService = internalCluster().getInstance(ClusterService.class);
395+
final var barrier = new CyclicBarrier(2);
396+
clusterService.getClusterApplierService().runOnApplierThread("block for test", Priority.NORMAL, cs -> {
397+
safeAwait(barrier);
398+
safeAwait(barrier);
399+
}, ActionListener.noop());
400+
401+
safeAwait(barrier);
402+
403+
final var request = ESRestTestCase.newXContentRequest(
404+
HttpMethod.PUT,
405+
"testindex",
406+
(builder, params) -> builder.startObject("settings")
407+
.field(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
408+
.field(SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1)
409+
.endObject()
410+
);
411+
request.addParameter("timeout", "-1");
412+
final var responseFuture = new PlainActionFuture<Response>();
413+
getRestClient().performRequestAsync(request, ActionTestUtils.wrapAsRestResponseListener(responseFuture));
414+
415+
if (randomBoolean()) {
416+
safeSleep(scaledRandomIntBetween(1, 100));
417+
}
418+
419+
assertFalse(responseFuture.isDone());
420+
safeAwait(barrier);
421+
422+
final var response = FutureUtils.get(responseFuture, 10, TimeUnit.SECONDS);
423+
assertEquals(200, response.getStatusLine().getStatusCode());
424+
assertTrue((boolean) extractValue("acknowledged", entityAsMap(response)));
425+
}
426+
373427
}

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,14 @@ public void onCommit(TimeValue commitTime) {
711711
assert false : "ackTimeout must always be present: " + contextPreservingAckListener;
712712
ackTimeout = TimeValue.ZERO;
713713
}
714+
715+
if (ackTimeout.millis() < 0) {
716+
if (countDown.countDown()) {
717+
finish();
718+
}
719+
return;
720+
}
721+
714722
final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
715723
if (timeLeft.nanos() == 0L) {
716724
onTimeout();

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,6 +1663,57 @@ public void onAckTimeout() {
16631663
deterministicTaskQueue.runAllTasksInTimeOrder();
16641664
safeAwait(latch);
16651665
}
1666+
1667+
// check that -1 means an infinite ack timeout
1668+
{
1669+
final CountDownLatch latch = new CountDownLatch(2);
1670+
1671+
publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
1672+
publishListener.onResponse(null);
1673+
ackListener.onCommit(TimeValue.timeValueMillis(randomLongBetween(0, TimeValue.timeValueDays(1).millis())));
1674+
for (final var node : new DiscoveryNode[] { node1, node2, node3 }) {
1675+
deterministicTaskQueue.scheduleAt(
1676+
deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0, TimeValue.timeValueDays(1).millis()),
1677+
() -> ackListener.onNodeAck(node, null)
1678+
);
1679+
}
1680+
});
1681+
1682+
masterService.submitUnbatchedStateUpdateTask(
1683+
"test2",
1684+
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, null), null) {
1685+
@Override
1686+
public ClusterState execute(ClusterState currentState) {
1687+
return ClusterState.builder(currentState).build();
1688+
}
1689+
1690+
@Override
1691+
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
1692+
latch.countDown();
1693+
}
1694+
1695+
@Override
1696+
protected AcknowledgedResponse newResponse(boolean acknowledged) {
1697+
assertTrue(acknowledged);
1698+
latch.countDown();
1699+
return AcknowledgedResponse.TRUE;
1700+
}
1701+
1702+
@Override
1703+
public void onFailure(Exception e) {
1704+
fail();
1705+
}
1706+
1707+
@Override
1708+
public void onAckTimeout() {
1709+
fail();
1710+
}
1711+
}
1712+
);
1713+
1714+
deterministicTaskQueue.runAllTasks(); // NB not in time order, there's no timeout to avoid
1715+
safeAwait(latch);
1716+
}
16661717
}
16671718
}
16681719

0 commit comments

Comments
 (0)