Merged
17 changes: 17 additions & 0 deletions docs/changelog/107675.yaml
@@ -0,0 +1,17 @@
pr: 107675
summary: Interpret `?timeout=-1` as infinite ack timeout
area: Cluster Coordination
type: breaking
issues: []
breaking:
title: Interpret `?timeout=-1` as infinite ack timeout
area: REST API
details: |
Today {es} accepts the parameter `?timeout=-1` in many APIs, but interprets
this to mean the same as `?timeout=0`. From 8.15 onwards, `?timeout=-1` will
mean to wait indefinitely, aligning the behaviour of this parameter with
other similar parameters such as `?master_timeout`.
impact: |
Use `?timeout=0` to force relevant operations to time out immediately
instead of `?timeout=-1`.
notable: false
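
To make the impact concrete, here is a minimal sketch of the new semantics using the low-level Java REST client (the index name and the `restClient` instance are illustrative assumptions, not part of this PR):

    // Sketch of the 8.15+ semantics of ?timeout on an acked API, using org.elasticsearch.client.Request.
    Request request = new Request("PUT", "/my-index"); // hypothetical index name
    request.addParameter("timeout", "-1"); // 8.15+: wait indefinitely for acks; previously treated like "0"
    // To force the ack wait to time out immediately, use "0" instead:
    // request.addParameter("timeout", "0");
    Response response = restClient.performRequest(request); // assumes an existing RestClient instance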
9 changes: 5 additions & 4 deletions docs/reference/rest-api/common-parms.asciidoc
@@ -1223,12 +1223,13 @@ the timeout expires, the request fails and returns an error. Defaults to `30s`.
Can also be set to `-1` to indicate that the request should never time out.
end::master-timeout[]
Comment on lines 1223 to 1224
Member

It would be great to also explain how master_timeout is computed. By reading the code, it seems to start when the task is added to the queue and expires if the task does not get processed by the master. And I believe during this waiting, the task is visible via the PendingTasks API?

tag::timeout[]
`timeout`::
(Optional, <<time-units, time units>>)
Period to wait for a response. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
Period to wait for a response from all relevant nodes in the cluster after
updating the cluster metadata. If no response is received before the timeout
expires, the cluster metadata update still applies but the response will
indicate that it was not completely acknowledged. Defaults to `30s`.
Can also be set to `-1` to indicate that the request should never time out.
end::timeout[]
end::timeoutparms[]
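
As a sketch of how these two REST parameters map onto the corresponding fields of a master-node request in the Java code (using `CreateIndexRequest` purely as an example; in 8.x the ack-timeout setter on acknowledged requests is `timeout(...)`):

    // Sketch: the two parameters are independent fields of a master-node request.
    CreateIndexRequest request = new CreateIndexRequest("my-index"); // example request type and hypothetical index name
    request.masterNodeTimeout(TimeValue.MINUS_ONE); // ?master_timeout=-1: wait indefinitely for the master to process the task
    request.timeout(TimeValue.MINUS_ONE); // ?timeout=-1: from 8.15 onwards, wait indefinitely for node acks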

tag::transform-id[]
@@ -87,12 +87,9 @@ public void removeGlobalRetention(
List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
) {
final var ackTimeout = request.masterNodeTimeout().millis() < 0 ? TimeValue.MAX_VALUE : request.masterNodeTimeout();
// NB a negative master node timeout means never to time out, but a negative ack timeout means to time out immediately.
// TODO when https://github.com/elastic/elasticsearch/issues/107044 is fixed, we can just use request.masterNodeTimeout() directly
taskQueue.submitTask(
"remove-data-stream-global-retention",
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, ackTimeout),
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()),
request.masterNodeTimeout()
);
}
@@ -8,20 +8,28 @@

package org.elasticsearch.action.admin.indices.create;

import io.netty.handler.codec.http.HttpMethod;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
@@ -31,17 +39,24 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -53,6 +68,11 @@
@ClusterScope(scope = Scope.TEST)
public class CreateIndexIT extends ESIntegTestCase {

    @Override
    protected boolean addMockHttpTransport() {
        return false; // use a real HTTP transport so the test can send REST requests to the node
    }

    public void testCreationDateGivenFails() {
        try {
            prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_CREATION_DATE, 4L)).get();
@@ -370,4 +390,38 @@ public void testIndexNameInResponse() {
assertEquals("Should have index name in response", "foo", response.index());
}

    public void testInfiniteAckTimeout() throws IOException {
        final var clusterService = internalCluster().getInstance(ClusterService.class);
        final var barrier = new CyclicBarrier(2);
        // block the applier thread so that the new cluster state cannot be applied, and therefore cannot be acked, until the barrier is released again below
        clusterService.getClusterApplierService().runOnApplierThread("block for test", Priority.NORMAL, cs -> {
            safeAwait(barrier); // signal that the applier thread is now blocked
            safeAwait(barrier); // wait for the test to release it
        }, ActionListener.noop());

        safeAwait(barrier); // wait for the applier thread to be blocked

        final var request = ESRestTestCase.newXContentRequest(
            HttpMethod.PUT,
            "testindex",
            (builder, params) -> builder.startObject("settings")
                .field(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .field(SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1)
                .endObject()
        );
        // ?timeout=-1 now means to wait indefinitely for acks rather than to time out immediately
        request.addParameter("timeout", "-1");
        final var responseFuture = new PlainActionFuture<Response>();
        getRestClient().performRequestAsync(request, ActionTestUtils.wrapAsRestResponseListener(responseFuture));

        if (randomBoolean()) {
            safeSleep(scaledRandomIntBetween(1, 100));
        }

        // the applier thread is still blocked, so the request cannot have completed yet
        assertFalse(responseFuture.isDone());
        // release the applier thread, allowing the state to be applied and acked
        safeAwait(barrier);

        final var response = FutureUtils.get(responseFuture, 10, TimeUnit.SECONDS);
        assertEquals(200, response.getStatusLine().getStatusCode());
        assertTrue((boolean) extractValue("acknowledged", entityAsMap(response)));
    }

}
@@ -711,6 +711,14 @@ public void onCommit(TimeValue commitTime) {
    assert false : "ackTimeout must always be present: " + contextPreservingAckListener;
    ackTimeout = TimeValue.ZERO;
}

if (ackTimeout.millis() < 0) {
    // a negative ack timeout means to wait indefinitely for acks, so never schedule an ack-timeout task; just release this listener's commit slot
    if (countDown.countDown()) {
        finish();
    }
    return;
Comment on lines +715 to +719
Member

Not really related to this PR, but for my learning: I am trying to understand the sequence in which cluster state publication happens. Is the following order correct?

  1. Master sends cluster state publish requests to all nodes
  2. After receiving publish response from a quorum of nodes, this onCommit is called.
  3. Master sends apply commit requests to all nodes
  4. On each apply commit response, master calls onNodeAck.

When step 2 completes and all nodes have responded in step 4, the overall request is considered acknowledged.

Reading the code here, it seems to me that when onCommit is called, there is a chance that step 4 has already completed (since it checks the countDown and calls finish). But I am not sure how that can happen, since onCommit is called before any apply commit request can be sent (code)? Or is it to take care of a single-node cluster? I must be missing something (or even many things). I'd appreciate it if you could help clarify. Thanks!

Contributor Author

The sequence is right.

You could be right about onCommit never actually finishing the acking today. FWIW this code was added in #31303 (6.4.0) and we've rewritten much of the surrounding code since then. That said, it's a fairly delicate argument to prove this, whereas the "obviously correct" code as written today is robust and not meaningfully less efficient.

In particular, it's hard to see that onCommit is called before any ApplyCommit is sent. The relevant code is org.elasticsearch.cluster.coordination.Publication.PublicationTarget#handlePublishResponse:

        void handlePublishResponse(PublishResponse publishResponse) {
            assert isWaitingForQuorum() : this;
            logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode);
            if (applyCommitRequest.isPresent()) {
                sendApplyCommit();
            } else {
                try {
                    Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
                        assert applyCommitRequest.isPresent() == false;
                        applyCommitRequest = Optional.of(applyCommit);
                        ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
                        publicationTargets.stream()
                            .filter(PublicationTarget::isWaitingForQuorum)
                            .forEach(PublicationTarget::sendApplyCommit);
                    });
                } catch (Exception e) {
                    setFailed(e);
                    onPossibleCommitFailure();
                }
            }
        }

As written, you could have the committing thread setting applyCommitRequest, then pausing before calling ackListener.onCommit, while another thread concurrently processes a later PublishResponse, discovers that applyCommitRequest.isPresent() and sends the ApplyCommit. But then if you look at how this code is called, you eventually discover that this all happens underneath Coordinator#mutex, so these things cannot happen concurrently. But relying on a mutex in Coordinator to protect against concurrency in Publication as part of the correctness argument for MasterService is too deeply opaque for my liking.

Member

Thanks a lot for the explanation. TIL 🙇

}

final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
if (timeLeft.nanos() == 0L) {
    onTimeout();
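
To summarise the discussion above in runnable form, here is a self-contained toy model of the ack countdown. It is not the Elasticsearch implementation: `AtomicInteger` stands in for `org.elasticsearch.common.util.concurrent.CountDown`, and the extra slot for the commit mirrors the way onCommit itself counts down in the change above.

    // Toy model: the listener completes after one countDown() per node plus one for onCommit itself.
    class ToyAckCountDown {
        private final java.util.concurrent.atomic.AtomicInteger remaining;

        ToyAckCountDown(int nodeCount) {
            remaining = new java.util.concurrent.atomic.AtomicInteger(nodeCount + 1); // +1 for the commit slot
        }

        private boolean countDown() {
            return remaining.decrementAndGet() == 0;
        }

        void onCommit(long ackTimeoutMillis, long commitTimeMillis) {
            if (ackTimeoutMillis < 0) {
                // infinite ack timeout: never schedule onTimeout(), just release the commit slot
                if (countDown()) {
                    finish();
                }
                return;
            }
            // otherwise schedule onTimeout() after max(0, ackTimeoutMillis - commitTimeMillis), as before
        }

        void onNodeAck() { // called once per node that acks the applied state
            if (countDown()) {
                finish();
            }
        }

        void finish() {
            // complete the request as fully acknowledged
        }
    }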
@@ -1663,6 +1663,57 @@ public void onAckTimeout() {
            deterministicTaskQueue.runAllTasksInTimeOrder();
            safeAwait(latch);
        }

        // check that -1 means an infinite ack timeout
        {
            final CountDownLatch latch = new CountDownLatch(2);

            // simulate a publication in which the commit and each node's ack arrive at arbitrary future times
            publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
                publishListener.onResponse(null);
                ackListener.onCommit(TimeValue.timeValueMillis(randomLongBetween(0, TimeValue.timeValueDays(1).millis())));
                for (final var node : new DiscoveryNode[] { node1, node2, node3 }) {
                    deterministicTaskQueue.scheduleAt(
                        deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0, TimeValue.timeValueDays(1).millis()),
                        () -> ackListener.onNodeAck(node, null)
                    );
                }
            });

            masterService.submitUnbatchedStateUpdateTask(
                "test2",
                new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, null), null) {
                    @Override
                    public ClusterState execute(ClusterState currentState) {
                        return ClusterState.builder(currentState).build();
                    }

                    @Override
                    public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
                        latch.countDown();
                    }

                    @Override
                    protected AcknowledgedResponse newResponse(boolean acknowledged) {
                        assertTrue(acknowledged);
                        latch.countDown();
                        return AcknowledgedResponse.TRUE;
                    }

                    @Override
                    public void onFailure(Exception e) {
                        fail();
                    }

                    @Override
                    public void onAckTimeout() {
                        fail();
                    }
                }
            );

            deterministicTaskQueue.runAllTasks(); // NB not in time order: with an infinite ack timeout there is no timeout task to avoid
            safeAwait(latch);
        }
    }
}
