From 3942bdbe8f6a536101db75648b5c3868bcc8914d Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 22 Apr 2024 09:52:14 +0100 Subject: [PATCH 1/5] Interpret `?timeout=-1` as infinite ack timeout APIs which perform cluster state updates typically accept the `?master_timeout=` and `?timeout=` parameters to respectively set the pending task queue timeout and the acking timeout for the cluster state update. Both of these parameters accept the value `-1`, but `?master_timeout=-1` means to wait indefinitely whereas `?timeout=-1` means the same thing as `?timeout=0`, namely that acking times out immediately on commit. There are some situations where it makes sense to wait for as long as possible for nodes to ack a cluster state update. In practice this wait is bounded by other mechanisms (e.g. the lag detector will remove the node from the cluster after a couple of minutes of failing to apply cluster state updates) but these are not really the concern of clients. Therefore with this commit we change the meaning of `?timeout=-1` to mean that the acking timeout is infinite. --- docs/reference/rest-api/common-parms.asciidoc | 9 ++-- ...pdateDataStreamGlobalRetentionService.java | 5 +- .../admin/indices/create/CreateIndexIT.java | 54 +++++++++++++++++++ .../cluster/service/MasterService.java | 8 +++ .../cluster/service/MasterServiceTests.java | 51 ++++++++++++++++++ 5 files changed, 119 insertions(+), 8 deletions(-) diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index e63f66217d8d7..dd264c0e5bcd2 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -1223,12 +1223,13 @@ the timeout expires, the request fails and returns an error. Defaults to `30s`. Can also be set to `-1` to indicate that the request should never timeout. end::master-timeout[] -tag::timeout[] `timeout`:: (Optional, <>) -Period to wait for a response. 
If no response is received before the timeout -expires, the request fails and returns an error. Defaults to `30s`. -end::timeout[] +Period to wait for a response from all relevant nodes in the cluster after +updating the cluster metadata. If no response is received before the timeout +expires, the cluster metadata update still applies but the response will +indicate that it was not completely acknowledged. Defaults to `30s`. +Can also be set to `-1` to indicate that the request should never time out. end::timeoutparms[] tag::transform-id[] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java index 5ac50e388e13a..4aae343e60999 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java @@ -81,12 +81,9 @@ public void removeGlobalRetention( List affectedDataStreams, final ActionListener listener ) { - final var ackTimeout = request.masterNodeTimeout().millis() < 0 ? TimeValue.MAX_VALUE : request.masterNodeTimeout(); - // NB a negative master node timeout means never to time out, but a negative ack timeout means to time out immediately. 
- // TODO when https://github.com/elastic/elasticsearch/issues/107044 is fixed, we can just use request.masterNodeTimeout() directly taskQueue.submitTask( "remove-data-stream-global-retention", - new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, ackTimeout), + new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()), request.masterNodeTimeout() ); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 7574cd0271c46..d9cc0c98c991b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -8,20 +8,28 @@ package org.elasticsearch.action.admin.indices.create; +import io.netty.handler.codec.http.HttpMethod; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Response; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import 
org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; @@ -31,17 +39,24 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentFactory; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; +import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -53,6 +68,11 @@ @ClusterScope(scope = Scope.TEST) public class CreateIndexIT extends ESIntegTestCase { + @Override + protected boolean addMockHttpTransport() { + return false; // expose HTTP requests + } + public void testCreationDateGivenFails() { try { 
prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_CREATION_DATE, 4L)).get(); @@ -358,4 +378,38 @@ public void testIndexNameInResponse() { assertEquals("Should have index name in response", "foo", response.index()); } + public void testInfiniteAckTimeout() throws IOException { + final var clusterService = internalCluster().getInstance(ClusterService.class); + final var barrier = new CyclicBarrier(2); + clusterService.getClusterApplierService().runOnApplierThread("block for test", Priority.NORMAL, cs -> { + safeAwait(barrier); + safeAwait(barrier); + }, ActionListener.noop()); + + safeAwait(barrier); + + final var request = ESRestTestCase.newXContentRequest( + HttpMethod.PUT, + "testindex", + (builder, params) -> builder.startObject("settings") + .field(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .field(SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1) + .endObject() + ); + request.addParameter("timeout", "-1"); + final var responseFuture = new PlainActionFuture(); + getRestClient().performRequestAsync(request, ActionTestUtils.wrapAsRestResponseListener(responseFuture)); + + if (randomBoolean()) { + safeSleep(scaledRandomIntBetween(1, 100)); + } + + assertFalse(responseFuture.isDone()); + safeAwait(barrier); + + final var response = FutureUtils.get(responseFuture, 10, TimeUnit.SECONDS); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertTrue((boolean) extractValue("acknowledged", entityAsMap(response))); + } + } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index a9f891e555f21..7f9720b64cca6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -711,6 +711,14 @@ public void onCommit(TimeValue commitTime) { assert false : "ackTimeout must always be present: " + 
contextPreservingAckListener; ackTimeout = TimeValue.ZERO; } + + if (ackTimeout.millis() < 0) { + if (countDown.countDown()) { + finish(); + } + return; + } + final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos())); if (timeLeft.nanos() == 0L) { onTimeout(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 57fb819ccd50e..50030143ec354 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -1663,6 +1663,57 @@ public void onAckTimeout() { deterministicTaskQueue.runAllTasksInTimeOrder(); safeAwait(latch); } + + // check that -1 means an infinite ack timeout + { + final CountDownLatch latch = new CountDownLatch(2); + + publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> { + publishListener.onResponse(null); + ackListener.onCommit(TimeValue.timeValueMillis(randomLongBetween(0, TimeValue.timeValueDays(1).millis()))); + for (final var node : new DiscoveryNode[] { node1, node2, node3 }) { + deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0, TimeValue.timeValueDays(1).millis()), + () -> ackListener.onNodeAck(node, null) + ); + } + }); + + masterService.submitUnbatchedStateUpdateTask( + "test2", + new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, null), null) { + @Override + public ClusterState execute(ClusterState currentState) { + return ClusterState.builder(currentState).build(); + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + assertTrue(acknowledged); + latch.countDown(); + return AcknowledgedResponse.TRUE; + } + + 
@Override + public void onFailure(Exception e) { + fail(); + } + + @Override + public void onAckTimeout() { + fail(); + } + } + ); + + deterministicTaskQueue.runAllTasks(); // NB not in time order, there's no timeout to avoid + safeAwait(latch); + } } } From 337763618ef59bf97289157267c8eabe347dc105 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 22 Apr 2024 10:05:48 +0100 Subject: [PATCH 2/5] Update docs/changelog/107675.yaml --- docs/changelog/107675.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 docs/changelog/107675.yaml diff --git a/docs/changelog/107675.yaml b/docs/changelog/107675.yaml new file mode 100644 index 0000000000000..ae3bd2f106e82 --- /dev/null +++ b/docs/changelog/107675.yaml @@ -0,0 +1,12 @@ +pr: 107675 +summary: Interpret `?timeout=-1` as infinite ack timeout +area: Cluster Coordination +type: "breaking, enhancement" +issues: [] +breaking: + title: Interpret `?timeout=-1` as infinite ack timeout + area: Cluster Coordination + details: Please describe the details of this change for the release notes. You can + use asciidoc. + impact: Please describe the impact of this change to users + notable: false From 7f4aab3d52b492476404185b7c8c4d49122cf0fc Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 22 Apr 2024 10:20:25 +0100 Subject: [PATCH 3/5] Update changelog --- docs/changelog/107675.yaml | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/changelog/107675.yaml b/docs/changelog/107675.yaml index ae3bd2f106e82..8981403de4671 100644 --- a/docs/changelog/107675.yaml +++ b/docs/changelog/107675.yaml @@ -1,12 +1,17 @@ pr: 107675 summary: Interpret `?timeout=-1` as infinite ack timeout area: Cluster Coordination -type: "breaking, enhancement" +type: enhancement issues: [] breaking: title: Interpret `?timeout=-1` as infinite ack timeout - area: Cluster Coordination - details: Please describe the details of this change for the release notes. You can - use asciidoc. 
- impact: Please describe the impact of this change to users + area: REST API + details: | + Today {es} accepts the parameter `?timeout=-1` in many APIs, but interprets + this to mean the same as `?timeout=0`. From 8.15 onwards `?timeout=-1` will + mean to wait indefinitely, aligning the behaviour of this parameter with + other similar parameters such as `?master_timeout`. + impact: | + Use `?timeout=0` to force relevant operations to time out immediately + instead of `?timeout=-1`. notable: false From fc0854e6c969457616fcb0d1b054bd3f4b2a9456 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Apr 2024 13:54:26 +0100 Subject: [PATCH 4/5] Update docs/changelog/107675.yaml --- docs/changelog/107675.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/107675.yaml b/docs/changelog/107675.yaml index 8981403de4671..3b5446e1104b4 100644 --- a/docs/changelog/107675.yaml +++ b/docs/changelog/107675.yaml @@ -1,7 +1,7 @@ pr: 107675 summary: Interpret `?timeout=-1` as infinite ack timeout area: Cluster Coordination -type: enhancement +type: "breaking, enhancement" issues: [] breaking: title: Interpret `?timeout=-1` as infinite ack timeout From 3a557e275776fd5de5b9b9e3925a760b5c96a4a6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Apr 2024 14:02:32 +0100 Subject: [PATCH 5/5] Fix changelog --- docs/changelog/107675.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/107675.yaml b/docs/changelog/107675.yaml index 3b5446e1104b4..b1d51cd3f8538 100644 --- a/docs/changelog/107675.yaml +++ b/docs/changelog/107675.yaml @@ -1,7 +1,7 @@ pr: 107675 summary: Interpret `?timeout=-1` as infinite ack timeout area: Cluster Coordination -type: "breaking, enhancement" +type: breaking issues: [] breaking: title: Interpret `?timeout=-1` as infinite ack timeout