From 669bd240cd316da152c8c5337a2dd00d48a0e6b9 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 13 Sep 2018 00:10:06 -0400
Subject: [PATCH 1/6] Introduce long polling for changes

Rather than scheduling pings to the leader index when we are caught up
to the leader, this commit introduces long polling for changes. We fire
off a request to the leader which, if we are already caught up, enters
a poll on the leader side that listens for global checkpoint changes.
These polls time out after a default of one minute, but the timeout can
also be specified when creating the follow task. We use these timeouts
to keep statistics up to date, to avoid exaggerating the time since the
last fetch, and to avoid broken pipes.
---
 .../xpack/ccr/action/ShardChangesAction.java  | 83 +++++++++++++---
 .../xpack/ccr/action/ShardFollowNodeTask.java | 30 +++---
 .../xpack/ccr/action/ShardFollowTask.java     | 48 +++++-----
 .../ccr/action/ShardFollowTasksExecutor.java  |  1 +
 .../action/TransportFollowIndexAction.java    |  2 +-
 .../xpack/ccr/action/AutoFollowTests.java     |  2 +-
 .../ccr/action/ShardChangesActionTests.java   | 27 +++++-
 .../ccr/action/ShardFollowNodeTaskTests.java  | 94 ++++++++++++++-----
 .../ShardFollowTaskReplicationTests.java      |  7 ++
 .../core/ccr/action/FollowIndexAction.java    | 30 +++---
 10 files changed, 225 insertions(+), 99 deletions(-)

diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
index eef3671d51683..b059a86e4db19 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ccr.action;
 
 import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
