From df6f9669dccd762416e644f5956f350e618f0d87 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 16 Oct 2018 20:16:10 +0200 Subject: [PATCH 1/3] [CCR] Add total fetch time took stat, that keeps track how much time was spent on fetches from the leader cluster perspective. --- .../xpack/ccr/action/ShardChangesAction.java | 34 +++++++++++++++---- .../xpack/ccr/action/ShardFollowNodeTask.java | 3 ++ .../ccr/action/ShardChangesResponseTests.java | 3 +- .../ShardFollowNodeTaskRandomTests.java | 11 +++--- .../ShardFollowNodeTaskStatusTests.java | 1 + .../ccr/action/ShardFollowNodeTaskTests.java | 8 +++-- .../ShardFollowTaskReplicationTests.java | 5 +-- .../xpack/ccr/action/StatsResponsesTests.java | 1 + .../ccr/FollowStatsMonitoringDocTests.java | 5 ++- .../core/ccr/ShardFollowNodeTaskStatus.java | 25 ++++++++++++-- .../src/main/resources/monitoring-es.json | 3 ++ 11 files changed, 78 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 4c82c90b2d544..e50cbc08df37f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -67,6 +68,8 @@ public static class Request extends SingleShardRequest { private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT; private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE; + private volatile long relativeStartNanos; + public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); this.shardId = shardId; @@ -220,6 +223,12 @@ public Translog.Operation[] getOperations() { return operations; } + private long tookInMillis; + + public long getTookInMillis() { + return tookInMillis; + } + Response() { } @@ -228,13 +237,15 @@ public Translog.Operation[] getOperations() { final long globalCheckpoint, final long maxSeqNo, final long maxSeqNoOfUpdatesOrDeletes, - final Translog.Operation[] operations) { + final Translog.Operation[] operations, + final long tookInMillis) { this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; this.operations = operations; + this.tookInMillis = tookInMillis; } @Override @@ -245,6 +256,7 @@ public void readFrom(final StreamInput in) throws IOException { maxSeqNo = in.readZLong(); maxSeqNoOfUpdatesOrDeletes = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + tookInMillis = in.readVLong(); } @Override @@ -255,6 +267,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeZLong(maxSeqNo); out.writeZLong(maxSeqNoOfUpdatesOrDeletes); out.writeArray(Translog.Operation::writeOperation, operations); + out.writeVLong(tookInMillis); } @Override @@ -266,12 +279,14 @@ public boolean equals(final Object o) { globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes && - Arrays.equals(operations, that.operations); + Arrays.equals(operations, that.operations) && + tookInMillis == that.tookInMillis; } @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, + Arrays.hashCode(operations), tookInMillis); } } @@ -308,7 +323,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.getMaxBatchSize()); // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations); + return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos); } @Override @@ -316,6 +331,7 @@ protected void asyncShardOperation( final Request request, final ShardId shardId, final ActionListener listener) throws IOException { + request.relativeStartNanos = System.nanoTime(); final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); final IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); @@ -373,7 +389,8 @@ private void globalCheckpointAdvancementFailure( clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY)); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY, + request.relativeStartNanos)); } catch (final Exception caught) { caught.addSuppressed(e); listener.onFailure(caught); @@ -459,8 +476,11 @@ static Translog.Operation[] getOperations( } static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, - final long maxSeqNoOfUpdates, final Translog.Operation[] operations) { - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations); + final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) { + long tookInNanos = System.nanoTime() - relativeStartNanos; + long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, + operations, tookInMillis); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 48c04ad05a691..68db709a14d45 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -72,6 +72,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private int numConcurrentReads = 0; private int numConcurrentWrites = 0; private long currentMappingVersion = 0; + private long totalFetchTookTimeMillis = 0; private long totalFetchTimeMillis = 0; private long numberOfSuccessfulFetches = 0; private long numberOfFailedFetches = 0; @@ -245,6 +246,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR fetchExceptions.remove(from); if (response.getOperations().length > 0) { // do not count polls against fetch stats + totalFetchTookTimeMillis += response.getTookInMillis(); totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); numberOfSuccessfulFetches++; operationsReceived += response.getOperations().length; @@ -455,6 +457,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() { buffer.size(), currentMappingVersion, totalFetchTimeMillis, + totalFetchTookTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index a99e930188cf0..b9ac4fee3d23d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -26,7 +26,8 @@ protected ShardChangesAction.Response createTestInstance() { leaderGlobalCheckpoint, leaderMaxSeqNo, maxSeqNoOfUpdatesOrDeletes, - operations + operations, + randomNonNegativeLong() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index d4b2d630966e8..50c0dd9ca49a0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -158,7 +158,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); handler.accept(new ShardChangesAction.Response( - 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0])); + 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L)); } }; threadPool.generic().execute(task); @@ -233,7 +233,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, randomNonNegativeLong(), - ops.toArray(EMPTY)) + ops.toArray(EMPTY), + randomNonNegativeLong()) ) ); responses.put(prevGlobalCheckpoint, item); @@ -256,7 +257,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, randomNonNegativeLong(), - EMPTY + EMPTY, + randomNonNegativeLong() ); item.add(new TestResponse(null, mappingVersion, response)); } @@ -273,7 +275,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, localLeaderGCP, localLeaderGCP, randomNonNegativeLong(), - ops.toArray(EMPTY) + ops.toArray(EMPTY), + randomNonNegativeLong() ); item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 3b8c13dda8824..8e477057051a5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -55,6 +55,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index b221a79e69efa..1988513c95d3b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -439,7 +439,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0])); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L)); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -782,7 +782,8 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), randomNonNegativeLong(), - operations + operations, + 1L ); handler.accept(response); } @@ -813,7 +814,8 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro leaderGlobalCheckPoint, leaderGlobalCheckPoint, randomNonNegativeLong(), - ops.toArray(new Translog.Operation[0]) + ops.toArray(new Translog.Operation[0]), + 1L ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 7b95252c866f3..dd34325cccc14 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -377,7 +377,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); if (from > seqNoStats.getGlobalCheckpoint()) { handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, - maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY)); + maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L)); return; } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, @@ -388,7 +388,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdatesOrDeletes, - ops + ops, + 1L ); handler.accept(response); return; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 6c82852fca142..9f97787bf7d39 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -48,6 +48,7 @@ protected FollowStatsAction.StatsResponses createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), Collections.emptyNavigableMap(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index 1c7c6ff06f5a6..7afbd7edaa4b8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -94,6 +94,7 @@ public void testToXContent() throws IOException { final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE); final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE); final long totalFetchTimeMillis = randomLongBetween(0, 4096); + final long totalFetchTookTimeMillis = randomLongBetween(0, 4096); final long numberOfSuccessfulFetches = randomNonNegativeLong(); final long numberOfFailedFetches = randomLongBetween(0, 8); final long operationsReceived = randomNonNegativeLong(); @@ -121,6 +122,7 @@ public void testToXContent() throws IOException { numberOfQueuedWrites, mappingVersion, totalFetchTimeMillis, + totalFetchTookTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, @@ -164,6 +166,7 @@ public void testToXContent() throws IOException { + "\"number_of_queued_writes\":" + numberOfQueuedWrites + "," + "\"mapping_version\":" + mappingVersion + "," + "\"total_fetch_time_millis\":" + totalFetchTimeMillis + "," + + "\"total_fetch_took_time_millis\":" + totalFetchTookTimeMillis + "," + "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + "," + "\"number_of_failed_fetches\":" + numberOfFailedFetches + "," + "\"operations_received\":" + operationsReceived + "," @@ -205,6 +208,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { 1, 1, 100, + 50, 10, 0, 10, @@ -223,7 +227,6 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { Map template = XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false); Map followStatsMapping = (Map) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template); - assertThat(serializedStatus.size(), equalTo(followStatsMapping.size())); for (Map.Entry entry : serializedStatus.entrySet()) { String fieldName = entry.getKey(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 26341753b37ba..36ed4744c1307 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -47,6 +47,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); + private static final ParseField TOTAL_FETCH_TOOK_TIME_MILLIS_FIELD = new ParseField("total_fetch_took_time_millis"); private static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches"); private static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches"); private static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received"); @@ -85,12 +86,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[18], (long) args[19], (long) args[20], + (long) args[21], new TreeMap<>( - ((List>>) args[21]) + ((List>>) args[22]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[22], - (ElasticsearchException) args[23])); + (long) args[23], + (ElasticsearchException) args[24])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -113,6 +115,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TOOK_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); @@ -219,6 +222,12 @@ public long totalFetchTimeMillis() { return totalFetchTimeMillis; } + private final long totalFetchTookTimeMillis; + + public long totalFetchTookTimeMillis() { + return totalFetchTookTimeMillis; + } + private final long numberOfSuccessfulFetches; public long numberOfSuccessfulFetches() { @@ -299,6 +308,7 @@ public ShardFollowNodeTaskStatus( final int numberOfQueuedWrites, final long mappingVersion, final long totalFetchTimeMillis, + final long totalFetchTookTimeMillis, final long numberOfSuccessfulFetches, final long numberOfFailedFetches, final long operationsReceived, @@ -323,6 +333,7 @@ public ShardFollowNodeTaskStatus( this.numberOfQueuedWrites = numberOfQueuedWrites; this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; + this.totalFetchTookTimeMillis = totalFetchTookTimeMillis; this.numberOfSuccessfulFetches = numberOfSuccessfulFetches; this.numberOfFailedFetches = numberOfFailedFetches; this.operationsReceived = operationsReceived; @@ -350,6 +361,7 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.numberOfQueuedWrites = in.readVInt(); this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); + this.totalFetchTookTimeMillis = in.readVLong(); this.numberOfSuccessfulFetches = in.readVLong(); this.numberOfFailedFetches = in.readVLong(); this.operationsReceived = in.readVLong(); @@ -384,6 +396,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(numberOfQueuedWrites); out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); + out.writeVLong(totalFetchTookTimeMillis); out.writeVLong(numberOfSuccessfulFetches); out.writeVLong(numberOfFailedFetches); out.writeVLong(operationsReceived); @@ -430,6 +443,10 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), "total_fetch_time", new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS)); + builder.humanReadableField( + TOTAL_FETCH_TOOK_TIME_MILLIS_FIELD.getPreferredName(), + "total_fetch_took_time", + new TimeValue(totalFetchTookTimeMillis, TimeUnit.MILLISECONDS)); builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches); builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches); builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived); @@ -501,6 +518,7 @@ public boolean equals(final Object o) { numberOfQueuedWrites == that.numberOfQueuedWrites && mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && + totalFetchTookTimeMillis == that.totalFetchTookTimeMillis && numberOfSuccessfulFetches == that.numberOfSuccessfulFetches && numberOfFailedFetches == that.numberOfFailedFetches && operationsReceived == that.operationsReceived && @@ -536,6 +554,7 @@ public int hashCode() { numberOfQueuedWrites, mappingVersion, totalFetchTimeMillis, + totalFetchTookTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 31d79f3646be7..017e3638ccf31 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -968,6 +968,9 @@ "total_fetch_time_millis": { "type": "long" }, + "total_fetch_took_time_millis": { + "type": "long" + }, "number_of_successful_fetches": { "type": "long" }, From 2d752cf6d9fa962bfcc013e8b992e9a13436a002 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 22 Oct 2018 09:27:57 +0200 Subject: [PATCH 2/3] start the clock the moment the request is de-serialized. --- .../elasticsearch/xpack/ccr/action/ShardChangesAction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index e50cbc08df37f..611d31978692c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -68,7 +68,7 @@ public static class Request extends SingleShardRequest { private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT; private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE; - private volatile long relativeStartNanos; + private long relativeStartNanos; public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); @@ -145,6 +145,9 @@ public void readFrom(StreamInput in) throws IOException { expectedHistoryUUID = in.readString(); pollTimeout = in.readTimeValue(); maxBatchSize = new ByteSizeValue(in); + + // Starting the clock in order to know how much time is spent on fetching operations: + relativeStartNanos = System.nanoTime(); } @Override @@ -331,7 +334,6 @@ protected void asyncShardOperation( final Request request, final ShardId shardId, final ActionListener listener) throws IOException { - request.relativeStartNanos = System.nanoTime(); final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); final IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); From d0a61774fe8a73e14e28f67920002358f29ac65e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 22 Oct 2018 09:34:20 +0200 Subject: [PATCH 3/3] Renamed `total_fetch_took_time_millis` to `total_fetch_leader_time_millis`. --- .../ccr/FollowStatsMonitoringDocTests.java | 2 +- .../core/ccr/ShardFollowNodeTaskStatus.java | 35 ++++++++++--------- .../src/main/resources/monitoring-es.json | 2 +- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index 70cc660979e33..219bf7187baad 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -168,7 +168,7 @@ public void testToXContent() throws IOException { + "\"number_of_queued_writes\":" + numberOfQueuedWrites + "," + "\"mapping_version\":" + mappingVersion + "," + "\"total_fetch_time_millis\":" + totalFetchTimeMillis + "," - + "\"total_fetch_took_time_millis\":" + totalFetchTookTimeMillis + "," + + "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + "," + "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + "," + "\"number_of_failed_fetches\":" + numberOfFailedFetches + "," + "\"operations_received\":" + operationsReceived + "," diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 3094b3f317bc0..e21729df58b54 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -48,7 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); - private static final ParseField TOTAL_FETCH_TOOK_TIME_MILLIS_FIELD = new ParseField("total_fetch_took_time_millis"); + private static final ParseField TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD = new ParseField("total_fetch_leader_time_millis"); private static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches"); private static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches"); private static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received"); @@ -88,12 +88,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[19], (long) args[20], (long) args[21], + (long) args[22], new TreeMap<>( - ((List>>) args[22]) + ((List>>) args[23]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[23], - (ElasticsearchException) args[24])); + (long) args[24], + (ElasticsearchException) args[25])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -117,7 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); - STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TOOK_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); @@ -230,10 +231,10 @@ public long totalFetchTimeMillis() { return totalFetchTimeMillis; } - private final long totalFetchTookTimeMillis; + private final long totalFetchLeaderTimeMillis; - public long totalFetchTookTimeMillis() { - return totalFetchTookTimeMillis; + public long totalFetchLeaderTimeMillis() { + return totalFetchLeaderTimeMillis; } private final long numberOfSuccessfulFetches; @@ -317,7 +318,7 @@ public ShardFollowNodeTaskStatus( final int numberOfQueuedWrites, final long mappingVersion, final long totalFetchTimeMillis, - final long totalFetchTookTimeMillis, + final long totalFetchLeaderTimeMillis, final long numberOfSuccessfulFetches, final long numberOfFailedFetches, final long operationsReceived, @@ -343,7 +344,7 @@ public ShardFollowNodeTaskStatus( this.numberOfQueuedWrites = numberOfQueuedWrites; this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; - this.totalFetchTookTimeMillis = totalFetchTookTimeMillis; + this.totalFetchLeaderTimeMillis = totalFetchLeaderTimeMillis; this.numberOfSuccessfulFetches = numberOfSuccessfulFetches; this.numberOfFailedFetches = numberOfFailedFetches; this.operationsReceived = operationsReceived; @@ -372,7 +373,7 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.numberOfQueuedWrites = in.readVInt(); this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); - this.totalFetchTookTimeMillis = in.readVLong(); + this.totalFetchLeaderTimeMillis = in.readVLong(); this.numberOfSuccessfulFetches = in.readVLong(); this.numberOfFailedFetches = in.readVLong(); this.operationsReceived = in.readVLong(); @@ -408,7 +409,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(numberOfQueuedWrites); out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); - out.writeVLong(totalFetchTookTimeMillis); + out.writeVLong(totalFetchLeaderTimeMillis); out.writeVLong(numberOfSuccessfulFetches); out.writeVLong(numberOfFailedFetches); out.writeVLong(operationsReceived); @@ -457,9 +458,9 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P "total_fetch_time", new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS)); builder.humanReadableField( - TOTAL_FETCH_TOOK_TIME_MILLIS_FIELD.getPreferredName(), - "total_fetch_took_time", - new TimeValue(totalFetchTookTimeMillis, TimeUnit.MILLISECONDS)); + TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD.getPreferredName(), + "total_fetch_leader_time", + new TimeValue(totalFetchLeaderTimeMillis, TimeUnit.MILLISECONDS)); builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches); builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches); builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived); @@ -532,7 +533,7 @@ public boolean equals(final Object o) { numberOfQueuedWrites == that.numberOfQueuedWrites && mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && - totalFetchTookTimeMillis == that.totalFetchTookTimeMillis && + totalFetchLeaderTimeMillis == that.totalFetchLeaderTimeMillis && numberOfSuccessfulFetches == that.numberOfSuccessfulFetches && numberOfFailedFetches == that.numberOfFailedFetches && operationsReceived == that.operationsReceived && @@ -569,7 +570,7 @@ public int hashCode() { numberOfQueuedWrites, mappingVersion, totalFetchTimeMillis, - totalFetchTookTimeMillis, + totalFetchLeaderTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index bd8ecd1489d54..791a0ea02c392 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -971,7 +971,7 @@ "total_fetch_time_millis": { "type": "long" }, - "total_fetch_took_time_millis": { + "total_fetch_leader_time_millis": { "type": "long" }, "number_of_successful_fetches": {