diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index eef3671d51683..bf2bbd5af8a5c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; @@ -19,6 +21,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; @@ -36,8 +39,10 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeoutException; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class ShardChangesAction extends Action { @@ -59,6 +64,7 @@ public static class Request extends SingleShardRequest { private int maxOperationCount; private ShardId shardId; private String expectedHistoryUUID; + private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT; private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; public Request(ShardId shardId, String expectedHistoryUUID) { @@ -102,6 +108,14 @@ public String getExpectedHistoryUUID() { return expectedHistoryUUID; } + public TimeValue getPollTimeout() { + return pollTimeout; + } + + public void setPollTimeout(final TimeValue pollTimeout) { + this.pollTimeout = Objects.requireNonNull(pollTimeout, "pollTimeout"); + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -126,6 +140,7 @@ public void readFrom(StreamInput in) throws IOException { maxOperationCount = in.readVInt(); shardId = ShardId.readShardId(in); expectedHistoryUUID = in.readString(); + pollTimeout = in.readTimeValue(); maxOperationSizeInBytes = in.readVLong(); } @@ -136,6 +151,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(maxOperationCount); shardId.writeTo(out); out.writeString(expectedHistoryUUID); + out.writeTimeValue(pollTimeout); out.writeVLong(maxOperationSizeInBytes); } @@ -149,12 +165,13 @@ public boolean equals(final Object o) { maxOperationCount == request.maxOperationCount && Objects.equals(shardId, request.shardId) && Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) && + Objects.equals(pollTimeout, request.pollTimeout) && maxOperationSizeInBytes == request.maxOperationSizeInBytes; } @Override public int hashCode() { - return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes); + return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, pollTimeout, maxOperationSizeInBytes); } @Override @@ -164,6 +181,7 @@ public String toString() { ", maxOperationCount=" + maxOperationCount + ", shardId=" + shardId + ", expectedHistoryUUID=" + expectedHistoryUUID + + ", pollTimeout=" + pollTimeout + ", maxOperationSizeInBytes=" + maxOperationSizeInBytes + '}'; } @@ -265,19 +283,90 @@ public TransportAction(Settings settings, @Override protected Response shardOperation(Request request, ShardId shardId) throws IOException { - IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); - IndexShard indexShard = indexService.getShard(request.getShard().id()); - final SeqNoStats seqNoStats = indexShard.seqNoStats(); + final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); + final IndexShard indexShard = indexService.getShard(request.getShard().id()); + final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final Translog.Operation[] operations = getOperations( indexShard, seqNoStats.getGlobalCheckpoint(), - request.fromSeqNo, - request.maxOperationCount, - request.expectedHistoryUUID, - request.maxOperationSizeInBytes); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + request.getFromSeqNo(), + request.getMaxOperationCount(), + request.getExpectedHistoryUUID(), + request.getMaxOperationSizeInBytes()); + return getResponse(mappingVersion, seqNoStats, operations); + } + + @Override + protected void asyncShardOperation( + final Request request, + final ShardId shardId, + final ActionListener listener) throws IOException { + final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); + final IndexShard indexShard = indexService.getShard(request.getShard().id()); + final SeqNoStats seqNoStats = indexShard.seqNoStats(); + + if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { + logger.trace( + "{} waiting for global checkpoint advancement from [{}] to [{}]", + shardId, + seqNoStats.getGlobalCheckpoint(), + request.getFromSeqNo()); + indexShard.addGlobalCheckpointListener( + request.getFromSeqNo(), + (g, e) -> { + if (g != UNASSIGNED_SEQ_NO) { + assert request.getFromSeqNo() <= g + : shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]"; + globalCheckpointAdvanced(shardId, g, request, listener); + } else { + assert e != null; + globalCheckpointAdvancementFailure(shardId, e, request, listener, indexShard); + } + }, + request.getPollTimeout()); + } else { + super.asyncShardOperation(request, shardId, listener); + } + } + + private void globalCheckpointAdvanced( + final ShardId shardId, + final long globalCheckpoint, + final Request request, + final ActionListener listener) { + logger.trace("{} global checkpoint advanced to [{}] after waiting for [{}]", shardId, globalCheckpoint, request.getFromSeqNo()); + try { + super.asyncShardOperation(request, shardId, listener); + } catch (final IOException caught) { + listener.onFailure(caught); + } + } + + private void globalCheckpointAdvancementFailure( + final ShardId shardId, + final Exception e, + final Request request, + final ActionListener listener, + final IndexShard indexShard) { + logger.trace( + () -> new ParameterizedMessage( + "{} exception waiting for global checkpoint advancement to [{}]", shardId, request.getFromSeqNo()), + e); + if (e instanceof TimeoutException) { + try { + final long mappingVersion = + clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); + } catch (final Exception caught) { + caught.addSuppressed(e); + listener.onFailure(caught); + } + } else { + listener.onFailure(e); + } } @Override @@ -300,7 +389,7 @@ protected Response newResponse() { } - private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; + static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; /** * Returns at most maxOperationCount operations from the specified from sequence number. @@ -324,7 +413,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, historyUUID + "]"); } if (fromSeqNo > globalCheckpoint) { - return EMPTY_OPERATIONS_ARRAY; + throw new IllegalStateException( + "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]"); } int seenBytes = 0; // - 1 is needed, because toSeqNo is inclusive @@ -344,4 +434,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, return operations.toArray(EMPTY_OPERATIONS_ARRAY); } + static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) { + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 7faebfdc26c31..6bf880661fc82 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -50,8 +50,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final String leaderIndex; private final ShardFollowTask params; + private final TimeValue pollTimeout; private final TimeValue maxRetryDelay; - private final TimeValue idleShardChangesRequestDelay; private final BiConsumer scheduler; private final LongSupplier relativeTimeProvider; @@ -82,8 +82,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.params = params; this.scheduler = scheduler; this.relativeTimeProvider = relativeTimeProvider; + this.pollTimeout = params.getPollTimeout(); this.maxRetryDelay = params.getMaxRetryDelay(); - this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); /* * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry @@ -229,12 +229,16 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } innerSendShardChangesRequest(from, maxOperationCount, response -> { - synchronized (ShardFollowNodeTask.this) { - totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); - numberOfSuccessfulFetches++; - fetchExceptions.remove(from); - operationsReceived += response.getOperations().length; - totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); + if (response.getOperations().length > 0) { + // do not count polls against fetch stats + synchronized (ShardFollowNodeTask.this) { + totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); + numberOfSuccessfulFetches++; + fetchExceptions.remove(from); + operationsReceived += response.getOperations().length; + totalTransferredBytes += + Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); + } } handleReadResponse(from, maxRequiredSeqNo, response); }, @@ -286,15 +290,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar } else { // read is completed, decrement numConcurrentReads--; - if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqNo) { - // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay - // future requests - LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads", - params.getFollowShardId()); - scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads); - } else { - coordinateReads(); - } + coordinateReads(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 62894b0ed99e6..2a01f72ca77aa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -49,7 +49,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); - public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid"); @SuppressWarnings("unchecked") @@ -75,8 +75,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.constructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), - IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), + POLL_TIMEOUT, ObjectParser.ValueType.STRING); PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } @@ -90,23 +90,23 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxConcurrentWriteBatches; private final int maxWriteBufferSize; private final TimeValue maxRetryDelay; - private final TimeValue idleShardRetryDelay; + private final TimeValue pollTimeout; private final String recordedLeaderIndexHistoryUUID; private final Map headers; ShardFollowTask( - String leaderClusterAlias, - ShardId followShardId, - ShardId leaderShardId, - int maxBatchOperationCount, - int maxConcurrentReadBatches, - long maxBatchSizeInBytes, - int maxConcurrentWriteBatches, - int maxWriteBufferSize, - TimeValue maxRetryDelay, - TimeValue idleShardRetryDelay, - String recordedLeaderIndexHistoryUUID, - Map headers) { + final String leaderClusterAlias, + final ShardId followShardId, + final ShardId leaderShardId, + final int maxBatchOperationCount, + final int maxConcurrentReadBatches, + final long maxBatchSizeInBytes, + final int maxConcurrentWriteBatches, + final int maxWriteBufferSize, + final TimeValue maxRetryDelay, + final TimeValue pollTimeout, + final String recordedLeaderIndexHistoryUUID, + final Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; @@ -116,7 +116,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; - this.idleShardRetryDelay = idleShardRetryDelay; + this.pollTimeout = pollTimeout; this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -131,7 +131,7 @@ public ShardFollowTask(StreamInput in) throws IOException { this.maxConcurrentWriteBatches = in.readVInt(); this.maxWriteBufferSize = in.readVInt(); this.maxRetryDelay = in.readTimeValue(); - this.idleShardRetryDelay = in.readTimeValue(); + this.pollTimeout = in.readTimeValue(); this.recordedLeaderIndexHistoryUUID = in.readString(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -172,8 +172,8 @@ public TimeValue getMaxRetryDelay() { return maxRetryDelay; } - public TimeValue getIdleShardRetryDelay() { - return idleShardRetryDelay; + public TimeValue getPollTimeout() { + return pollTimeout; } public String getTaskId() { @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(maxConcurrentWriteBatches); out.writeVInt(maxWriteBufferSize); out.writeTimeValue(maxRetryDelay); - out.writeTimeValue(idleShardRetryDelay); + out.writeTimeValue(pollTimeout); out.writeString(recordedLeaderIndexHistoryUUID); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -231,7 +231,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); - builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); @@ -251,7 +251,7 @@ public boolean equals(Object o) { maxBatchSizeInBytes == that.maxBatchSizeInBytes && maxWriteBufferSize == that.maxWriteBufferSize && Objects.equals(maxRetryDelay, that.maxRetryDelay) && - Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && + Objects.equals(pollTimeout, that.pollTimeout) && Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) && Objects.equals(headers, that.headers); } @@ -268,7 +268,7 @@ public int hashCode() { maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, - idleShardRetryDelay, + pollTimeout, recordedLeaderIndexHistoryUUID, headers ); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 86556480567b7..8e1c1a27a369c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -148,6 +148,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setFromSeqNo(from); request.setMaxOperationCount(maxOperationCount); request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes()); + request.setPollTimeout(params.getPollTimeout()); leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } }; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java index 9e1d2cc44acbd..fc0b68db25bfd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java @@ -188,7 +188,7 @@ void start( request.getMaxConcurrentWriteBatches(), request.getMaxWriteBufferSize(), request.getMaxRetryDelay(), - request.getIdleShardRetryDelay(), + request.getPollTimeout(), recordedLeaderShardHistoryUUID, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java index 514c233188aaa..6cea48cada8db 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java @@ -168,7 +168,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception { assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay())); } if (request.getIdleShardRetryDelay() != null) { - assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay())); + assertThat(shardFollowTask.getPollTimeout(), equalTo(request.getIdleShardRetryDelay())); } }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index b973fbac3ce3e..88802be4e3851 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -24,12 +24,15 @@ import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.LongStream; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; public class ShardChangesActionTests extends ESSingleNodeTestCase { @@ -65,13 +68,27 @@ public void testGetOperations() throws Exception { assertThat(seenSeqNos, equalTo(expectedSeqNos)); } - // get operations for a range no operations exists: - Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - numWrites, numWrites + 1, indexShard.getHistoryUUID(), Long.MAX_VALUE); - assertThat(operations.length, equalTo(0)); + { + // get operations for a range for which no operations exist + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> ShardChangesAction.getOperations( + indexShard, + indexShard.getGlobalCheckpoint(), + numWrites, + numWrites + 1, + indexShard.getHistoryUUID(), + Long.MAX_VALUE)); + final String message = String.format( + Locale.ROOT, + "not exposing operations from [%d] greater than the global checkpoint [%d]", + numWrites, + indexShard.getGlobalCheckpoint()); + assertThat(e, hasToString(containsString(message))); + } // get operations for a range some operations do not exist: - operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), + Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), numWrites - 10, numWrites + 10, indexShard.getHistoryUUID(), Long.MAX_VALUE); assertThat(operations.length, equalTo(10)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 71a97bf8207ab..ea4a1c12b45e1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -58,6 +59,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; + private Queue responseSizes; public void testCoordinateReads() { ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); @@ -226,6 +228,69 @@ public void testReceiveRetryableError() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } + public void testReceiveTimeout() { + final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + final int numberOfTimeouts = randomIntBetween(1, 32); + for (int i = 0; i < numberOfTimeouts; i++) { + mappingVersions.add(1L); + leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + responseSizes.add(0); + } + + final AtomicInteger counter = new AtomicInteger(); + beforeSendShardChangesRequest = status -> { + if (counter.get() <= numberOfTimeouts) { + assertThat(status.numberOfSuccessfulFetches(), equalTo(0L)); + assertThat(status.totalFetchTimeMillis(), equalTo(0L)); + assertThat(status.operationsReceived(), equalTo(0L)); + assertThat(status.totalTransferredBytes(), equalTo(0L)); + + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); + assertThat(status.totalFetchTimeMillis(), equalTo(0L)); + assertThat(status.numberOfFailedFetches(), equalTo(0L)); + } else { + // otherwise we will keep looping as if we were repeatedly polling and timing out + simulateResponse.set(false); + } + counter.incrementAndGet(); + }; + + mappingVersions.add(1L); + mappingVersions.add(1L); + leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + simulateResponse.set(true); + + task.coordinateReads(); + + // one request for each request that we simulate timedout, plus our request that receives a reply, and then a follow-up request + assertThat(shardChangesRequests, hasSize(1 + 1 + numberOfTimeouts)); + for (final long[] shardChangesRequest : shardChangesRequests.subList(0, shardChangesRequests.size() - 2)) { + assertNotNull(shardChangesRequest); + assertThat(shardChangesRequest.length, equalTo(2)); + assertThat(shardChangesRequest[0], equalTo(0L)); + assertThat(shardChangesRequest[1], equalTo(64L)); + } + final long[] lastShardChangesRequest = shardChangesRequests.get(shardChangesRequests.size() - 1); + assertNotNull(lastShardChangesRequest); + assertThat(lastShardChangesRequest.length, equalTo(2)); + assertThat(lastShardChangesRequest[0], equalTo(64L)); + assertThat(lastShardChangesRequest[1], equalTo(64L)); + + final ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.numberOfSuccessfulFetches(), equalTo(1L)); + assertThat(status.numberOfFailedFetches(), equalTo(0L)); + assertThat(status.numberOfConcurrentReads(), equalTo(1)); + assertThat(status.numberOfConcurrentWrites(), equalTo(1)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + assertThat(status.leaderMaxSeqNo(), equalTo(63L)); + + assertThat(counter.get(), equalTo(1 + 1 + numberOfTimeouts)); + } + public void testReceiveNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -357,29 +422,6 @@ public void testReceiveNothingExpectedSomething() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } - public void testDelayCoordinatesRead() { - int[] counter = new int[]{0}; - scheduler = (delay, task) -> { - counter[0]++; - task.run(); - }; - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 63, -1); - - task.coordinateReads(); - assertThat(shardChangesRequests.size(), equalTo(1)); - assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); - assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - - shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); - // Also invokes coordinateReads() - task.innerHandleReadResponse(0L, 63L, response); - task.innerHandleReadResponse(64L, 63L, - new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0])); - assertThat(counter[0], equalTo(1)); - } - public void testMappingUpdate() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -653,6 +695,7 @@ private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); + responseSizes = new LinkedList<>(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -699,8 +742,9 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con if (readFailure != null) { errorHandler.accept(readFailure); } else if (simulateResponse.get()) { - final Translog.Operation[] operations = new Translog.Operation[requestBatchSize]; - for (int i = 0; i < requestBatchSize; i++) { + final int responseSize = responseSizes.size() == 0 ? requestBatchSize : responseSizes.poll(); + final Translog.Operation[] operations = new Translog.Operation[responseSize]; + for (int i = 0; i < responseSize; i++) { operations[i] = new Translog.NoOp(from + i, 0, "test"); } final ShardChangesAction.Response response = new ShardChangesAction.Response( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 9b04390a3a740..0bb263d3c4406 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -160,6 +160,9 @@ public void testChangeHistoryUUID() throws Exception { recoverShardFromStore(leaderGroup.getPrimary()); String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + // force the global checkpoint on the leader to advance + leaderGroup.appendDocs(64); + assertBusy(() -> { assertThat(shardFollowTask.isStopped(), is(true)); assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + @@ -259,6 +262,10 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co for (IndexShard indexShard : indexShards) { try { final SeqNoStats seqNoStats = indexShard.seqNoStats(); + if (from > seqNoStats.getGlobalCheckpoint()) { + handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, ShardChangesAction.EMPTY_OPERATIONS_ARRAY)); + return; + } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSizeInBytes()); // hard code mapping version; this is ok, as mapping updates are not tested here diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java index d5a0b0408c565..65136b41a29e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java @@ -36,8 +36,8 @@ public final class FollowIndexAction extends Action { public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500); - static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10); static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5); + public static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1); private FollowIndexAction() { super(NAME); @@ -58,7 +58,7 @@ public static class Request extends ActionRequest implements ToXContentObject { private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); - private static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, (args, followerIndex) -> { if (args[1] != null) { @@ -83,8 +83,8 @@ public static class Request extends ActionRequest implements ToXContentObject { ObjectParser.ValueType.STRING); PARSER.declareField( ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), - IDLE_SHARD_RETRY_DELAY, + (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), + POLL_TIMEOUT, ObjectParser.ValueType.STRING); } @@ -151,10 +151,10 @@ public TimeValue getMaxRetryDelay() { return maxRetryDelay; } - private TimeValue idleShardRetryDelay; + private TimeValue pollTimeout; - public TimeValue getIdleShardRetryDelay() { - return idleShardRetryDelay; + public TimeValue getPollTimeout() { + return pollTimeout; } public Request( @@ -166,7 +166,7 @@ public Request( final Integer maxConcurrentWriteBatches, final Integer maxWriteBufferSize, final TimeValue maxRetryDelay, - final TimeValue idleShardRetryDelay) { + final TimeValue pollTimeout) { if (leaderIndex == null) { throw new IllegalArgumentException(LEADER_INDEX_FIELD.getPreferredName() + " is missing"); @@ -206,7 +206,7 @@ public Request( } final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_MAX_RETRY_DELAY : maxRetryDelay; - final TimeValue actualIdleShardRetryDelay = idleShardRetryDelay == null ? DEFAULT_IDLE_SHARD_RETRY_DELAY : idleShardRetryDelay; + final TimeValue actualPollTimeout = pollTimeout == null ? DEFAULT_POLL_TIMEOUT : pollTimeout; this.leaderIndex = leaderIndex; this.followerIndex = followerIndex; @@ -216,7 +216,7 @@ public Request( this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches; this.maxWriteBufferSize = actualMaxWriteBufferSize; this.maxRetryDelay = actualRetryTimeout; - this.idleShardRetryDelay = actualIdleShardRetryDelay; + this.pollTimeout = actualPollTimeout; } public Request() { @@ -252,7 +252,7 @@ public void readFrom(final StreamInput in) throws IOException { maxConcurrentWriteBatches = in.readVInt(); maxWriteBufferSize = in.readVInt(); maxRetryDelay = in.readOptionalTimeValue(); - idleShardRetryDelay = in.readOptionalTimeValue(); + pollTimeout = in.readOptionalTimeValue(); } @Override @@ -266,7 +266,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(maxConcurrentWriteBatches); out.writeVInt(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); - out.writeOptionalTimeValue(idleShardRetryDelay); + out.writeOptionalTimeValue(pollTimeout); } @Override @@ -281,7 +281,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); - builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); } builder.endObject(); return builder; @@ -298,7 +298,7 @@ public boolean equals(final Object o) { maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && maxWriteBufferSize == request.maxWriteBufferSize && Objects.equals(maxRetryDelay, request.maxRetryDelay) && - Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) && + Objects.equals(pollTimeout, request.pollTimeout) && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followerIndex, request.followerIndex); } @@ -314,7 +314,7 @@ public int hashCode() { maxConcurrentWriteBatches, maxWriteBufferSize, maxRetryDelay, - idleShardRetryDelay + pollTimeout ); } }