From 3468fc2211b4ba99799b65018473d00d62e35279 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 Aug 2018 23:13:23 -0400 Subject: [PATCH 1/6] Track fetch exceptions for shard follow tasks This commit adds tracking and reporting for fetch exceptions. We track fetch exceptions per fetch, keeping track of up to the maximum number of concurrent fetches. With each failing fetch, we associate the from sequence number with the exception that caused the fetch. We report these in the CCR stats endpoint, and add some testing for this tracking. --- .../test/AbstractSerializingTestCase.java | 7 +- .../xpack/ccr/action/ShardChangesAction.java | 11 ++ .../xpack/ccr/action/ShardFollowNodeTask.java | 78 ++++++++++++- .../ShardFollowNodeTaskRandomTests.java | 6 + .../ShardFollowNodeTaskStatusTests.java | 62 ++++++++++- .../ccr/action/ShardFollowNodeTaskTests.java | 105 +++++++++++++++++- .../rest-api-spec/test/ccr/stats.yml | 1 + 7 files changed, 261 insertions(+), 9 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java index 6ec32f6654fff..3e25b91e06d3e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java @@ -36,7 +36,7 @@ public abstract class AbstractSerializingTestCase PARSER = new ConstructingObjectParser<>(NAME, args -> new Status( (int) args[0], @@ -459,7 +481,16 @@ public static class Status implements Task.Status { (long) args[15], (long) args[16], (long) args[17], - (long) args[18])); + (long) args[18], + new TreeMap<>( + ((List>) args[19]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); + + static final ConstructingObjectParser, Void> ENTRY_PARSER = + new ConstructingObjectParser<>( + "", + args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); static { PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); @@ -481,6 +512,20 @@ public static class Status implements Task.Status { PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ENTRY_PARSER, FETCH_ERRORS); + } + + static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); + static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception"); + + static { + ENTRY_PARSER.declareLong( + ConstructingObjectParser.constructorArg(), + FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO); + ENTRY_PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + FETCH_EXCEPTIONS_ENTRY_EXCEPTION); } private final int shardId; @@ -597,6 +642,12 @@ public long numberOfOperationsIndexed() { return numberOfOperationsIndexed; } + private final NavigableMap fetchExceptions; + + public NavigableMap fetchExceptions() { + return fetchExceptions; + } + Status( final int shardId, final long leaderGlobalCheckpoint, @@ -616,7 +667,8 @@ public long numberOfOperationsIndexed() { final long totalIndexTimeMillis, final long numberOfSuccessfulBulkOperations, final long numberOfFailedBulkOperations, - final long numberOfOperationsIndexed) { + final long numberOfOperationsIndexed, + final NavigableMap fetchExceptions) { this.shardId = shardId; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderMaxSeqNo = leaderMaxSeqNo; @@ -636,6 +688,7 @@ public long numberOfOperationsIndexed() { this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations; this.numberOfFailedBulkOperations = numberOfFailedBulkOperations; this.numberOfOperationsIndexed = numberOfOperationsIndexed; + this.fetchExceptions = fetchExceptions; } public Status(final StreamInput in) throws IOException { @@ -658,6 +711,7 @@ public Status(final StreamInput in) throws IOException { this.numberOfSuccessfulBulkOperations = in.readVLong(); this.numberOfFailedBulkOperations = in.readVLong(); this.numberOfOperationsIndexed = in.readVLong(); + this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException)); } @Override @@ -686,6 +740,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(numberOfSuccessfulBulkOperations); out.writeVLong(numberOfFailedBulkOperations); out.writeVLong(numberOfOperationsIndexed); + out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException); } @Override @@ -720,6 +775,23 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); + builder.startArray(FETCH_ERRORS.getPreferredName()); + { + for (final Map.Entry entry : fetchExceptions.entrySet()) { + builder.startObject(); + { + builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey()); + builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName()); + builder.startObject(); + { + ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); + } + builder.endObject(); + } + builder.endObject(); + } + } + builder.endArray(); } builder.endObject(); return builder; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index f4cd7a680f4e9..b96d5b47ec263 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class ShardFollowNodeTaskRandomTests extends ESTestCase { @@ -54,6 +55,11 @@ private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); + final long numberOfFailedFetches = + testRun.responses.values().stream().flatMap(List::stream).filter(f -> f.exception != null).count(); + assertThat(status.numberOfFailedFetches(), equalTo(numberOfFailedFetches)); + // the failures were able to be retried so fetch failures should have cleared + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion)); }); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 6138ba96d5439..4eb4283091959 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -6,11 +6,20 @@ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase { @@ -21,6 +30,7 @@ protected ShardFollowNodeTask.Status doParseInstance(XContentParser parser) thro @Override protected ShardFollowNodeTask.Status createTestInstance() { + // if you change this constructor, reflect the changes in the hand-written assertions below return new ShardFollowNodeTask.Status( randomInt(), randomNonNegativeLong(), @@ -40,7 +50,57 @@ protected ShardFollowNodeTask.Status createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong()); + randomNonNegativeLong(), + randomReadExceptions()); + } + + @Override + protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedInstance, final ShardFollowNodeTask.Status newInstance) { + assertNotSame(expectedInstance, newInstance); + assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId())); + assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint())); + assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo())); + assertThat(newInstance.followerGlobalCheckpoint(), equalTo(expectedInstance.followerGlobalCheckpoint())); + assertThat(newInstance.lastRequestedSeqNo(), equalTo(expectedInstance.lastRequestedSeqNo())); + assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads())); + assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites())); + assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites())); + assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion())); + assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis())); + assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches())); + assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches())); + assertThat(newInstance.operationsReceived(), equalTo(expectedInstance.operationsReceived())); + assertThat(newInstance.totalTransferredBytes(), equalTo(expectedInstance.totalTransferredBytes())); + assertThat(newInstance.totalIndexTimeMillis(), equalTo(expectedInstance.totalIndexTimeMillis())); + assertThat(newInstance.numberOfSuccessfulBulkOperations(), equalTo(expectedInstance.numberOfSuccessfulBulkOperations())); + assertThat(newInstance.numberOfFailedBulkOperations(), equalTo(expectedInstance.numberOfFailedBulkOperations())); + assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed())); + assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size())); + assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet())); + for (final Map.Entry entry : newInstance.fetchExceptions().entrySet()) { + // x-content loses the exception + final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey()); + assertThat(entry.getValue().getMessage(), containsString(expected.getMessage())); + assertNotNull(entry.getValue().getCause()); + assertThat( + entry.getValue().getCause(), + anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class))); + assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage())); + } + } + + @Override + protected boolean assertToXContentEquivalence() { + return false; + } + + private NavigableMap randomReadExceptions() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]"))); + } + return readExceptions; } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 9eda637dc9df2..54aef6bd3d116 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -20,8 +21,10 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -29,6 +32,8 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; @@ -39,11 +44,17 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private List> bulkShardOperationRequests; private BiConsumer scheduler = (delay, task) -> task.run(); + private Consumer beforeSendShardChangesRequest = status -> {}; + + private AtomicBoolean simulateResponse = new AtomicBoolean(); + private Queue readFailures; private Queue writeFailures; private Queue mappingUpdateFailures; private Queue imdVersions; + private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; + private Queue maxSeqNos; public void testCoordinateReads() { ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); @@ -169,6 +180,27 @@ public void testReceiveRetryableError() { for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } + imdVersions.add(1L); + leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + simulateResponse.set(true); + final AtomicLong retryCounter = new AtomicLong(); + // before each retry, we assert the fetch failures; after the last retry, the fetch failure should clear + beforeSendShardChangesRequest = status -> { + assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get())); + if (retryCounter.get() > 0) { + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(cause.getShardId().getId(), equalTo(0)); + } + retryCounter.incrementAndGet(); + }; task.coordinateReads(); // NUmber of requests is equal to initial request + retried attempts @@ -178,10 +210,14 @@ public void testReceiveRetryableError() { assertThat(shardChangesRequest[1], equalTo(64L)); } - assertThat(task.isStopped(), equalTo(false)); + assertFalse("task is not stopped", task.isStopped()); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.numberOfFailedFetches(), equalTo((long)max)); + assertThat(status.numberOfSuccessfulFetches(), equalTo(1L)); + // the fetch failure has cleared + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } @@ -194,6 +230,23 @@ public void testReceiveRetryableErrorRetriedTooManyTimes() { for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } + final AtomicLong retryCounter = new AtomicLong(); + // before each retry, we assert the fetch failures; after the last retry, the fetch failure should persist + beforeSendShardChangesRequest = status -> { + assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get())); + if (retryCounter.get() > 0) { + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(cause.getShardId().getId(), equalTo(0)); + } + retryCounter.incrementAndGet(); + }; task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(11)); @@ -202,12 +255,22 @@ public void testReceiveRetryableErrorRetriedTooManyTimes() { assertThat(shardChangesRequest[1], equalTo(64L)); } - assertThat(task.isStopped(), equalTo(true)); + assertTrue("task is stopped", task.isStopped()); assertThat(fatalError, notNullValue()); assertThat(fatalError.getMessage(), containsString("retrying failed [")); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.numberOfFailedFetches(), equalTo(11L)); + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(cause.getShardId().getId(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } @@ -216,19 +279,38 @@ public void testReceiveNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); - Exception failure = new RuntimeException(); + Exception failure = new RuntimeException("replication failed"); readFailures.add(failure); + final AtomicBoolean invoked = new AtomicBoolean(); + // since there will be only one failure, this should only be invoked once and there should not be a fetch failure + beforeSendShardChangesRequest = status -> { + if (invoked.compareAndSet(false, true)) { + assertThat(status.numberOfFailedFetches(), equalTo(0L)); + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); + } else { + fail("invoked twice"); + } + }; task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - assertThat(task.isStopped(), equalTo(true)); + assertTrue("task is stopped", task.isStopped()); assertThat(fatalError, sameInstance(failure)); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.numberOfFailedFetches(), equalTo(1L)); + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class)); + final RuntimeException cause = (RuntimeException) entry.getValue().getCause(); + assertThat(cause.getMessage(), equalTo("replication failed")); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } @@ -642,7 +724,9 @@ ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxCon writeFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>(); imdVersions = new LinkedList<>(); + leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); + maxSeqNos = new LinkedList<>(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -683,10 +767,23 @@ protected void innerSendBulkShardOperationsRequest( @Override protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer handler, Consumer errorHandler) { + beforeSendShardChangesRequest.accept(getStatus()); shardChangesRequests.add(new long[]{from, requestBatchSize}); Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll(); if (readFailure != null) { errorHandler.accept(readFailure); + } else if (simulateResponse.get()) { + final Translog.Operation[] operations = new Translog.Operation[requestBatchSize]; + for (int i = 0; i < requestBatchSize; i++) { + operations[i] = new Translog.NoOp(from + i, 0, "test"); + } + final ShardChangesAction.Response response = + new ShardChangesAction.Response( + imdVersions.poll(), + leaderGlobalCheckpoints.poll(), + maxSeqNos.poll(), + operations); + handler.accept(response); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml index 94af4c345fda1..63a8afd478f9b 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml @@ -43,6 +43,7 @@ - match: { bar.0.number_of_successful_bulk_operations: 0 } - match: { bar.0.number_of_failed_bulk_operations: 0 } - match: { bar.0.number_of_operations_indexed: 0 } + - length: { bar.0.fetch_errors: 0 } - do: ccr.unfollow_index: From 1736935dcf5b9c749a6d9e4fa9cb781e085b52b4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 Aug 2018 23:21:15 -0400 Subject: [PATCH 2/6] Name. That. Parser! --- .../elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 9644405f2255f..95b851b32ddfa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -487,9 +487,11 @@ public static class Status implements Task.Status { .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); + public static final String ENTRY_NAME = "shard-follow-node-task-status-fetch-errors-entry"; + static final ConstructingObjectParser, Void> ENTRY_PARSER = new ConstructingObjectParser<>( - "", + ENTRY_NAME, args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); static { From b944503715cc85ef922709f41e6f0469c00c45de Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Aug 2018 00:21:25 -0400 Subject: [PATCH 3/6] Fix naming --- .../xpack/ccr/action/ShardFollowNodeTask.java | 8 ++++---- .../src/test/resources/rest-api-spec/test/ccr/stats.yml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 95b851b32ddfa..61a4a8b8d1e9a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -458,7 +458,7 @@ public static class Status implements Task.Status { static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations"); static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations"); static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed"); - static final ParseField FETCH_ERRORS = new ParseField("fetch_errors"); + static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions"); @SuppressWarnings("unchecked") static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, @@ -487,7 +487,7 @@ public static class Status implements Task.Status { .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); - public static final String ENTRY_NAME = "shard-follow-node-task-status-fetch-errors-entry"; + public static final String ENTRY_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; static final ConstructingObjectParser, Void> ENTRY_PARSER = new ConstructingObjectParser<>( @@ -514,7 +514,7 @@ public static class Status implements Task.Status { PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); - PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ENTRY_PARSER, FETCH_ERRORS); + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ENTRY_PARSER, FETCH_EXCEPTIONS); } static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); @@ -777,7 +777,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); - builder.startArray(FETCH_ERRORS.getPreferredName()); + builder.startArray(FETCH_EXCEPTIONS.getPreferredName()); { for (final Map.Entry entry : fetchExceptions.entrySet()) { builder.startObject(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml index 63a8afd478f9b..a38698a45be41 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml @@ -43,7 +43,7 @@ - match: { bar.0.number_of_successful_bulk_operations: 0 } - match: { bar.0.number_of_failed_bulk_operations: 0 } - match: { bar.0.number_of_operations_indexed: 0 } - - length: { bar.0.fetch_errors: 0 } + - length: { bar.0.fetch_exceptions: 0 } - do: ccr.unfollow_index: From 36ad0d990d541fbfe7ac2ac8662fd6905289e7ce Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Aug 2018 10:04:44 -0400 Subject: [PATCH 4/6] Naming --- .../xpack/ccr/action/ShardFollowNodeTask.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 61a4a8b8d1e9a..b509d0a831024 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -487,11 +487,11 @@ public static class Status implements Task.Status { .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); - public static final String ENTRY_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; + public static final String FETCH_EXCEPTIONS_ENTRY_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; - static final ConstructingObjectParser, Void> ENTRY_PARSER = + static final ConstructingObjectParser, Void> FETCH_EXCEPTIONS_ENTRY_PARSER = new ConstructingObjectParser<>( - ENTRY_NAME, + FETCH_EXCEPTIONS_ENTRY_NAME, args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); static { @@ -514,17 +514,15 @@ public static class Status implements Task.Status { PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); - PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ENTRY_PARSER, FETCH_EXCEPTIONS); + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); } static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception"); static { - ENTRY_PARSER.declareLong( - ConstructingObjectParser.constructorArg(), - FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO); - ENTRY_PARSER.declareObject( + FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO); + FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject( ConstructingObjectParser.constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), FETCH_EXCEPTIONS_ENTRY_EXCEPTION); From 91e6ac9167702e391bf4c9cba8b9340f5cfe7ac3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Aug 2018 10:10:46 -0400 Subject: [PATCH 5/6] More naming --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 4 +- .../xpack/ccr/action/ShardFollowNodeTask.java | 52 +++++++++---------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index d76af9f3c5352..5a6860c8caa9a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -154,7 +154,7 @@ public List getNamedWriteables() { ShardFollowTask::new), // Task statuses - new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME, + new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.STATUS_PARSER_NAME, ShardFollowNodeTask.Status::new) ); } @@ -166,7 +166,7 @@ public List getNamedXContent() { ShardFollowTask::fromXContent), // Task statuses - new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME), + new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME), ShardFollowNodeTask.Status::fromXContent) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index b509d0a831024..f2b5b7b3772d2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -437,7 +437,7 @@ public synchronized Status getStatus() { public static class Status implements Task.Status { - public static final String NAME = "shard-follow-node-task-status"; + public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status"; static final ParseField SHARD_ID = new ParseField("shard_id"); static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); @@ -461,7 +461,7 @@ public static class Status implements Task.Status { static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions"); @SuppressWarnings("unchecked") - static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + static final ConstructingObjectParser STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME, args -> new Status( (int) args[0], (long) args[1], @@ -487,34 +487,34 @@ public static class Status implements Task.Status { .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); - public static final String FETCH_EXCEPTIONS_ENTRY_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; + public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; static final ConstructingObjectParser, Void> FETCH_EXCEPTIONS_ENTRY_PARSER = new ConstructingObjectParser<>( - FETCH_EXCEPTIONS_ENTRY_NAME, + FETCH_EXCEPTIONS_ENTRY_PARSER_NAME, args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); static { - PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); - PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); + STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); } static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); @@ -716,7 +716,7 @@ public Status(final StreamInput in) throws IOException { @Override public String getWriteableName() { - return NAME; + return STATUS_PARSER_NAME; } @Override @@ -798,7 +798,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa } public static Status fromXContent(final XContentParser parser) { - return PARSER.apply(parser, null); + return STATUS_PARSER.apply(parser, null); } @Override From 477c3bf5e7bb8fd823bfd566bb95418609058c00 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 23 Aug 2018 22:58:46 -0400 Subject: [PATCH 6/6] Fix line length --- .../ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 5a6860c8caa9a..b00883f5c2af2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -166,9 +166,10 @@ public List getNamedXContent() { ShardFollowTask::fromXContent), // Task statuses - new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME), - ShardFollowNodeTask.Status::fromXContent) - ); + new NamedXContentRegistry.Entry( + ShardFollowNodeTask.Status.class, + new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME), + ShardFollowNodeTask.Status::fromXContent)); } /**