From 270e8d54b29ba5b50921215ac0c5ba0770976070 Mon Sep 17 00:00:00 2001 From: zhichen Date: Thu, 6 Feb 2020 17:57:22 +0800 Subject: [PATCH 01/12] Add Bulk stats track the bulk sizes per shard and the time spent on the bulk shard request (#50536)(#47345) --- .../rest-api-spec/api/indices.stats.json | 3 +- .../rest-api-spec/api/nodes.stats.json | 3 +- .../admin/indices/stats/CommonStats.java | 32 +++++- .../admin/indices/stats/CommonStatsFlags.java | 3 +- .../indices/stats/IndicesStatsRequest.java | 8 ++ .../stats/IndicesStatsRequestBuilder.java | 5 + .../action/bulk/BulkShardRequest.java | 21 ++++ .../action/bulk/TransportShardBulkAction.java | 5 + .../bulk/stats/BulkOperationListener.java | 32 ++++++ .../index/bulk/stats/BulkStats.java | 106 ++++++++++++++++++ .../index/bulk/stats/ShardBulkStats.java | 48 ++++++++ .../elasticsearch/index/shard/IndexShard.java | 13 +++ .../elasticsearch/indices/IndicesService.java | 6 + .../indices/NodeIndicesStats.java | 6 + .../rest/action/cat/RestIndicesAction.java | 19 ++++ .../rest/action/cat/RestNodesAction.java | 10 ++ .../rest/action/cat/RestShardsAction.java | 9 ++ .../bulk/TransportShardBulkActionTests.java | 2 + .../index/bulk/stats/BulkStatsTests.java | 62 ++++++++++ .../indices/stats/IndexStatsIT.java | 39 ++++++- .../src/main/resources/monitoring-es.json | 65 +++++++++++ .../indices/IndexStatsCollector.java | 1 + .../indices/IndexStatsMonitoringDoc.java | 8 +- .../indices/IndicesStatsMonitoringDoc.java | 8 +- .../collector/node/NodeStatsCollector.java | 3 +- .../node/NodeStatsMonitoringDoc.java | 3 + 26 files changed, 512 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java create mode 100644 server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java create mode 100644 server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java create mode 100644 server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json index 0940313154a74..322a43dcdd134 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json @@ -37,7 +37,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned the specific metrics." } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index 1aa57ee849c66..09027ba3a3103 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -120,7 +120,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified." } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index e9309bf8ac9c8..ade4ddb316da9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -20,6 +20,8 @@ package org.elasticsearch.action.admin.indices.stats; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Version; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -102,6 +104,9 @@ public class CommonStats implements Writeable, ToXContentFragment { @Nullable public RecoveryStats recoveryStats; + @Nullable + public BulkStats bulk; + public CommonStats() { this(CommonStatsFlags.NONE); } @@ -159,6 +164,9 @@ public CommonStats(CommonStatsFlags flags) { case Recovery: recoveryStats = new RecoveryStats(); break; + case Bulk: + bulk = new BulkStats(); + break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -218,6 +226,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C case Recovery: recoveryStats = indexShard.recoveryStats(); break; + case Bulk: + bulk = indexShard.bulkStats(); + break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -244,6 +255,9 @@ public CommonStats(StreamInput in) throws IOException { translog = in.readOptionalWriteable(TranslogStats::new); requestCache = in.readOptionalWriteable(RequestCacheStats::new); recoveryStats = in.readOptionalWriteable(RecoveryStats::new); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + bulk = in.readOptionalWriteable(BulkStats::new); + } } @Override @@ -264,6 +278,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(translog); out.writeOptionalWriteable(requestCache); out.writeOptionalWriteable(recoveryStats); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(bulk); + } } public void add(CommonStats stats) { @@ -396,6 +413,14 @@ public void add(CommonStats stats) { } else { recoveryStats.add(stats.getRecoveryStats()); } + if (bulk == null) { + if (stats.getBulk() != null) { + bulk = new BulkStats(); + bulk.add(stats.getBulk()); + } + } else { + bulk.add(stats.getBulk()); + } } @Nullable @@ -478,6 +503,11 @@ public RecoveryStats getRecoveryStats() { return recoveryStats; } + @Nullable + public BulkStats getBulk() { + return bulk; + } + /** * Utility method which computes total memory by adding * FieldData, PercolatorCache, Segments (memory, index writer, version map) @@ -504,7 +534,7 @@ public ByteSizeValue getTotalMemory() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { final Stream stream = Arrays.stream(new ToXContent[] { docs, store, indexing, get, search, merge, refresh, flush, warmer, queryCache, - fieldData, completion, segments, translog, requestCache, recoveryStats}) + fieldData, completion, segments, translog, requestCache, recoveryStats, bulk}) .filter(Objects::nonNull); for (ToXContent toXContent : ((Iterable)stream::iterator)) { toXContent.toXContent(builder, params); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java index a25f8e417a885..ee041235410ef 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java @@ -227,7 +227,8 @@ public enum Flag { Translog("translog", 13), // 14 was previously used for Suggest RequestCache("request_cache", 15), - Recovery("recovery", 16); + Recovery("recovery", 16), + Bulk("bulk", 17); private final String restName; private final int index; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java index 5f7eb2b3b95ad..6af50d5e2eacb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -254,6 +254,14 @@ public boolean recovery() { return flags.isSet(Flag.Recovery); } + public IndicesStatsRequest bulk(boolean bulk) { + flags.set(Flag.Bulk, bulk); + return this; + } + public boolean bulk() { + return flags.isSet(Flag.Bulk); + } + public boolean includeSegmentFileSizes() { return flags.includeSegmentFileSizes(); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index 8108299721d82..8fce7c79f5064 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -149,6 +149,11 @@ public IndicesStatsRequestBuilder setRecovery(boolean recovery) { return this; } + public IndicesStatsRequestBuilder setBulk(boolean bulk) { + request.bulk(bulk); + return this; + } + public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegmentFileSizes) { request.includeSegmentFileSizes(includeSegmentFileSizes); return this; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 0f05f071ad4f3..540db58da864c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -19,8 +19,11 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -49,6 +52,24 @@ public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRe setRefreshPolicy(refreshPolicy); } + public long totalSizeInBytes() { + long totalSizeInBytes = 0; + for (int i = 0; i < items.length; i++) { + DocWriteRequest request = items[i].request(); + if (request instanceof IndexRequest) { + if (((IndexRequest) request).source() != null) { + totalSizeInBytes += ((IndexRequest) request).source().length(); + } + } else if (request instanceof UpdateRequest) { + IndexRequest doc = ((UpdateRequest) request).doc(); + if (doc != null && doc.source() != null) { + totalSizeInBytes += ((UpdateRequest) request).doc().source().length(); + } + } + } + return totalSizeInBytes; + } + public BulkItemRequest[] items() { return items; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index e709c24bc64c7..828147e997343 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -153,6 +153,8 @@ public static void performOnPrimary( private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); + final long startBulkTime = System.nanoTime(); + @Override protected void doRun() throws Exception { while (context.hasMoreOperationsToExecute()) { @@ -187,6 +189,7 @@ private void finishRequest() { () -> new WritePrimaryResult<>( context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), null, context.getPrimary(), logger)); + primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); } }.run(); } @@ -392,7 +395,9 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update @Override public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + final long startBulkTime = System.nanoTime(); final Translog.Location location = performOnReplica(request, replica); + replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); return new WriteReplicaResult<>(request, location, null, replica, logger); } diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java new file mode 100644 index 0000000000000..c9164e800d199 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +/** + * An bulk operation listener for bulk events. + */ +public interface BulkOperationListener { + /** + * Called after the bulk operation occurred. + */ + default void afterBulk(long bulkShardSizeInBytes, long tookInNanos) { + } +} + diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java new file mode 100644 index 0000000000000..63dfd68d48820 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java @@ -0,0 +1,106 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public class BulkStats implements Writeable, ToXContentFragment { + + private long total = 0; + private long totalTimeInMillis = 0; + private long totalSizeInBytes = 0; + + public BulkStats() { + + } + + public BulkStats(StreamInput in) throws IOException { + total = in.readVLong(); + totalTimeInMillis = in.readVLong(); + totalSizeInBytes = in.readVLong(); + } + + public BulkStats(long total, long totalTimeInMillis, long totalSizeInBytes) { + this.total = total; + this.totalTimeInMillis = totalTimeInMillis; + this.totalSizeInBytes = totalSizeInBytes; + } + + public void add(BulkStats bulkStats) { + addTotals(bulkStats); + } + + public void addTotals(BulkStats bulkStats) { + if (bulkStats == null) { + return; + } + this.total += bulkStats.total; + this.totalTimeInMillis += bulkStats.totalTimeInMillis; + this.totalSizeInBytes += bulkStats.totalSizeInBytes; + } + + public long getTotalSizeInBytes() { + return totalSizeInBytes; + } + + public long getTotal() { + return total; + } + + public TimeValue getTotalTime() { + return new TimeValue(totalTimeInMillis); + } + + public long getTotalTimeInMillis() { + return totalTimeInMillis; + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(total); + out.writeVLong(totalTimeInMillis); + out.writeVLong(totalSizeInBytes); + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(Fields.BULK); + builder.field(Fields.TOTAL, total); + builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime()); + builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes); + builder.endObject(); + return builder; + } + + static final class Fields { + static final String BULK = "bulk"; + static final String TOTAL = "total"; + static final String TOTAL_TIME = "total_time"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes"; + } +} + diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java new file mode 100644 index 0000000000000..ec3b1ded3c39a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; + +import java.util.concurrent.TimeUnit; + +public class ShardBulkStats implements BulkOperationListener { + + private final StatsHolder totalStats = new StatsHolder(); + + public BulkStats stats() { + return totalStats.stats(); + } + + @Override public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { + totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes); + totalStats.shardBulkMetric.inc(tookInNanos); + } + + static final class StatsHolder { + final MeanMetric shardBulkMetric = new MeanMetric(); + final CounterMetric totalSizeInBytes = new CounterMetric(); + + BulkStats stats() { + return new BulkStats(shardBulkMetric.count(), TimeUnit.NANOSECONDS.toMillis(shardBulkMetric.sum()), totalSizeInBytes.count()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 759feb1d7c68e..0cf44284f9774 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -46,6 +46,9 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.index.bulk.stats.BulkOperationListener; +import org.elasticsearch.index.bulk.stats.BulkStats; +import org.elasticsearch.index.bulk.stats.ShardBulkStats; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -209,6 +212,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final SearchOperationListener searchOperationListener; + private final ShardBulkStats bulkOperationListener; private final GlobalCheckpointListeners globalCheckpointListeners; private final ReplicationTracker replicationTracker; @@ -311,6 +315,7 @@ public IndexShard( final List listenersList = new ArrayList<>(listeners); listenersList.add(internalIndexingStats); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); + this.bulkOperationListener = new ShardBulkStats(); this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); final List searchListenersList = new ArrayList<>(searchOperationListener); @@ -400,6 +405,10 @@ public SearchOperationListener getSearchOperationListener() { return this.searchOperationListener; } + public BulkOperationListener getBulkOperationListener() { + return this.bulkOperationListener; + } + public ShardIndexWarmerService warmerService() { return this.shardWarmerService; } @@ -1033,6 +1042,10 @@ public CompletionStats completionStats(String... fields) { } } + public BulkStats bulkStats() { + return bulkOperationListener.stats(); + } + /** * Executes the given flush request against the engine. * diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 62be919485669..b0903ab33cea2 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -393,6 +394,9 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { case Flush: commonStats.flush.add(oldShardsStats.flushStats); break; + case Bulk: + commonStats.bulk.add(oldShardsStats.bulkStats); + break; } } @@ -769,6 +773,7 @@ static class OldShardsStats implements IndexEventListener { final RefreshStats refreshStats = new RefreshStats(); final FlushStats flushStats = new FlushStats(); final RecoveryStats recoveryStats = new RecoveryStats(); + final BulkStats bulkStats = new BulkStats(); @Override public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { @@ -781,6 +786,7 @@ public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable Index refreshStats.addTotals(indexShard.refreshStats()); flushStats.addTotals(indexShard.flushStats()); recoveryStats.addTotals(indexShard.recoveryStats()); + bulkStats.addTotals(indexShard.bulkStats()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index aa9e880180327..47fb58f3cae13 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -172,6 +173,11 @@ public RecoveryStats getRecoveryStats() { return stats.getRecoveryStats(); } + @Nullable + public BulkStats getBulk() { + return stats.getBulk(); + } + @Override public void writeTo(StreamOutput out) throws IOException { stats.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 5a50b0ff04ce6..d90736be20426 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -487,6 +487,16 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled"); + table.addCell("bulk.total", + "sibling:pri;alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:number of bulk shard ops"); + + table.addCell("bulk.total_time", "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:time spend in shard bulk"); + + table.addCell("bulk.total_size_in_bytes", "sibling:pri;alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("pri.bulk.total_size_in_bytes", "default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.endHeaders(); return table; } @@ -746,6 +756,15 @@ Table buildTable(final RestRequest request, table.addCell(searchThrottled); + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotal()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotal()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalTime()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalTime()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalSizeInBytes()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalSizeInBytes()); + table.endRow(); }); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index ac26c43385004..ccf2d664fafcb 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -242,6 +243,10 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("suggest.time", "alias:suti,suggestTime;default:false;text-align:right;desc:time spend in suggest"); table.addCell("suggest.total", "alias:suto,suggestTotal;default:false;text-align:right;desc:number of suggest ops"); + table.addCell("bulk.total", "alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.endHeaders(); return table; } @@ -416,6 +421,11 @@ Table buildTable(boolean fullId, RestRequest req, ClusterStateResponse state, No table.addCell(searchStats == null ? null : searchStats.getTotal().getSuggestTime()); table.addCell(searchStats == null ? null : searchStats.getTotal().getSuggestCount()); + BulkStats bulkStats = indicesStats == null ? null : indicesStats.getBulk(); + table.addCell(bulkStats == null ? null : bulkStats.getTotal()); + table.addCell(bulkStats == null ? null : bulkStats.getTotalTime()); + table.addCell(bulkStats == null ? null : bulkStats.getTotalSizeInBytes()); + table.endRow(); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index 847b1a773e52a..427580735d9d9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -199,6 +200,10 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("warmer.total", "alias:wto,warmerTotal;default:false;text-align:right;desc:total warmer ops"); table.addCell("warmer.total_time", "alias:wtt,warmerTotalTime;default:false;text-align:right;desc:time spent in warmers"); + table.addCell("bulk.total", "alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.endHeaders(); return table; } @@ -350,6 +355,10 @@ private Table buildTable(RestRequest request, ClusterStateResponse state, Indice table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::total)); table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::totalTime)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotal)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalTime)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalSizeInBytes)); + table.endRow(); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 5985d6731fdbd..ee6aee947b746 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.bulk.stats.ShardBulkStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperService; @@ -781,6 +782,7 @@ public void testRetries() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); when(shard.mapperService()).thenReturn(mock(MapperService.class)); + when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class)); UpdateHelper updateHelper = mock(UpdateHelper.class); when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( diff --git a/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java new file mode 100644 index 0000000000000..74101a464e8cc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class BulkStatsTests extends ESTestCase { + + public void testSerialize() throws IOException { + BulkStats stats = new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + BytesStreamOutput out = new BytesStreamOutput(); + stats.writeTo(out); + StreamInput input = out.bytes().streamInput(); + BulkStats read = new BulkStats(input); + assertEquals(-1, input.read()); + assertEquals(stats.getTotal(), read.getTotal()); + assertEquals(stats.getTotalTime(), read.getTotalTime()); + assertEquals(stats.getTotalSizeInBytes(), read.getTotalSizeInBytes()); + } + + public void testAddTotals() { + BulkStats bulkStats1 = new BulkStats(1, 1, 1); + BulkStats bulkStats2 = new BulkStats(1, 1, 1); + + // adding these two bulk stats and checking stats are correct + bulkStats1.add(bulkStats2); + assertStats(bulkStats1, 2); + + // another call, adding again ... + bulkStats1.add(bulkStats2); + assertStats(bulkStats1, 3); + + } + + private static void assertStats(BulkStats stats, long equalTo) { + assertEquals(equalTo, stats.getTotal()); + assertEquals(equalTo, stats.getTotalTimeInMillis()); + assertEquals(equalTo, stats.getTotalSizeInBytes()); + } +} + diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 816d941ac7ec0..ff44d35dc6d73 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -32,11 +32,16 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -726,7 +731,7 @@ public void testEncodeDecodeCommonStats() throws IOException { public void testFlagOrdinalOrder() { Flag[] flags = new Flag[]{Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Merge, Flag.Flush, Flag.Refresh, Flag.QueryCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Completion, Flag.Segments, - Flag.Translog, Flag.RequestCache, Flag.Recovery}; + Flag.Translog, Flag.RequestCache, Flag.Recovery, Flag.Bulk}; assertThat(flags.length, equalTo(Flag.values().length)); for (int i = 0; i < flags.length; i++) { @@ -902,6 +907,9 @@ private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean s case Recovery: builder.setRecovery(set); break; + case Bulk: + builder.setBulk(set); + break; default: fail("new flag? " + flag); break; @@ -942,6 +950,8 @@ private static boolean isSet(Flag flag, CommonStats response) { return response.getRequestCache() != null; case Recovery: return response.getRecoveryStats() != null; + case Bulk: + return response.getBulk() != null; default: fail("new flag? " + flag); return false; @@ -1067,6 +1077,33 @@ public void testFilterCacheStats() throws Exception { assertThat(response.getTotal().queryCache.getMemorySizeInBytes(), equalTo(0L)); } + public void testBulkStats() throws Exception { + final String index = "test"; + assertAcked(prepareCreate(index).setSettings(settingsBuilder().put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1))); + ensureGreen(); + final BulkRequest request1 = new BulkRequest(); + for (int i = 0; i < 500; ++i) { + request1.add(new IndexRequest(index).source(Collections.singletonMap("key", "value" + i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + BulkResponse bulkResponse = client().bulk(request1).get(); + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(500)); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + assertThat(bulkItemResponse.getIndex(), equalTo(index)); + } + IndicesStatsResponse stats = client().admin().indices().prepareStats(index).setBulk(true).get(); + + assertThat(stats.getTotal().bulk.getTotal(), equalTo(4L)); + assertThat(stats.getTotal().bulk.getTotalTimeInMillis(), greaterThan(0L)); + assertThat(stats.getTotal().bulk.getTotalSizeInBytes(), greaterThan(0L)); + + assertThat(stats.getPrimaries().bulk.getTotal(), equalTo(2L)); + assertThat(stats.getPrimaries().bulk.getTotalTimeInMillis(), greaterThan(0L)); + assertThat(stats.getPrimaries().bulk.getTotalSizeInBytes(), greaterThan(0L)); + } + /** * Test that we can safely concurrently index and get stats. This test was inspired by a serialization issue that arose due to a race * getting doc stats during heavy indexing. The race could lead to deleted docs being negative which would then be serialized as a diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index c937adb4174fb..88ca0f5beba02 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -84,6 +84,19 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + } + } } } }, @@ -115,6 +128,19 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + } + } } } } @@ -258,6 +284,19 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + } + } } } }, @@ -392,6 +431,19 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + } + } } } } @@ -560,6 +612,19 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + } + } } } }, diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java index 7630aa4794b88..abcb1531b3acd 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java @@ -70,6 +70,7 @@ protected Collection doCollect(final MonitoringDoc.Node node, .setRefresh(true) .setQueryCache(true) .setRequestCache(true) + .setBulk(true) .get(getCollectionTimeout()); final long timestamp = timestamp(); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java index 4700040377042..e8c0428cf1b85 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java @@ -163,6 +163,9 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.primaries.store.size_in_bytes", "index_stats.primaries.refresh.total_time_in_millis", "index_stats.primaries.refresh.external_total_time_in_millis", + "index_stats.primaries.bulk.total", + "index_stats.primaries.bulk.total_time_in_millis", + "index_stats.primaries.bulk.total_size_in_bytes", "index_stats.total.docs.count", "index_stats.total.fielddata.memory_size_in_bytes", "index_stats.total.fielddata.evictions", @@ -193,5 +196,8 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.total.segments.fixed_bit_set_memory_in_bytes", "index_stats.total.store.size_in_bytes", "index_stats.total.refresh.total_time_in_millis", - "index_stats.total.refresh.external_total_time_in_millis"); + "index_stats.total.refresh.external_total_time_in_millis", + "index_stats.total.bulk.total", + "index_stats.total.bulk.total_time_in_millis", + "index_stats.total.bulk.total_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java index 6fbf26b7c39dd..b9fd42aede0b2 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java @@ -100,6 +100,9 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.primaries.search.query_time_in_millis", "indices_stats._all.primaries.search.query_total", "indices_stats._all.primaries.store.size_in_bytes", + "indices_stats._all.primaries.bulk.total", + "indices_stats._all.primaries.bulk.total_time_in_millis", + "indices_stats._all.primaries.bulk.total_size_in_bytes", "indices_stats._all.total.docs.count", "indices_stats._all.total.indexing.index_time_in_millis", "indices_stats._all.total.indexing.index_total", @@ -107,5 +110,8 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.total.indexing.throttle_time_in_millis", "indices_stats._all.total.search.query_time_in_millis", "indices_stats._all.total.search.query_total", - "indices_stats._all.total.store.size_in_bytes"); + "indices_stats._all.total.store.size_in_bytes", + "indices_stats._all.total.bulk.total", + "indices_stats._all.total.bulk.total_time_in_millis", + "indices_stats._all.total.bulk.total_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java index bc816cb9d9a05..d7f7b1b1f00db 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java @@ -44,7 +44,8 @@ public class NodeStatsCollector extends Collector { CommonStatsFlags.Flag.QueryCache, CommonStatsFlags.Flag.RequestCache, CommonStatsFlags.Flag.Search, - CommonStatsFlags.Flag.Segments); + CommonStatsFlags.Flag.Segments, + CommonStatsFlags.Flag.Bulk); private final Client client; diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java index 6ed4271b7b989..2b353e3f908a8 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java @@ -104,6 +104,9 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "node_stats.indices.segments.index_writer_memory_in_bytes", "node_stats.indices.segments.version_map_memory_in_bytes", "node_stats.indices.segments.fixed_bit_set_memory_in_bytes", + "node_stats.indices.bulk.total", + "node_stats.indices.bulk.total_time_in_millis", + "node_stats.indices.bulk.total_size_in_bytes", "node_stats.fs.io_stats.total.operations", "node_stats.fs.io_stats.total.read_operations", "node_stats.fs.io_stats.total.write_operations", From 0d05e87ddcc8f6b784e47e7f49e5aa488ca35db9 Mon Sep 17 00:00:00 2001 From: zhichen Date: Sat, 15 Feb 2020 20:24:12 +0800 Subject: [PATCH 02/12] Refactoring bulk stats test and add some java docs as mentioned in the review. --- .../rest-api-spec/api/indices.stats.json | 3 +- .../rest-api-spec/api/nodes.stats.json | 3 +- .../test/cat.shards/10_basic.yml | 3 ++ .../action/bulk/TransportShardBulkAction.java | 2 +- .../index/bulk/stats/BulkStats.java | 51 ++++++++++++++----- .../index/bulk/stats/ShardBulkStats.java | 8 ++- .../rest/action/cat/RestIndicesAction.java | 10 ++-- .../rest/action/cat/RestNodesAction.java | 4 +- .../rest/action/cat/RestShardsAction.java | 4 +- .../index/bulk/stats/BulkStatsTests.java | 49 +++++++++++------- .../indices/stats/IndexStatsIT.java | 4 +- 11 files changed, 95 insertions(+), 46 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json index 322a43dcdd134..f1bfc33daf8f7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json @@ -84,7 +84,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned the specific metrics." } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index 09027ba3a3103..0dc5e159a4528 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -168,7 +168,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified." }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index de5e632975752..f8409d7a31da3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -78,6 +78,9 @@ warmer.current .+ \n warmer.total .+ \n warmer.total_time .+ \n + bulk.total_operations .+ \n + bulk.total_time .+ \n + bulk.total_size_in_bytes .+ \n $/ --- "Test cat shards output": diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 828147e997343..3a28b4eba1779 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -168,6 +168,7 @@ protected void doRun() throws Exception { } // We're done, there's no more operations to execute so we resolve the wrapped listener finishRequest(); + primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); } @Override @@ -189,7 +190,6 @@ private void finishRequest() { () -> new WritePrimaryResult<>( context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), null, context.getPrimary(), logger)); - primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); } }.run(); } diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java index 63dfd68d48820..dad40c418686b 100644 --- a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java @@ -28,10 +28,15 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Objects; +/** + * Bulk related statistics, including the time and size of shard bulk requests, + * starting at the shard level and allowing aggregation to indices and node level + */ public class BulkStats implements Writeable, ToXContentFragment { - private long total = 0; + private long totalOperations = 0; private long totalTimeInMillis = 0; private long totalSizeInBytes = 0; @@ -40,13 +45,13 @@ public BulkStats() { } public BulkStats(StreamInput in) throws IOException { - total = in.readVLong(); + totalOperations = in.readVLong(); totalTimeInMillis = in.readVLong(); totalSizeInBytes = in.readVLong(); } - public BulkStats(long total, long totalTimeInMillis, long totalSizeInBytes) { - this.total = total; + public BulkStats(long totalOperations, long totalTimeInMillis, long totalSizeInBytes) { + this.totalOperations = totalOperations; this.totalTimeInMillis = totalTimeInMillis; this.totalSizeInBytes = totalSizeInBytes; } @@ -59,7 +64,7 @@ public void addTotals(BulkStats bulkStats) { if (bulkStats == null) { return; } - this.total += bulkStats.total; + this.totalOperations += bulkStats.totalOperations; this.totalTimeInMillis += bulkStats.totalTimeInMillis; this.totalSizeInBytes += bulkStats.totalSizeInBytes; } @@ -68,8 +73,8 @@ public long getTotalSizeInBytes() { return totalSizeInBytes; } - public long getTotal() { - return total; + public long getTotalOperations() { + return totalOperations; } public TimeValue getTotalTime() { @@ -80,24 +85,46 @@ public long getTotalTimeInMillis() { return totalTimeInMillis; } - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(total); + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalOperations); out.writeVLong(totalTimeInMillis); out.writeVLong(totalSizeInBytes); } - @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(Fields.BULK); - builder.field(Fields.TOTAL, total); + builder.field(Fields.TOTAL_OPERATIONS, totalOperations); builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime()); builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes); builder.endObject(); return builder; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final BulkStats that = (BulkStats) o; + return Objects.equals(this.totalOperations, that.totalOperations) + && Objects.equals(this.totalTimeInMillis, that.totalTimeInMillis) + && Objects.equals(this.totalSizeInBytes, that.totalSizeInBytes); + } + + @Override + public int hashCode() { + return Objects.hash(totalOperations, totalTimeInMillis, totalSizeInBytes); + } + static final class Fields { static final String BULK = "bulk"; - static final String TOTAL = "total"; + static final String TOTAL_OPERATIONS = "total_operations"; static final String TOTAL_TIME = "total_time"; static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes"; diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java index ec3b1ded3c39a..8f1869c1ec289 100644 --- a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java @@ -21,9 +21,14 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.index.shard.IndexShard; import java.util.concurrent.TimeUnit; +/** + * Internal class that maintains relevant shard bulk statistics / metrics. + * @see IndexShard + */ public class ShardBulkStats implements BulkOperationListener { private final StatsHolder totalStats = new StatsHolder(); @@ -32,7 +37,8 @@ public BulkStats stats() { return totalStats.stats(); } - @Override public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { + @Override + public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes); totalStats.shardBulkMetric.inc(tookInNanos); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index d90736be20426..230de7c40254d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -487,9 +487,9 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled"); - table.addCell("bulk.total", - "sibling:pri;alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops"); - table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_operations", + "sibling:pri;alias:bto,bulkTotalOperation;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("pri.bulk.total_operations", "default:false;text-align:right;desc:number of bulk shard ops"); table.addCell("bulk.total_time", "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:time spend in shard bulk"); @@ -756,8 +756,8 @@ Table buildTable(final RestRequest request, table.addCell(searchThrottled); - table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotal()); - table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotal()); + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalOperations()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalOperations()); table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalTime()); table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalTime()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index ccf2d664fafcb..607d14e4d2f01 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -243,7 +243,7 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("suggest.time", "alias:suti,suggestTime;default:false;text-align:right;desc:time spend in suggest"); table.addCell("suggest.total", "alias:suto,suggestTotal;default:false;text-align:right;desc:number of suggest ops"); - table.addCell("bulk.total", "alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_operations", "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); @@ -422,7 +422,7 @@ Table buildTable(boolean fullId, RestRequest req, ClusterStateResponse state, No table.addCell(searchStats == null ? null : searchStats.getTotal().getSuggestCount()); BulkStats bulkStats = indicesStats == null ? null : indicesStats.getBulk(); - table.addCell(bulkStats == null ? null : bulkStats.getTotal()); + table.addCell(bulkStats == null ? null : bulkStats.getTotalOperations()); table.addCell(bulkStats == null ? null : bulkStats.getTotalTime()); table.addCell(bulkStats == null ? null : bulkStats.getTotalSizeInBytes()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index 427580735d9d9..6f9ab28b0c5ba 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -200,7 +200,7 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("warmer.total", "alias:wto,warmerTotal;default:false;text-align:right;desc:total warmer ops"); table.addCell("warmer.total_time", "alias:wtt,warmerTotalTime;default:false;text-align:right;desc:time spent in warmers"); - table.addCell("bulk.total", "alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_operations", "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); @@ -355,7 +355,7 @@ private Table buildTable(RestRequest request, ClusterStateResponse state, Indice table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::total)); table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::totalTime)); - table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotal)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalOperations)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalTime)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalSizeInBytes)); diff --git a/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java index 74101a464e8cc..d5c70f7908fc2 100644 --- a/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java +++ b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java @@ -19,24 +19,34 @@ package org.elasticsearch.index.bulk.stats; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -public class BulkStatsTests extends ESTestCase { - - public void testSerialize() throws IOException { - BulkStats stats = new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - BytesStreamOutput out = new BytesStreamOutput(); - stats.writeTo(out); - StreamInput input = out.bytes().streamInput(); - BulkStats read = new BulkStats(input); - assertEquals(-1, input.read()); - assertEquals(stats.getTotal(), read.getTotal()); - assertEquals(stats.getTotalTime(), read.getTotalTime()); - assertEquals(stats.getTotalSizeInBytes(), read.getTotalSizeInBytes()); +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class BulkStatsTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return BulkStats::new; + } + + @Override + protected BulkStats createTestInstance() { + return new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + + @Override + protected BulkStats mutateInstance(BulkStats instance) { + BulkStats mutateBulkStats = new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + switch (between(0, 1)) { + case 0: + break; + case 1: + mutateBulkStats.add(instance); + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + return mutateBulkStats; } public void testAddTotals() { @@ -54,9 +64,10 @@ public void testAddTotals() { } private static void assertStats(BulkStats stats, long equalTo) { - assertEquals(equalTo, stats.getTotal()); + assertEquals(equalTo, stats.getTotalOperations()); assertEquals(equalTo, stats.getTotalTimeInMillis()); assertEquals(equalTo, stats.getTotalSizeInBytes()); } + } diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index ff44d35dc6d73..c8a5ebf3f2a31 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -1095,11 +1095,11 @@ public void testBulkStats() throws Exception { } IndicesStatsResponse stats = client().admin().indices().prepareStats(index).setBulk(true).get(); - assertThat(stats.getTotal().bulk.getTotal(), equalTo(4L)); + assertThat(stats.getTotal().bulk.getTotalOperations(), equalTo(4L)); assertThat(stats.getTotal().bulk.getTotalTimeInMillis(), greaterThan(0L)); assertThat(stats.getTotal().bulk.getTotalSizeInBytes(), greaterThan(0L)); - assertThat(stats.getPrimaries().bulk.getTotal(), equalTo(2L)); + assertThat(stats.getPrimaries().bulk.getTotalOperations(), equalTo(2L)); assertThat(stats.getPrimaries().bulk.getTotalTimeInMillis(), greaterThan(0L)); assertThat(stats.getPrimaries().bulk.getTotalSizeInBytes(), greaterThan(0L)); } From d988e98fffcfeb5b97be63cef50b5847b1a470a2 Mon Sep 17 00:00:00 2001 From: zhichen Date: Sat, 15 Feb 2020 20:50:22 +0800 Subject: [PATCH 03/12] Adjust the code style --- .../elasticsearch/rest/action/cat/RestIndicesAction.java | 6 ++++-- .../elasticsearch/rest/action/cat/RestNodesAction.java | 8 +++++--- .../elasticsearch/rest/action/cat/RestShardsAction.java | 8 +++++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 230de7c40254d..3a56427d829ee 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -491,10 +491,12 @@ protected Table getTableWithHeader(final RestRequest request) { "sibling:pri;alias:bto,bulkTotalOperation;default:false;text-align:right;desc:number of bulk shard ops"); table.addCell("pri.bulk.total_operations", "default:false;text-align:right;desc:number of bulk shard ops"); - table.addCell("bulk.total_time", "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("bulk.total_time", + "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:time spend in shard bulk"); - table.addCell("bulk.total_size_in_bytes", "sibling:pri;alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.total_size_in_bytes", + "sibling:pri;alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); table.addCell("pri.bulk.total_size_in_bytes", "default:false;text-align:right;desc:total size in bytes of shard bulk"); table.endHeaders(); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index 607d14e4d2f01..f21c3da93fb0a 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -39,6 +38,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.http.HttpInfo; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.cache.request.RequestCacheStats; import org.elasticsearch.index.engine.SegmentsStats; @@ -243,9 +243,11 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("suggest.time", "alias:suti,suggestTime;default:false;text-align:right;desc:time spend in suggest"); table.addCell("suggest.total", "alias:suto,suggestTotal;default:false;text-align:right;desc:number of suggest ops"); - table.addCell("bulk.total_operations", "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_operations", + "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); - table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.total_size_in_bytes", + "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); table.endHeaders(); return table; diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index 6f9ab28b0c5ba..cf977ba7db9c6 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -25,13 +25,13 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; import org.elasticsearch.common.Table; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; @@ -200,9 +200,11 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("warmer.total", "alias:wto,warmerTotal;default:false;text-align:right;desc:total warmer ops"); table.addCell("warmer.total_time", "alias:wtt,warmerTotalTime;default:false;text-align:right;desc:time spent in warmers"); - table.addCell("bulk.total_operations", "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_operations", + "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); - table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.total_size_in_bytes", + "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); table.endHeaders(); return table; From bc32e4e7aa30f2f2a0ee9c58d15c93a25f3944c0 Mon Sep 17 00:00:00 2001 From: zhichen Date: Sat, 15 Feb 2020 22:02:03 +0800 Subject: [PATCH 04/12] fix xpack monitoring test case. --- .../collector/indices/IndexStatsMonitoringDoc.java | 4 ++-- .../indices/IndicesStatsMonitoringDoc.java | 4 ++-- .../collector/node/NodeStatsMonitoringDoc.java | 2 +- .../indices/IndexStatsMonitoringDocTests.java | 10 ++++++++++ .../indices/IndicesStatsMonitoringDocTests.java | 14 ++++++++++++++ .../node/NodeStatsMonitoringDocTests.java | 5 +++++ 6 files changed, 34 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java index e8c0428cf1b85..d5ba7a50b1c57 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java @@ -163,7 +163,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.primaries.store.size_in_bytes", "index_stats.primaries.refresh.total_time_in_millis", "index_stats.primaries.refresh.external_total_time_in_millis", - "index_stats.primaries.bulk.total", + "index_stats.primaries.bulk.total_operations", "index_stats.primaries.bulk.total_time_in_millis", "index_stats.primaries.bulk.total_size_in_bytes", "index_stats.total.docs.count", @@ -197,7 +197,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.total.store.size_in_bytes", "index_stats.total.refresh.total_time_in_millis", "index_stats.total.refresh.external_total_time_in_millis", - "index_stats.total.bulk.total", + "index_stats.total.bulk.total_operations", "index_stats.total.bulk.total_time_in_millis", "index_stats.total.bulk.total_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java index b9fd42aede0b2..2ec414fff1a3a 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java @@ -100,7 +100,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.primaries.search.query_time_in_millis", "indices_stats._all.primaries.search.query_total", "indices_stats._all.primaries.store.size_in_bytes", - "indices_stats._all.primaries.bulk.total", + "indices_stats._all.primaries.bulk.total_operations", "indices_stats._all.primaries.bulk.total_time_in_millis", "indices_stats._all.primaries.bulk.total_size_in_bytes", "indices_stats._all.total.docs.count", @@ -111,7 +111,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.total.search.query_time_in_millis", "indices_stats._all.total.search.query_total", "indices_stats._all.total.store.size_in_bytes", - "indices_stats._all.total.bulk.total", + "indices_stats._all.total.bulk.total_operations", "indices_stats._all.total.bulk.total_time_in_millis", "indices_stats._all.total.bulk.total_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java index 2b353e3f908a8..8188e94fe1b72 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java @@ -104,7 +104,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "node_stats.indices.segments.index_writer_memory_in_bytes", "node_stats.indices.segments.version_map_memory_in_bytes", "node_stats.indices.segments.fixed_bit_set_memory_in_bytes", - "node_stats.indices.bulk.total", + "node_stats.indices.bulk.total_operations", "node_stats.indices.bulk.total_time_in_millis", "node_stats.indices.bulk.total_size_in_bytes", "node_stats.fs.io_stats.total.operations", diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index aea6fa40af201..9e3594d3c362b 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -196,6 +196,11 @@ public void testToXContent() throws IOException { + " \"evictions\": 10," + " \"hit_count\": 11," + " \"miss_count\": 12" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0" + " }" + " }," + " \"primaries\": {" @@ -249,6 +254,11 @@ public void testToXContent() throws IOException { + " \"evictions\": 10," + " \"hit_count\": 11," + " \"miss_count\": 12" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0" + " }" + " }" + " }" diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index e5f9e6f56af31..9da8bffe46762 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexingStats; @@ -117,6 +118,11 @@ public void testToXContent() throws IOException { + " \"search\": {" + " \"query_total\": 12," + " \"query_time_in_millis\": 14" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0" + " }" + " }," + " \"total\": {" @@ -135,6 +141,11 @@ public void testToXContent() throws IOException { + " \"search\": {" + " \"query_total\": 18," + " \"query_time_in_millis\": 21" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0" + " }" + " }" + " }" @@ -155,6 +166,9 @@ private CommonStats mockCommonStats() { final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, -1L, -1L, -1L, -1L, -1L, -1L, -1L, -1L, -1L, -1L); commonStats.getSearch().add(new SearchStats(searchStats, -1L, null)); + final BulkStats bulkStats = new BulkStats(0L, 0L, 0L); + commonStats.getBulk().add(bulkStats); + return commonStats; } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index 95a424a90ca84..652cd18538f32 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -167,6 +167,11 @@ public void testToXContent() throws IOException { + " \"evictions\": 14," + " \"hit_count\": 15," + " \"miss_count\": 16" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0" + " }" + " }," + " \"os\": {" From ff52c8083d9bdac3a1d1a6dceee8447da856535f Mon Sep 17 00:00:00 2001 From: zhichen Date: Thu, 20 Feb 2020 14:34:52 +0800 Subject: [PATCH 05/12] Track the exponentially weighted moving average for the time and the size of shard bulk requests and modify bulk stats tests as mentioned in the review --- .../test/cat.shards/10_basic.yml | 2 + .../action/bulk/TransportShardBulkAction.java | 2 +- .../index/bulk/stats/BulkStats.java | 49 +++++++++++++++---- .../index/bulk/stats/ShardBulkStats.java | 13 ++++- .../rest/action/cat/RestIndicesAction.java | 10 +++- .../rest/action/cat/RestNodesAction.java | 3 ++ .../rest/action/cat/RestShardsAction.java | 7 ++- .../index/bulk/stats/BulkStatsTests.java | 48 ++++++++++++++---- .../indices/stats/IndexStatsIT.java | 8 ++- .../src/main/resources/monitoring-es.json | 32 ++++++++++-- .../indices/IndexStatsMonitoringDoc.java | 6 ++- .../indices/IndicesStatsMonitoringDoc.java | 6 ++- .../node/NodeStatsMonitoringDoc.java | 2 + .../indices/IndexStatsMonitoringDocTests.java | 8 ++- .../IndicesStatsMonitoringDocTests.java | 10 ++-- .../node/NodeStatsMonitoringDocTests.java | 4 +- 16 files changed, 172 insertions(+), 38 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index f8409d7a31da3..df9d1453766dc 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -81,6 +81,8 @@ bulk.total_operations .+ \n bulk.total_time .+ \n bulk.total_size_in_bytes .+ \n + bulk.avg_time .+ \n + bulk.avg_size_in_bytes .+ \n $/ --- "Test cat shards output": diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index d581cdd718c50..c45e98f724362 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -166,9 +166,9 @@ protected void doRun() throws Exception { } assert context.isInitial(); // either completed and moved to next or reset } + primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); // We're done, there's no more operations to execute so we resolve the wrapped listener finishRequest(); - primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java index dad40c418686b..e0ce32b753bf3 100644 --- a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java @@ -39,6 +39,8 @@ public class BulkStats implements Writeable, ToXContentFragment { private long totalOperations = 0; private long totalTimeInMillis = 0; private long totalSizeInBytes = 0; + private long avgTimeInMillis = 0; + private long avgSizeInBytes = 0; public BulkStats() { @@ -48,12 +50,16 @@ public BulkStats(StreamInput in) throws IOException { totalOperations = in.readVLong(); totalTimeInMillis = in.readVLong(); totalSizeInBytes = in.readVLong(); + avgTimeInMillis = in.readVLong(); + avgSizeInBytes = in.readVLong(); } - public BulkStats(long totalOperations, long totalTimeInMillis, long totalSizeInBytes) { + public BulkStats(long totalOperations, long totalTimeInMillis, long totalSizeInBytes, long avgTimeInMillis, long avgSizeInBytes) { this.totalOperations = totalOperations; this.totalTimeInMillis = totalTimeInMillis; this.totalSizeInBytes = totalSizeInBytes; + this.avgTimeInMillis = avgTimeInMillis; + this.avgSizeInBytes = avgSizeInBytes; } public void add(BulkStats bulkStats) { @@ -64,6 +70,14 @@ public void addTotals(BulkStats bulkStats) { if (bulkStats == null) { return; } + if (this.totalOperations > 0 || bulkStats.totalOperations > 0) { + this.avgTimeInMillis = + (avgTimeInMillis * totalOperations + bulkStats.avgTimeInMillis * bulkStats.totalOperations) / (totalOperations + + bulkStats.totalOperations); + this.avgSizeInBytes = + (avgSizeInBytes * totalOperations + bulkStats.avgSizeInBytes * bulkStats.totalOperations) / (totalOperations + + bulkStats.totalOperations); + } this.totalOperations += bulkStats.totalOperations; this.totalTimeInMillis += bulkStats.totalTimeInMillis; this.totalSizeInBytes += bulkStats.totalSizeInBytes; @@ -81,23 +95,37 @@ public TimeValue getTotalTime() { return new TimeValue(totalTimeInMillis); } + public TimeValue getAvgTime() { + return new TimeValue(avgTimeInMillis); + } + public long getTotalTimeInMillis() { return totalTimeInMillis; } - @Override - public void writeTo(StreamOutput out) throws IOException { + public long getAvgTimeInMillis() { + return avgTimeInMillis; + } + + public long getAvgSizeInBytes() { + return avgSizeInBytes; + } + + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(totalOperations); out.writeVLong(totalTimeInMillis); out.writeVLong(totalSizeInBytes); + out.writeVLong(avgTimeInMillis); + out.writeVLong(avgSizeInBytes); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(Fields.BULK); builder.field(Fields.TOTAL_OPERATIONS, totalOperations); builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime()); builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes); + builder.humanReadableField(Fields.AVG_TIME_IN_MILLIS, Fields.AVG_TIME, getAvgTime()); + builder.field(Fields.AVG_SIZE_IN_BYTES, avgSizeInBytes); builder.endObject(); return builder; } @@ -112,22 +140,25 @@ public boolean equals(Object o) { } final BulkStats that = (BulkStats) o; - return Objects.equals(this.totalOperations, that.totalOperations) - && Objects.equals(this.totalTimeInMillis, that.totalTimeInMillis) - && Objects.equals(this.totalSizeInBytes, that.totalSizeInBytes); + return Objects.equals(this.totalOperations, that.totalOperations) && Objects.equals(this.totalTimeInMillis, that.totalTimeInMillis) + && Objects.equals(this.totalSizeInBytes, that.totalSizeInBytes) && Objects.equals(this.avgTimeInMillis, that.avgTimeInMillis) + && Objects.equals(this.avgSizeInBytes, that.avgSizeInBytes); } @Override public int hashCode() { - return Objects.hash(totalOperations, totalTimeInMillis, totalSizeInBytes); + return Objects.hash(totalOperations, totalTimeInMillis, totalSizeInBytes, avgTimeInMillis, avgSizeInBytes); } static final class Fields { static final String BULK = "bulk"; static final String TOTAL_OPERATIONS = "total_operations"; static final String TOTAL_TIME = "total_time"; + static final String AVG_TIME = "avg_time"; static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes"; + static final String AVG_TIME_IN_MILLIS = "avg_time_in_millis"; + static final String AVG_SIZE_IN_BYTES = "avg_size_in_bytes"; } } diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java index 8f1869c1ec289..223dcbbf933a9 100644 --- a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.bulk.stats; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.index.shard.IndexShard; @@ -32,6 +33,7 @@ public class ShardBulkStats implements BulkOperationListener { private final StatsHolder totalStats = new StatsHolder(); + private static final double ALPHA = 0.1; public BulkStats stats() { return totalStats.stats(); @@ -41,14 +43,23 @@ public BulkStats stats() { public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes); totalStats.shardBulkMetric.inc(tookInNanos); + totalStats.timeInMillis.addValue(tookInNanos); + totalStats.sizeInBytes.addValue(tookInNanos); } static final class StatsHolder { final MeanMetric shardBulkMetric = new MeanMetric(); final CounterMetric totalSizeInBytes = new CounterMetric(); + final ExponentiallyWeightedMovingAverage timeInMillis = new ExponentiallyWeightedMovingAverage(ALPHA, 0); + final ExponentiallyWeightedMovingAverage sizeInBytes = new ExponentiallyWeightedMovingAverage(ALPHA, 0); BulkStats stats() { - return new BulkStats(shardBulkMetric.count(), TimeUnit.NANOSECONDS.toMillis(shardBulkMetric.sum()), totalSizeInBytes.count()); + return new BulkStats( + shardBulkMetric.count(), + TimeUnit.NANOSECONDS.toMillis(shardBulkMetric.sum()), + totalSizeInBytes.count(), + (long) timeInMillis.getAverage(), + (long) sizeInBytes.getAverage()); } } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 223ef2abeeffe..815d3bed8d564 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -495,12 +495,20 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("bulk.total_time", "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); - table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("pri.bulk.total_time", "default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("bulk.total_size_in_bytes", "sibling:pri;alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); table.addCell("pri.bulk.total_size_in_bytes", "default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.avg_time", + "sibling:pri;alias:bati,bulkAvgTime;default:false;text-align:right;desc:average time spend in shard bulk"); + table.addCell("pri.bulk.avg_time", "default:false;text-align:right;desc:average time spend in shard bulk"); + + table.addCell("bulk.avg_size_in_bytes", + "sibling:pri;alias:basi,bulkAvgSizeInBytes;default:false;text-align:right;desc:average size in bytes of shard bulk"); + table.addCell("pri.bulk.avg_size_in_bytes", "default:false;text-align:right;desc:average size in bytes of shard bulk"); + table.endHeaders(); return table; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index 23f9c4a1f733f..d12a6a1b63aa7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -249,6 +249,9 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.avg_time", "alias:bati,bulkAvgTime;default:false;text-align:right;desc:average time spend in shard bulk"); + table.addCell("bulk.avg_size_in_bytes", + "alias:basi,bulkAvgSizeInBytes;default:false;text-align:right;desc:average size in bytes of shard bulk"); table.endHeaders(); return table; diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index d0d615b092afb..033f5f624b8ca 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -206,7 +206,9 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); - + table.addCell("bulk.avg_time", "alias:bati,bulkAvgTime;default:false;text-align:right;desc:average time spend in shard bulk"); + table.addCell("bulk.avg_size_in_bytes", + "alias:basi,bulkAvgSizeInBytes;default:false;text-align:right;desc:avg size in bytes of shard bulk"); table.endHeaders(); return table; } @@ -361,7 +363,8 @@ private Table buildTable(RestRequest request, ClusterStateResponse state, Indice table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalOperations)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalTime)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalSizeInBytes)); - + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgTimeInMillis)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgSizeInBytes)); table.endRow(); } diff --git a/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java index d5c70f7908fc2..5086b8dd9bd09 100644 --- a/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java +++ b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; public class BulkStatsTests extends AbstractWireSerializingTestCase { @@ -31,27 +32,54 @@ protected Writeable.Reader instanceReader() { @Override protected BulkStats createTestInstance() { - return new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + return new BulkStats(randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()); } @Override protected BulkStats mutateInstance(BulkStats instance) { - BulkStats mutateBulkStats = new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - switch (between(0, 1)) { + switch (between(0, 4)) { case 0: - break; + return new BulkStats(randomValueOtherThan(instance.getTotalOperations(), ESTestCase::randomNonNegativeLong), + instance.getTotalTimeInMillis(), + instance.getTotalSizeInBytes(), + instance.getAvgTimeInMillis(), + instance.getAvgTimeInMillis()); case 1: - mutateBulkStats.add(instance); - break; + return new BulkStats(instance.getTotalOperations(), + randomValueOtherThan(instance.getTotalTimeInMillis(), ESTestCase::randomNonNegativeLong), + instance.getTotalSizeInBytes(), + instance.getAvgTimeInMillis(), + instance.getAvgTimeInMillis()); + case 2: + return new BulkStats(instance.getTotalOperations(), + instance.getTotalTimeInMillis(), + randomValueOtherThan(instance.getTotalSizeInBytes(), ESTestCase::randomNonNegativeLong), + instance.getAvgTimeInMillis(), + instance.getAvgTimeInMillis()); + case 3: + return new BulkStats(instance.getTotalOperations(), + instance.getTotalTimeInMillis(), + instance.getTotalSizeInBytes(), + randomValueOtherThan(instance.getAvgTimeInMillis(), ESTestCase::randomNonNegativeLong), + instance.getAvgTimeInMillis()); + case 4: + return new BulkStats(instance.getTotalOperations(), + instance.getTotalTimeInMillis(), + instance.getTotalSizeInBytes(), + instance.getAvgTimeInMillis(), + randomValueOtherThan(instance.getAvgSizeInBytes(), ESTestCase::randomNonNegativeLong)); default: - throw new AssertionError("Illegal randomisation branch"); + throw new AssertionError("failure, got illegal switch case"); } - return mutateBulkStats; } public void testAddTotals() { - BulkStats bulkStats1 = new BulkStats(1, 1, 1); - BulkStats bulkStats2 = new BulkStats(1, 1, 1); + BulkStats bulkStats1 = new BulkStats(1, 1, 1, 2, 2); + BulkStats bulkStats2 = new BulkStats(1, 1, 1, 2, 2); // adding these two bulk stats and checking stats are correct bulkStats1.add(bulkStats2); diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index c8a5ebf3f2a31..5933b50f7df7c 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -1083,13 +1083,13 @@ public void testBulkStats() throws Exception { .put("index.number_of_replicas", 1))); ensureGreen(); final BulkRequest request1 = new BulkRequest(); - for (int i = 0; i < 500; ++i) { + for (int i = 0; i < 20; ++i) { request1.add(new IndexRequest(index).source(Collections.singletonMap("key", "value" + i))) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } BulkResponse bulkResponse = client().bulk(request1).get(); assertThat(bulkResponse.hasFailures(), equalTo(false)); - assertThat(bulkResponse.getItems().length, equalTo(500)); + assertThat(bulkResponse.getItems().length, equalTo(20)); for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.getIndex(), equalTo(index)); } @@ -1098,10 +1098,14 @@ public void testBulkStats() throws Exception { assertThat(stats.getTotal().bulk.getTotalOperations(), equalTo(4L)); assertThat(stats.getTotal().bulk.getTotalTimeInMillis(), greaterThan(0L)); assertThat(stats.getTotal().bulk.getTotalSizeInBytes(), greaterThan(0L)); + assertThat(stats.getTotal().bulk.getAvgTimeInMillis(), greaterThan(0L)); + assertThat(stats.getTotal().bulk.getAvgSizeInBytes(), greaterThan(0L)); assertThat(stats.getPrimaries().bulk.getTotalOperations(), equalTo(2L)); assertThat(stats.getPrimaries().bulk.getTotalTimeInMillis(), greaterThan(0L)); assertThat(stats.getPrimaries().bulk.getTotalSizeInBytes(), greaterThan(0L)); + assertThat(stats.getPrimaries().bulk.getAvgTimeInMillis(), greaterThan(0L)); + assertThat(stats.getPrimaries().bulk.getAvgSizeInBytes(), greaterThan(0L)); } /** diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 88ca0f5beba02..f74c8778c61c8 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -87,7 +87,7 @@ }, "bulk": { "properties": { - "total": { + "total_operations": { "type": "long" }, "total_time_in_millis": { @@ -95,6 +95,12 @@ }, "total_size_in_bytes": { "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" } } } @@ -131,7 +137,7 @@ }, "bulk": { "properties": { - "total": { + "total_operations": { "type": "long" }, "total_time_in_millis": { @@ -139,6 +145,12 @@ }, "total_size_in_bytes": { "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" } } } @@ -287,7 +299,7 @@ }, "bulk": { "properties": { - "total": { + "total_operations": { "type": "long" }, "total_time_in_millis": { @@ -295,6 +307,12 @@ }, "total_size_in_bytes": { "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" } } } @@ -434,7 +452,7 @@ }, "bulk": { "properties": { - "total": { + "total_operations": { "type": "long" }, "total_time_in_millis": { @@ -442,6 +460,12 @@ }, "total_size_in_bytes": { "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" } } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java index d5ba7a50b1c57..0a2338ed722a1 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java @@ -166,6 +166,8 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.primaries.bulk.total_operations", "index_stats.primaries.bulk.total_time_in_millis", "index_stats.primaries.bulk.total_size_in_bytes", + "index_stats.primaries.bulk.avg_time_in_millis", + "index_stats.primaries.bulk.avg_size_in_bytes", "index_stats.total.docs.count", "index_stats.total.fielddata.memory_size_in_bytes", "index_stats.total.fielddata.evictions", @@ -199,5 +201,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.total.refresh.external_total_time_in_millis", "index_stats.total.bulk.total_operations", "index_stats.total.bulk.total_time_in_millis", - "index_stats.total.bulk.total_size_in_bytes"); + "index_stats.total.bulk.total_size_in_bytes", + "index_stats.total.bulk.avg_time_in_millis", + "index_stats.total.bulk.avg_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java index 2ec414fff1a3a..86b9472532b9c 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java @@ -103,6 +103,8 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.primaries.bulk.total_operations", "indices_stats._all.primaries.bulk.total_time_in_millis", "indices_stats._all.primaries.bulk.total_size_in_bytes", + "indices_stats._all.primaries.bulk.avg_time_in_millis", + "indices_stats._all.primaries.bulk.avg_size_in_bytes", "indices_stats._all.total.docs.count", "indices_stats._all.total.indexing.index_time_in_millis", "indices_stats._all.total.indexing.index_total", @@ -113,5 +115,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.total.store.size_in_bytes", "indices_stats._all.total.bulk.total_operations", "indices_stats._all.total.bulk.total_time_in_millis", - "indices_stats._all.total.bulk.total_size_in_bytes"); + "indices_stats._all.total.bulk.total_size_in_bytes", + "indices_stats._all.total.bulk.avg_time_in_millis", + "indices_stats._all.total.bulk.avg_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java index 8188e94fe1b72..beb5fa220f11e 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java @@ -107,6 +107,8 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "node_stats.indices.bulk.total_operations", "node_stats.indices.bulk.total_time_in_millis", "node_stats.indices.bulk.total_size_in_bytes", + "node_stats.indices.bulk.avg_time_in_millis", + "node_stats.indices.bulk.avg_size_in_bytes", "node_stats.fs.io_stats.total.operations", "node_stats.fs.io_stats.total.read_operations", "node_stats.fs.io_stats.total.write_operations", diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 9e3594d3c362b..092832d81f68b 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -200,7 +200,9 @@ public void testToXContent() throws IOException { + " \"bulk\": {" + " \"total_operations\": 0," + " \"total_time_in_millis\": 0," - + " \"total_size_in_bytes\": 0" + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }," + " \"primaries\": {" @@ -258,7 +260,9 @@ public void testToXContent() throws IOException { + " \"bulk\": {" + " \"total_operations\": 0," + " \"total_time_in_millis\": 0," - + " \"total_size_in_bytes\": 0" + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }" + " }" diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index 9da8bffe46762..038da15a8eadd 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -122,7 +122,9 @@ public void testToXContent() throws IOException { + " \"bulk\": {" + " \"total_operations\": 0," + " \"total_time_in_millis\": 0," - + " \"total_size_in_bytes\": 0" + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }," + " \"total\": {" @@ -145,7 +147,9 @@ public void testToXContent() throws IOException { + " \"bulk\": {" + " \"total_operations\": 0," + " \"total_time_in_millis\": 0," - + " \"total_size_in_bytes\": 0" + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }" + " }" @@ -166,7 +170,7 @@ private CommonStats mockCommonStats() { final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, -1L, -1L, -1L, -1L, -1L, -1L, -1L, -1L, -1L, -1L); commonStats.getSearch().add(new SearchStats(searchStats, -1L, null)); - final BulkStats bulkStats = new BulkStats(0L, 0L, 0L); + final BulkStats bulkStats = new BulkStats(0L, 0L, 0L, 0L, 0L); commonStats.getBulk().add(bulkStats); return commonStats; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index 652cd18538f32..28931b249696d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -171,7 +171,9 @@ public void testToXContent() throws IOException { + " \"bulk\": {" + " \"total_operations\": 0," + " \"total_time_in_millis\": 0," - + " \"total_size_in_bytes\": 0" + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }," + " \"os\": {" From bbbd37062b9653c7d555ecf700d269eadce99d1c Mon Sep 17 00:00:00 2001 From: zhichen Date: Thu, 20 Feb 2020 16:31:52 +0800 Subject: [PATCH 06/12] Track the exponentially weighted moving average for the time and the size of shard bulk requests and modify bulk stats tests as mentioned in the review --- .../index/bulk/stats/ShardBulkStats.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java index 223dcbbf933a9..a5f10ac319def 100644 --- a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java @@ -43,23 +43,31 @@ public BulkStats stats() { public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes); totalStats.shardBulkMetric.inc(tookInNanos); - totalStats.timeInMillis.addValue(tookInNanos); - totalStats.sizeInBytes.addValue(tookInNanos); + if (totalStats.timeInMillis == null) { + totalStats.timeInMillis = new ExponentiallyWeightedMovingAverage(ALPHA, tookInNanos); + } else { + totalStats.timeInMillis.addValue(tookInNanos); + } + if (totalStats.sizeInBytes == null) { + totalStats.sizeInBytes = new ExponentiallyWeightedMovingAverage(ALPHA, shardBulkSizeInBytes); + } else { + totalStats.sizeInBytes.addValue(shardBulkSizeInBytes); + } } static final class StatsHolder { final MeanMetric shardBulkMetric = new MeanMetric(); final CounterMetric totalSizeInBytes = new CounterMetric(); - final ExponentiallyWeightedMovingAverage timeInMillis = new ExponentiallyWeightedMovingAverage(ALPHA, 0); - final ExponentiallyWeightedMovingAverage sizeInBytes = new ExponentiallyWeightedMovingAverage(ALPHA, 0); + ExponentiallyWeightedMovingAverage timeInMillis; + ExponentiallyWeightedMovingAverage sizeInBytes; BulkStats stats() { return new BulkStats( shardBulkMetric.count(), TimeUnit.NANOSECONDS.toMillis(shardBulkMetric.sum()), totalSizeInBytes.count(), - (long) timeInMillis.getAverage(), - (long) sizeInBytes.getAverage()); + timeInMillis == null ? 0L : TimeUnit.NANOSECONDS.toMillis((long) timeInMillis.getAverage()), + sizeInBytes == null ? 0L : (long) sizeInBytes.getAverage()); } } } From 7419f46da0b351b5240e5e9ff660b15fb0de1dd5 Mon Sep 17 00:00:00 2001 From: zhichen Date: Thu, 20 Feb 2020 17:35:53 +0800 Subject: [PATCH 07/12] Track the exponentially weighted moving average for the time and the size of shard bulk requests and modify bulk stats tests as mentioned in the review --- .../elasticsearch/rest/action/cat/RestIndicesAction.java | 6 ++++++ .../org/elasticsearch/rest/action/cat/RestNodesAction.java | 2 ++ .../org/elasticsearch/rest/action/cat/RestShardsAction.java | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 815d3bed8d564..f087f6a526390 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -777,6 +777,12 @@ Table buildTable(final RestRequest request, table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalSizeInBytes()); table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalSizeInBytes()); + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getAvgTime()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getAvgTime()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getAvgSizeInBytes()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getAvgSizeInBytes()); + table.endRow(); }); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index d12a6a1b63aa7..5afd404f98ae2 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -431,6 +431,8 @@ Table buildTable(boolean fullId, RestRequest req, ClusterStateResponse state, No table.addCell(bulkStats == null ? null : bulkStats.getTotalOperations()); table.addCell(bulkStats == null ? null : bulkStats.getTotalTime()); table.addCell(bulkStats == null ? null : bulkStats.getTotalSizeInBytes()); + table.addCell(bulkStats == null ? null : bulkStats.getAvgTime()); + table.addCell(bulkStats == null ? null : bulkStats.getAvgSizeInBytes()); table.endRow(); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index 033f5f624b8ca..e2af0670e2a71 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -363,7 +363,7 @@ private Table buildTable(RestRequest request, ClusterStateResponse state, Indice table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalOperations)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalTime)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalSizeInBytes)); - table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgTimeInMillis)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgTime)); table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgSizeInBytes)); table.endRow(); } From ad28121a1526cb612ce2d37b6a571b12ef212418 Mon Sep 17 00:00:00 2001 From: zhichen Date: Thu, 20 Feb 2020 23:54:25 +0800 Subject: [PATCH 08/12] fix bwc cat shard case. --- .../elasticsearch/action/admin/indices/stats/CommonStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index ade4ddb316da9..9d8f80a9ecbc8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -255,7 +255,7 @@ public CommonStats(StreamInput in) throws IOException { translog = in.readOptionalWriteable(TranslogStats::new); requestCache = in.readOptionalWriteable(RequestCacheStats::new); recoveryStats = in.readOptionalWriteable(RecoveryStats::new); - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { bulk = in.readOptionalWriteable(BulkStats::new); } } @@ -278,7 +278,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(translog); out.writeOptionalWriteable(requestCache); out.writeOptionalWriteable(recoveryStats); - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { out.writeOptionalWriteable(bulk); } } From b32acaa8b5ee759b26a4e40c799fb4503ed261af Mon Sep 17 00:00:00 2001 From: zhichen Date: Fri, 21 Feb 2020 00:26:15 +0800 Subject: [PATCH 09/12] fix bwc cat shard case. --- .../elasticsearch/action/admin/indices/stats/CommonStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index 9d8f80a9ecbc8..ade4ddb316da9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -255,7 +255,7 @@ public CommonStats(StreamInput in) throws IOException { translog = in.readOptionalWriteable(TranslogStats::new); requestCache = in.readOptionalWriteable(RequestCacheStats::new); recoveryStats = in.readOptionalWriteable(RecoveryStats::new); - if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { bulk = in.readOptionalWriteable(BulkStats::new); } } @@ -278,7 +278,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(translog); out.writeOptionalWriteable(requestCache); out.writeOptionalWriteable(recoveryStats); - if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { out.writeOptionalWriteable(bulk); } } From 5e07ed5177ea4ca3d0b74c9899bd3fe993341722 Mon Sep 17 00:00:00 2001 From: zhichen Date: Fri, 21 Feb 2020 10:13:16 +0800 Subject: [PATCH 10/12] fix bwc cat shard case. --- .../main/resources/rest-api-spec/test/cat.shards/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index df9d1453766dc..46d02ffcdb4a2 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -1,8 +1,8 @@ --- "Help": - skip: - version: " - 7.1.99" - reason: external refresh stats were added in 7.2.0 + version: " - 7.9.99" + reason: bulk stats were added in 8.0.0 - do: cat.shards: help: true From 2a7e06e0bf8275d538f7e06bd74fcba828db9f57 Mon Sep 17 00:00:00 2001 From: zhichen Date: Fri, 21 Feb 2020 12:25:16 +0800 Subject: [PATCH 11/12] Missing avg time in monitoring-es.json --- x-pack/plugin/core/src/main/resources/monitoring-es.json | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index f74c8778c61c8..9517c4a49c88b 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -639,7 +639,7 @@ }, "bulk": { "properties": { - "total": { + "total_operations": { "type": "long" }, "total_time_in_millis": { @@ -647,6 +647,12 @@ }, "total_size_in_bytes": { "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" } } } From 7a5e64c48a22bceebd63bffa7f9bbbf670ba56b1 Mon Sep 17 00:00:00 2001 From: zhichen Date: Tue, 25 Feb 2020 01:55:31 +0800 Subject: [PATCH 12/12] initialize ewma to new ExponentiallyWeightedMovingAverage(ALPHA, 0.0) --- .../index/bulk/stats/ShardBulkStats.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java index a5f10ac319def..db09a98319877 100644 --- a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java @@ -43,31 +43,23 @@ public BulkStats stats() { public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes); totalStats.shardBulkMetric.inc(tookInNanos); - if (totalStats.timeInMillis == null) { - totalStats.timeInMillis = new ExponentiallyWeightedMovingAverage(ALPHA, tookInNanos); - } else { - totalStats.timeInMillis.addValue(tookInNanos); - } - if (totalStats.sizeInBytes == null) { - totalStats.sizeInBytes = new ExponentiallyWeightedMovingAverage(ALPHA, shardBulkSizeInBytes); - } else { - totalStats.sizeInBytes.addValue(shardBulkSizeInBytes); - } + totalStats.timeInMillis.addValue(tookInNanos); + totalStats.sizeInBytes.addValue(shardBulkSizeInBytes); } static final class StatsHolder { final MeanMetric shardBulkMetric = new MeanMetric(); final CounterMetric totalSizeInBytes = new CounterMetric(); - ExponentiallyWeightedMovingAverage timeInMillis; - ExponentiallyWeightedMovingAverage sizeInBytes; + ExponentiallyWeightedMovingAverage timeInMillis = new ExponentiallyWeightedMovingAverage(ALPHA, 0.0); + ExponentiallyWeightedMovingAverage sizeInBytes = new ExponentiallyWeightedMovingAverage(ALPHA, 0.0); BulkStats stats() { return new BulkStats( shardBulkMetric.count(), TimeUnit.NANOSECONDS.toMillis(shardBulkMetric.sum()), totalSizeInBytes.count(), - timeInMillis == null ? 0L : TimeUnit.NANOSECONDS.toMillis((long) timeInMillis.getAverage()), - sizeInBytes == null ? 0L : (long) sizeInBytes.getAverage()); + TimeUnit.NANOSECONDS.toMillis((long) timeInMillis.getAverage()), + (long) sizeInBytes.getAverage()); } } }