@@ -19,6 +20,7 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
@@ -36,8 +38,10 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeoutException;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 public class ShardChangesAction extends Action {
 
@@ -59,6 +63,7 @@ public static class Request extends SingleShardRequest {
     private int maxOperationCount;
     private ShardId shardId;
     private String expectedHistoryUUID;
+    private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
     private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
 
     public Request(ShardId shardId, String expectedHistoryUUID) {
@@ -102,6 +107,14 @@ public String getExpectedHistoryUUID() {
         return expectedHistoryUUID;
     }
 
+    public TimeValue getPollTimeout() {
+        return pollTimeout;
+    }
+
+    public void setPollTimeout(final TimeValue pollTimeout) {
+        this.pollTimeout = Objects.requireNonNull(pollTimeout, "pollTimeout");
+    }
+
     @Override
     public ActionRequestValidationException
validate() { ActionRequestValidationException validationException = null; @@ -126,6 +139,7 @@ public void readFrom(StreamInput in) throws IOException { maxOperationCount = in.readVInt(); shardId = ShardId.readShardId(in); expectedHistoryUUID = in.readString(); + pollTimeout = in.readTimeValue(); maxOperationSizeInBytes = in.readVLong(); } @@ -136,6 +150,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(maxOperationCount); shardId.writeTo(out); out.writeString(expectedHistoryUUID); + out.writeTimeValue(pollTimeout); out.writeVLong(maxOperationSizeInBytes); } @@ -149,12 +164,13 @@ public boolean equals(final Object o) { maxOperationCount == request.maxOperationCount && Objects.equals(shardId, request.shardId) && Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) && + Objects.equals(pollTimeout, request.pollTimeout) && maxOperationSizeInBytes == request.maxOperationSizeInBytes; } @Override public int hashCode() { - return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes); + return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, pollTimeout, maxOperationSizeInBytes); } @Override @@ -164,6 +180,7 @@ public String toString() { ", maxOperationCount=" + maxOperationCount + ", shardId=" + shardId + ", expectedHistoryUUID=" + expectedHistoryUUID + + ", pollTimeout=" + pollTimeout + ", maxOperationSizeInBytes=" + maxOperationSizeInBytes + '}'; } @@ -265,19 +282,58 @@ public TransportAction(Settings settings, @Override protected Response shardOperation(Request request, ShardId shardId) throws IOException { - IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); - IndexShard indexShard = indexService.getShard(request.getShard().id()); - final SeqNoStats seqNoStats = indexShard.seqNoStats(); + final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); + final IndexShard indexShard = indexService.getShard(request.getShard().id()); + final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final Translog.Operation[] operations = getOperations( indexShard, seqNoStats.getGlobalCheckpoint(), - request.fromSeqNo, - request.maxOperationCount, - request.expectedHistoryUUID, - request.maxOperationSizeInBytes); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + request.getFromSeqNo(), + request.getMaxOperationCount(), + request.getExpectedHistoryUUID(), + request.getMaxOperationSizeInBytes()); + return getResponse(mappingVersion, seqNoStats, operations); + } + + @Override + protected void asyncShardOperation( + final Request request, + final ShardId shardId, + final ActionListener listener) throws IOException { + final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); + final IndexShard indexShard = indexService.getShard(request.getShard().id()); + final SeqNoStats seqNoStats = indexShard.seqNoStats(); + + + if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { + assert request.getFromSeqNo() == 1 + seqNoStats.getGlobalCheckpoint(); + indexShard.addGlobalCheckpointListener( + request.getFromSeqNo(), + (g, e) -> { + if (g == UNASSIGNED_SEQ_NO) { + assert e != null; + if (e instanceof TimeoutException) { + final long mappingVersion = + 
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); + } else { + listener.onFailure(e); + } + } else { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (final IOException e1) { + listener.onFailure(e1); + } + } + }, + request.getPollTimeout()); + } else { + listener.onResponse(shardOperation(request, shardId)); + } } @Override @@ -300,7 +356,7 @@ protected Response newResponse() { } - private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; + static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; /** * Returns at most maxOperationCount operations from the specified from sequence number. @@ -324,7 +380,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, historyUUID + "]"); } if (fromSeqNo > globalCheckpoint) { - return EMPTY_OPERATIONS_ARRAY; + throw new IllegalStateException( + "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]"); } int seenBytes = 0; // - 1 is needed, because toSeqNo is inclusive @@ -344,4 +401,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, return operations.toArray(EMPTY_OPERATIONS_ARRAY); } + static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) { + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index f88f21e407295..e1bad2134b18b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -48,8 +48,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final String leaderIndex; private final ShardFollowTask params; + private final TimeValue pollTimeout; private final TimeValue maxRetryDelay; - private final TimeValue idleShardChangesRequestDelay; private final BiConsumer scheduler; private final LongSupplier relativeTimeProvider; @@ -80,8 +80,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.params = params; this.scheduler = scheduler; this.relativeTimeProvider = relativeTimeProvider; + this.pollTimeout = params.getPollTimeout(); this.maxRetryDelay = params.getMaxRetryDelay(); - this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); /* * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of * concurrent fetches. 
For each failed fetch, we track the from sequence number associated with the request, and we clear the entry @@ -227,12 +227,16 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } innerSendShardChangesRequest(from, maxOperationCount, response -> { - synchronized (ShardFollowNodeTask.this) { - totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); - numberOfSuccessfulFetches++; - fetchExceptions.remove(from); - operationsReceived += response.getOperations().length; - totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); + if (response.getOperations().length > 0) { + // do not count polls against fetch stats + synchronized (ShardFollowNodeTask.this) { + totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); + numberOfSuccessfulFetches++; + fetchExceptions.remove(from); + operationsReceived += response.getOperations().length; + totalTransferredBytes += + Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); + } } handleReadResponse(from, maxRequiredSeqNo, response); }, @@ -284,15 +288,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar } else { // read is completed, decrement numConcurrentReads--; - if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqNo) { - // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay - // future requests - LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads", - params.getFollowShardId()); - scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads); - } else { - coordinateReads(); - } + coordinateReads(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 62894b0ed99e6..2a01f72ca77aa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -49,7 +49,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); - public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid"); @SuppressWarnings("unchecked") @@ -75,8 +75,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.constructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), - IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), + POLL_TIMEOUT, ObjectParser.ValueType.STRING); 
PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } @@ -90,23 +90,23 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxConcurrentWriteBatches; private final int maxWriteBufferSize; private final TimeValue maxRetryDelay; - private final TimeValue idleShardRetryDelay; + private final TimeValue pollTimeout; private final String recordedLeaderIndexHistoryUUID; private final Map headers; ShardFollowTask( - String leaderClusterAlias, - ShardId followShardId, - ShardId leaderShardId, - int maxBatchOperationCount, - int maxConcurrentReadBatches, - long maxBatchSizeInBytes, - int maxConcurrentWriteBatches, - int maxWriteBufferSize, - TimeValue maxRetryDelay, - TimeValue idleShardRetryDelay, - String recordedLeaderIndexHistoryUUID, - Map headers) { + final String leaderClusterAlias, + final ShardId followShardId, + final ShardId leaderShardId, + final int maxBatchOperationCount, + final int maxConcurrentReadBatches, + final long maxBatchSizeInBytes, + final int maxConcurrentWriteBatches, + final int maxWriteBufferSize, + final TimeValue maxRetryDelay, + final TimeValue pollTimeout, + final String recordedLeaderIndexHistoryUUID, + final Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; @@ -116,7 +116,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; - this.idleShardRetryDelay = idleShardRetryDelay; + this.pollTimeout = pollTimeout; this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID; this.headers = headers != null ? 
Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -131,7 +131,7 @@ public ShardFollowTask(StreamInput in) throws IOException { this.maxConcurrentWriteBatches = in.readVInt(); this.maxWriteBufferSize = in.readVInt(); this.maxRetryDelay = in.readTimeValue(); - this.idleShardRetryDelay = in.readTimeValue(); + this.pollTimeout = in.readTimeValue(); this.recordedLeaderIndexHistoryUUID = in.readString(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -172,8 +172,8 @@ public TimeValue getMaxRetryDelay() { return maxRetryDelay; } - public TimeValue getIdleShardRetryDelay() { - return idleShardRetryDelay; + public TimeValue getPollTimeout() { + return pollTimeout; } public String getTaskId() { @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(maxConcurrentWriteBatches); out.writeVInt(maxWriteBufferSize); out.writeTimeValue(maxRetryDelay); - out.writeTimeValue(idleShardRetryDelay); + out.writeTimeValue(pollTimeout); out.writeString(recordedLeaderIndexHistoryUUID); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -231,7 +231,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); - builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); @@ -251,7 +251,7 @@ public boolean equals(Object o) { maxBatchSizeInBytes == that.maxBatchSizeInBytes && maxWriteBufferSize == that.maxWriteBufferSize && Objects.equals(maxRetryDelay, that.maxRetryDelay) && - Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && + Objects.equals(pollTimeout, that.pollTimeout) && Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) && Objects.equals(headers, that.headers); } @@ -268,7 +268,7 @@ public int hashCode() { maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, - idleShardRetryDelay, + pollTimeout, recordedLeaderIndexHistoryUUID, headers ); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 7b63e73ee59fa..16b2e046beab9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -138,6 +138,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setFromSeqNo(from); request.setMaxOperationCount(maxOperationCount); request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes()); + request.setPollTimeout(params.getPollTimeout()); leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } }; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java index 9e1d2cc44acbd..fc0b68db25bfd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java @@ -188,7 +188,7 @@ void start( request.getMaxConcurrentWriteBatches(), request.getMaxWriteBufferSize(), request.getMaxRetryDelay(), - request.getIdleShardRetryDelay(), + request.getPollTimeout(), recordedLeaderShardHistoryUUID, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java index 514c233188aaa..6cea48cada8db 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java @@ -168,7 +168,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception { assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay())); } if (request.getIdleShardRetryDelay() != null) { - assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay())); + assertThat(shardFollowTask.getPollTimeout(), equalTo(request.getIdleShardRetryDelay())); } }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index b973fbac3ce3e..88802be4e3851 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -24,12 +24,15 @@ import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.LongStream; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; public class ShardChangesActionTests extends ESSingleNodeTestCase { @@ -65,13 +68,27 @@ public void testGetOperations() throws Exception { assertThat(seenSeqNos, equalTo(expectedSeqNos)); } - // get operations for a range no operations exists: - Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - numWrites, numWrites + 1, indexShard.getHistoryUUID(), Long.MAX_VALUE); - assertThat(operations.length, equalTo(0)); + { + // get operations for a range for which no operations exist + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> ShardChangesAction.getOperations( + indexShard, + indexShard.getGlobalCheckpoint(), + numWrites, + numWrites + 1, + indexShard.getHistoryUUID(), + Long.MAX_VALUE)); + final String message = String.format( + Locale.ROOT, + "not exposing operations from [%d] greater than the global checkpoint [%d]", + numWrites, + indexShard.getGlobalCheckpoint()); + assertThat(e, hasToString(containsString(message))); + } // get operations for a range some operations do not exist: - operations = 
ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), + Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), numWrites - 10, numWrites + 10, indexShard.getHistoryUUID(), Long.MAX_VALUE); assertThat(operations.length, equalTo(10)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 101b258075973..e4b9c97179bb2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -57,6 +58,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; + private Queue responseSizes; public void testCoordinateReads() { ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); @@ -224,6 +226,69 @@ public void testReceiveRetryableError() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } + public void testReceiveTimeout() { + final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + final int numberOfTimeouts = randomIntBetween(1, 32); + for (int i = 0; i < numberOfTimeouts; i++) { + mappingVersions.add(1L); + leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + responseSizes.add(0); + } + + final AtomicInteger counter = new AtomicInteger(); + beforeSendShardChangesRequest = status -> { + if (counter.get() <= numberOfTimeouts) { + assertThat(status.numberOfSuccessfulFetches(), equalTo(0L)); + assertThat(status.totalFetchTimeMillis(), equalTo(0L)); + assertThat(status.operationsReceived(), equalTo(0L)); + assertThat(status.totalTransferredBytes(), equalTo(0L)); + + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); + assertThat(status.totalFetchTimeMillis(), equalTo(0L)); + assertThat(status.numberOfFailedFetches(), equalTo(0L)); + } else { + // otherwise we will keep looping as if we were repeatedly polling and timing out + simulateResponse.set(false); + } + counter.incrementAndGet(); + }; + + mappingVersions.add(1L); + mappingVersions.add(1L); + leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + simulateResponse.set(true); + + task.coordinateReads(); + + // one request for each request that we simulate timedout, plus our request that receives a reply, and then a follow-up request + assertThat(shardChangesRequests, hasSize(1 + 1 + numberOfTimeouts)); + for (final long[] shardChangesRequest : shardChangesRequests.subList(0, shardChangesRequests.size() - 2)) { + assertNotNull(shardChangesRequest); + assertThat(shardChangesRequest.length, equalTo(2)); + assertThat(shardChangesRequest[0], equalTo(0L)); + assertThat(shardChangesRequest[1], equalTo(64L)); + } + final long[] lastShardChangesRequest = shardChangesRequests.get(shardChangesRequests.size() - 1); + assertNotNull(lastShardChangesRequest); + assertThat(lastShardChangesRequest.length, equalTo(2)); + 
assertThat(lastShardChangesRequest[0], equalTo(64L)); + assertThat(lastShardChangesRequest[1], equalTo(64L)); + + final ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.numberOfSuccessfulFetches(), equalTo(1L)); + assertThat(status.numberOfFailedFetches(), equalTo(0L)); + assertThat(status.numberOfConcurrentReads(), equalTo(1)); + assertThat(status.numberOfConcurrentWrites(), equalTo(1)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + assertThat(status.leaderMaxSeqNo(), equalTo(63L)); + + assertThat(counter.get(), equalTo(1 + 1 + numberOfTimeouts)); + } + public void testReceiveNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -355,29 +420,6 @@ public void testReceiveNothingExpectedSomething() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } - public void testDelayCoordinatesRead() { - int[] counter = new int[]{0}; - scheduler = (delay, task) -> { - counter[0]++; - task.run(); - }; - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 63, -1); - - task.coordinateReads(); - assertThat(shardChangesRequests.size(), equalTo(1)); - assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); - assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - - shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); - // Also invokes coordinateReads() - task.innerHandleReadResponse(0L, 63L, response); - task.innerHandleReadResponse(64L, 63L, - new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0])); - assertThat(counter[0], equalTo(1)); - } - public void testMappingUpdate() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -651,6 +693,7 @@ private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); + responseSizes = new LinkedList<>(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -697,8 +740,9 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con if (readFailure != null) { errorHandler.accept(readFailure); } else if (simulateResponse.get()) { - final Translog.Operation[] operations = new Translog.Operation[requestBatchSize]; - for (int i = 0; i < requestBatchSize; i++) { + final int responseSize = responseSizes.size() == 0 ? 
requestBatchSize : responseSizes.poll(); + final Translog.Operation[] operations = new Translog.Operation[responseSize]; + for (int i = 0; i < responseSize; i++) { operations[i] = new Translog.NoOp(from + i, 0, "test"); } final ShardChangesAction.Response response = new ShardChangesAction.Response( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 9b04390a3a740..0bb263d3c4406 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -160,6 +160,9 @@ public void testChangeHistoryUUID() throws Exception { recoverShardFromStore(leaderGroup.getPrimary()); String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + // force the global checkpoint on the leader to advance + leaderGroup.appendDocs(64); + assertBusy(() -> { assertThat(shardFollowTask.isStopped(), is(true)); assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + @@ -259,6 +262,10 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co for (IndexShard indexShard : indexShards) { try { final SeqNoStats seqNoStats = indexShard.seqNoStats(); + if (from > seqNoStats.getGlobalCheckpoint()) { + handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, ShardChangesAction.EMPTY_OPERATIONS_ARRAY)); + return; + } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSizeInBytes()); // hard code mapping version; this is ok, as mapping updates are not tested here diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java index 2c311356d4943..75bcd316de5f7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java @@ -34,7 +34,7 @@ public final class FollowIndexAction extends Action { public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; public static final TimeValue DEFAULT_RETRY_TIMEOUT = new TimeValue(500); - public static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10); + public static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1); private FollowIndexAction() { super(NAME); @@ -55,7 +55,7 @@ public static class Request extends ActionRequest implements ToXContentObject { private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); private static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); - private static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, 
true, (args, followerIndex) -> { if (args[1] != null) { @@ -80,8 +80,8 @@ public static class Request extends ActionRequest implements ToXContentObject { ObjectParser.ValueType.STRING); PARSER.declareField( ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), - IDLE_SHARD_RETRY_DELAY, + (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), + POLL_TIMEOUT, ObjectParser.ValueType.STRING); } @@ -148,10 +148,10 @@ public TimeValue getMaxRetryDelay() { return maxRetryDelay; } - private TimeValue idleShardRetryDelay; + private TimeValue pollTimeout; - public TimeValue getIdleShardRetryDelay() { - return idleShardRetryDelay; + public TimeValue getPollTimeout() { + return pollTimeout; } public Request( @@ -163,7 +163,7 @@ public Request( final Integer maxConcurrentWriteBatches, final Integer maxWriteBufferSize, final TimeValue maxRetryDelay, - final TimeValue idleShardRetryDelay) { + final TimeValue pollTimeout) { if (leaderIndex == null) { throw new IllegalArgumentException(LEADER_INDEX_FIELD.getPreferredName() + " is missing"); @@ -203,7 +203,7 @@ public Request( } final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_RETRY_TIMEOUT : maxRetryDelay; - final TimeValue actualIdleShardRetryDelay = idleShardRetryDelay == null ? DEFAULT_IDLE_SHARD_RETRY_DELAY : idleShardRetryDelay; + final TimeValue actualPollTimeout = pollTimeout == null ? DEFAULT_POLL_TIMEOUT : pollTimeout; this.leaderIndex = leaderIndex; this.followerIndex = followerIndex; @@ -213,7 +213,7 @@ public Request( this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches; this.maxWriteBufferSize = actualMaxWriteBufferSize; this.maxRetryDelay = actualRetryTimeout; - this.idleShardRetryDelay = actualIdleShardRetryDelay; + this.pollTimeout = actualPollTimeout; } public Request() { @@ -236,7 +236,7 @@ public void readFrom(final StreamInput in) throws IOException { maxConcurrentWriteBatches = in.readVInt(); maxWriteBufferSize = in.readVInt(); maxRetryDelay = in.readOptionalTimeValue(); - idleShardRetryDelay = in.readOptionalTimeValue(); + pollTimeout = in.readOptionalTimeValue(); } @Override @@ -250,7 +250,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(maxConcurrentWriteBatches); out.writeVInt(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); - out.writeOptionalTimeValue(idleShardRetryDelay); + out.writeOptionalTimeValue(pollTimeout); } @Override @@ -265,7 +265,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); - builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); } builder.endObject(); return builder; @@ -282,7 +282,7 @@ public boolean equals(final Object o) { maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && maxWriteBufferSize == request.maxWriteBufferSize && Objects.equals(maxRetryDelay, request.maxRetryDelay) && - Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) && + Objects.equals(pollTimeout, request.pollTimeout) && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followerIndex, 
request.followerIndex); } @@ -298,7 +298,7 @@ public int hashCode() { maxConcurrentWriteBatches, maxWriteBufferSize, maxRetryDelay, - idleShardRetryDelay + pollTimeout ); } } From d4f8a9818525579bdac3a05083a81cf2b5ebc996 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 13 Sep 2018 14:50:30 -0400 Subject: [PATCH 2/6] Fork it over --- .../org/elasticsearch/xpack/ccr/action/ShardChangesAction.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index b059a86e4db19..6bc1f41b5ce49 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -306,7 +306,6 @@ protected void asyncShardOperation( final IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); - if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { assert request.getFromSeqNo() == 1 + seqNoStats.getGlobalCheckpoint(); indexShard.addGlobalCheckpointListener( @@ -332,7 +331,7 @@ protected void asyncShardOperation( }, request.getPollTimeout()); } else { - listener.onResponse(shardOperation(request, shardId)); + super.asyncShardOperation(request, shardId, listener); } } From 2890693f01e6ec37f88d6015c9b05bad5c0200a7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 13 Sep 2018 15:02:18 -0400 Subject: [PATCH 3/6] Ensure we do not lose exceptions --- .../xpack/ccr/action/ShardChangesAction.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 6bc1f41b5ce49..e24086996b89f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -314,10 +314,14 @@ protected void asyncShardOperation( if (g == UNASSIGNED_SEQ_NO) { assert e != null; if (e instanceof TimeoutException) { - final long mappingVersion = - clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); - final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); + try { + final long mappingVersion = + clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); + } catch (final Exception caught) { + listener.onFailure(caught); + } } else { listener.onFailure(e); } From 15d1af9685a5ac64600939feb6595888c3551dbd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 14 Sep 2018 09:43:02 -0400 Subject: [PATCH 4/6] Iteration --- .../xpack/ccr/action/ShardChangesAction.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index e24086996b89f..92112c691deb4 100644 --- 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -307,12 +308,16 @@ protected void asyncShardOperation( final SeqNoStats seqNoStats = indexShard.seqNoStats(); if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { - assert request.getFromSeqNo() == 1 + seqNoStats.getGlobalCheckpoint(); + logger.trace("waiting for global checkpoint advancement to [{}]", request.getFromSeqNo()); indexShard.addGlobalCheckpointListener( request.getFromSeqNo(), (g, e) -> { if (g == UNASSIGNED_SEQ_NO) { assert e != null; + logger.trace( + () -> new ParameterizedMessage( + "exception waiting for global checkpoint advancement to [{}]", request.getFromSeqNo()), + e); if (e instanceof TimeoutException) { try { final long mappingVersion = @@ -320,16 +325,19 @@ protected void asyncShardOperation( final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); } catch (final Exception caught) { + caught.addSuppressed(e); listener.onFailure(caught); } } else { listener.onFailure(e); } } else { + assert request.getFromSeqNo() <= g; + logger.trace("global checkpoint advanced to [{}] after waiting for [{}]", g, request.getFromSeqNo()); try { super.asyncShardOperation(request, shardId, listener); - } catch (final IOException e1) { - listener.onFailure(e1); + } catch (final IOException caught) { + listener.onFailure(caught); } } }, From 9e32f937914af1145726fe9991109d1643158488 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 14 Sep 2018 09:58:34 -0400 Subject: [PATCH 5/6] Add some trace logging --- .../xpack/ccr/action/ShardChangesAction.java | 75 ++++++++++++------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 92112c691deb4..77ff4d836954d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -308,37 +308,20 @@ protected void asyncShardOperation( final SeqNoStats seqNoStats = indexShard.seqNoStats(); if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { - logger.trace("waiting for global checkpoint advancement to [{}]", request.getFromSeqNo()); + logger.trace( + "waiting for global checkpoint advancement from [{}] to [{}]", + seqNoStats.getGlobalCheckpoint(), + request.getFromSeqNo()); indexShard.addGlobalCheckpointListener( request.getFromSeqNo(), (g, e) -> { - if (g == UNASSIGNED_SEQ_NO) { - assert e != null; - logger.trace( - () -> new ParameterizedMessage( - "exception waiting for global checkpoint advancement to [{}]", request.getFromSeqNo()), - e); - if (e instanceof TimeoutException) { - try { - final long mappingVersion = - clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); - final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, 
EMPTY_OPERATIONS_ARRAY)); - } catch (final Exception caught) { - caught.addSuppressed(e); - listener.onFailure(caught); - } - } else { - listener.onFailure(e); - } + if (g != UNASSIGNED_SEQ_NO) { + assert request.getFromSeqNo() <= g + : "only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]"; + globalCheckpointAdvanced(shardId, g, request, listener); } else { - assert request.getFromSeqNo() <= g; - logger.trace("global checkpoint advanced to [{}] after waiting for [{}]", g, request.getFromSeqNo()); - try { - super.asyncShardOperation(request, shardId, listener); - } catch (final IOException caught) { - listener.onFailure(caught); - } + assert e != null; + globalCheckpointAdvancementFailure(shardId, e, request, listener, indexShard); } }, request.getPollTimeout()); @@ -347,6 +330,44 @@ protected void asyncShardOperation( } } + private void globalCheckpointAdvanced( + final ShardId shardId, + final long globalCheckpoint, + final Request request, + final ActionListener listener) { + logger.trace("global checkpoint advanced to [{}] after waiting for [{}]", globalCheckpoint, request.getFromSeqNo()); + try { + super.asyncShardOperation(request, shardId, listener); + } catch (final IOException caught) { + listener.onFailure(caught); + } + } + + private void globalCheckpointAdvancementFailure( + final ShardId shardId, + final Exception e, + final Request request, + final ActionListener listener, + final IndexShard indexShard) { + logger.trace( + () -> new ParameterizedMessage( + "exception waiting for global checkpoint advancement to [{}]", request.getFromSeqNo()), + e); + if (e instanceof TimeoutException) { + try { + final long mappingVersion = + clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); + } catch (final Exception caught) { + caught.addSuppressed(e); + listener.onFailure(caught); + } + } else { + listener.onFailure(e); + } + } + @Override protected boolean resolveIndex(Request request) { return false; From 7d1f975dbec2ca904790426b8ab25ac99227b8ed Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 14 Sep 2018 10:00:31 -0400 Subject: [PATCH 6/6] Add shard ID --- .../xpack/ccr/action/ShardChangesAction.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 77ff4d836954d..bf2bbd5af8a5c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -309,7 +309,8 @@ protected void asyncShardOperation( if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { logger.trace( - "waiting for global checkpoint advancement from [{}] to [{}]", + "{} waiting for global checkpoint advancement from [{}] to [{}]", + shardId, seqNoStats.getGlobalCheckpoint(), request.getFromSeqNo()); indexShard.addGlobalCheckpointListener( @@ -317,7 +318,7 @@ protected void asyncShardOperation( (g, e) -> { if (g != UNASSIGNED_SEQ_NO) { assert request.getFromSeqNo() <= g - : "only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]"; + : shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]"; 
globalCheckpointAdvanced(shardId, g, request, listener); } else { assert e != null; @@ -335,7 +336,7 @@ private void globalCheckpointAdvanced( final long globalCheckpoint, final Request request, final ActionListener listener) { - logger.trace("global checkpoint advanced to [{}] after waiting for [{}]", globalCheckpoint, request.getFromSeqNo()); + logger.trace("{} global checkpoint advanced to [{}] after waiting for [{}]", shardId, globalCheckpoint, request.getFromSeqNo()); try { super.asyncShardOperation(request, shardId, listener); } catch (final IOException caught) { @@ -351,7 +352,7 @@ private void globalCheckpointAdvancementFailure( final IndexShard indexShard) { logger.trace( () -> new ParameterizedMessage( - "exception waiting for global checkpoint advancement to [{}]", request.getFromSeqNo()), + "{} exception waiting for global checkpoint advancement to [{}]", shardId, request.getFromSeqNo()), e); if (e instanceof TimeoutException) { try {