diff --git a/docs/reference/cat/transforms.asciidoc b/docs/reference/cat/transforms.asciidoc index 947bfdd81aaf0..20752761ca13d 100644 --- a/docs/reference/cat/transforms.asciidoc +++ b/docs/reference/cat/transforms.asciidoc @@ -57,35 +57,40 @@ specified columns. Valid columns are: `changes_last_detection_time`, `cldt`::: +(Default) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=checkpointing-changes-last-detected-at] +`checkpoint`, `cp`::: +(Default) +The sequence number for the checkpoint. + `checkpoint_duration_time_exp_avg`, `cdtea`, `checkpointTimeExpAvg`::: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=exponential-avg-checkpoint-duration-ms] -`create_time`, `ct`, `createTime`::: +`checkpoint_progress`, `c`, `checkpointProgress`::: (Default) +The progress of the next checkpoint that is currently in progress. + +`create_time`, `ct`, `createTime`::: The time the {transform} was created. `description`, `d`::: -(Default) The description of the {transform}. `dest_index`, `di`, `destIndex`::: -(Default) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index] `documents_indexed`, `doci`::: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-indexed] `docs_per_second`, `dps`::: -(Default) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-docs-per-second] `documents_processed`, `docp`::: +(Default) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-processed] `frequency`, `f`::: -(Default) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=frequency] `id`::: @@ -104,15 +109,17 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index-total] `indexed_documents_exp_avg`, `idea`::: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=exponential-avg-documents-indexed] -`max_page_search_size`, `mpsz`::: +`last_search_time`, `lst`, `lastSearchTime`::: (Default) +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=checkpointing-last-search-time] + +`max_page_search_size`, `mpsz`::: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size] `pages_processed`, `pp`::: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pages-processed] `pipeline`, `p`::: -(Default) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-pipeline] `processed_documents_exp_avg`, `pdea`::: @@ -142,14 +149,12 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=source-index-transform include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=state-transform] `transform_type`, `tt`::: -(Default) Indicates the type of {transform}: `batch` or `continuous`. `trigger_count`, `tc`::: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=trigger-count] `version`, `v`::: -(Default) The version of {es} that existed on the node when the {transform} was created. @@ -179,16 +184,11 @@ GET /_cat/transforms?v=true&format=json [ { "id" : "ecommerce_transform", - "create_time" : "2020-03-20T20:31:25.077Z", - "version" : "7.7.0", - "source_index" : "kibana_sample_data_ecommerce", - "dest_index" : "kibana_sample_data_ecommerce_transform", - "pipeline" : null, - "description" : "Maximum priced ecommerce data by customer_id in Asia", - "transform_type" : "continuous", - "frequency" : "5m", - "max_page_search_size" : "500", - "state" : "STARTED" + "state" : "started", + "checkpoint" : "1", + "documents_processed" : "705", + "checkpoint_progress" : "100.00", + "changes_last_detection_time" : null } ] ---- diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index a61111b857697..4f5096d1bc0c9 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -102,6 +102,11 @@ tag::checkpointing-changes-last-detected-at[] The timestamp when changes were last detected in the source indices. end::checkpointing-changes-last-detected-at[] +tag::checkpointing-last-search-time[] +The timestamp of the last search in the source indices. This field is only +shown if the transform is running. +end::checkpointing-last-search-time[] + tag::cluster-health-status[] (string) Health status of the cluster, based on the state of its primary and replica diff --git a/docs/reference/transform/apis/get-transform-stats.asciidoc b/docs/reference/transform/apis/get-transform-stats.asciidoc index 75135801d892d..96222a0cc6043 100644 --- a/docs/reference/transform/apis/get-transform-stats.asciidoc +++ b/docs/reference/transform/apis/get-transform-stats.asciidoc @@ -108,6 +108,10 @@ was created. ===== //End checkpointing.last +`last_search_time`::: +(date) +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=checkpointing-last-search-time] + //Begin checkpointing.next `next`::: (object) Contains statistics about the next checkpoint that is currently in diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index b666707bec365..c5fcf11904f2e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -289,6 +289,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeEnum(this); } + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + public String value() { return name().toLowerCase(Locale.ROOT); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml index 32ab91f7cb800..935efe1e7b4ff 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml @@ -54,6 +54,57 @@ setup: } } + - do: + index: + index: airline-data + id: 1 + body: > + { + "time": "2017-02-18T00:00:00Z", + "airline": "airline1", + "responsetime": 1.0, + "event_rate": 5 + } + + - do: + index: + index: airline-data + id: 2 + body: > + { + "time": "2017-02-18T00:30:00Z", + "airline": "airline1", + "responsetime": 1.0, + "event_rate": 6 + } + + - do: + index: + index: airline-data + id: 3 + body: > + { + "time": "2017-02-18T01:00:00Z", + "airline": "airline2", + "responsetime": 42.0, + "event_rate": 8 + } + + - do: + index: + index: airline-data + id: 4 + body: > + { + "time": "2017-02-18T01:01:00Z", + "airline": "airline1", + "responsetime": 42.0, + "event_rate": 7 + } + + - do: + indices.refresh: + index: airline-data --- teardown: - do: @@ -70,9 +121,9 @@ teardown: transform_id: "airline-transform-*" - match: $body: | - /^ #id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n - (airline\-transform\-latest \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-latest \s+ \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ - (airline\-transform\-stats \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ $/ + /^ #id \s+ state \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n + (airline\-transform\-latest \s+ stopped \s+ 0 \s+ 0 \s+ \s+ \s+ \n)+ + (airline\-transform\-stats \s+ stopped \s+ 0 \s+ 0 \s+ \s+ \s+ \n)+ $/ --- "Test cat transform stats with column selection": @@ -84,9 +135,8 @@ teardown: - match: $body: | /^ id \s+ version \s+ source_index \s+ dest_index \s+ search_total \s+ index_total \s+ docp \s+ cdtea \s+ indexed_documents_exp_avg \n - (airline\-transform-latest \s+ [^\s]+ \s+ airline-data \s+ airline-data-latest \s+ 0 \s+ 0 \s+ 0 \s+ 0.0 \s+ 0.0 \n)+ - (airline\-transform-stats \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ 0 \s+ 0 \s+ 0 \s+ 0.0 \s+ 0.0 \n)+ $/ - + (airline\-transform-latest \s+ [^\s]+ \s+ airline-data \s+ airline-data-latest \s+ 0 \s+ 0 \s+ 0 \s+ 0.00 \s+ 0.00 \n)+ + (airline\-transform-stats \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ 0 \s+ 0 \s+ 0 \s+ 0.00 \s+ 0.00 \n)+ $/ --- "Test cat transform stats with batch transform": @@ -97,7 +147,7 @@ teardown: { "source": { "index": ["airline-data", "airline-data-other"], - "query": {"bool":{"filter":{"term":{"airline":"foo"}}}} + "query": {"bool":{"filter":{"term":{"airline":"airline1"}}}} }, "dest": { "index": "airline-data-by-airline-batch" }, "pivot": { @@ -112,8 +162,30 @@ teardown: v: true - match: $body: | - /^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n - (airline\-transform\-batch \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+ \s+ description \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ $/ + /^ id \s+ state \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n + (airline\-transform\-batch \s+ stopped \s+ 0 \s+ 0 \s+ \s+ \s+ \n)+ $/ + - do: + transform.start_transform: + transform_id: "airline-transform-batch" + - match: { acknowledged: true } + + - do: + transform.stop_transform: + wait_for_checkpoint: true + transform_id: "airline-transform-batch" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + cat.transforms: + transform_id: "airline-transform-batch" + v: true + + # see gh#62204 despite wait_for_completion is true, it might still not be stopped + - match: + $body: | + /^ id \s+ state \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n + (airline\-transform\-batch \s+ stop.* \s+ 1 \s+ 3 \s+ 100.00 \s+ \s+ .* \n)+ $/ - do: transform.delete_transform: transform_id: "airline-transform-batch" @@ -148,8 +220,8 @@ teardown: v: true - match: $body: | - /^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n - (airline\-transform\-continuous \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+ \s+ description \s+ continuous \s+ 10s \s+ 500 \s+ - \s+ STOPPED \n)+ $/ + /^ id \s+ state \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n + (airline\-transform\-continuous \s+ stopped \s+ 0 \s+ 0 \s+ \s+ \s+ \n)+ $/ - do: transform.delete_transform: transform_id: "airline-transform-continuous" diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java index f603da335e313..b78fe4667171b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java @@ -26,7 +26,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStats; import org.elasticsearch.xpack.transform.Transform; +import java.util.Date; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -94,22 +96,8 @@ protected Table getTableWithHeader(RestRequest unused) { private static Table getTableWithHeader() { return new Table().startHeaders() - // Transform config info + // default columns .addCell("id", TableColumnAttributeBuilder.builder("the id").build()) - .addCell("create_time", TableColumnAttributeBuilder.builder("transform creation time").setAliases("ct", "createTime").build()) - .addCell( - "version", - TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created").setAliases("v").build() - ) - .addCell("source_index", TableColumnAttributeBuilder.builder("source index").setAliases("si", "sourceIndex").build()) - .addCell("dest_index", TableColumnAttributeBuilder.builder("destination index").setAliases("di", "destIndex").build()) - .addCell("pipeline", TableColumnAttributeBuilder.builder("transform pipeline").setAliases("p").build()) - .addCell("description", TableColumnAttributeBuilder.builder("description").setAliases("d").build()) - .addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform").setAliases("tt").build()) - .addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform").setAliases("f").build()) - .addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size").setAliases("mpsz").build()) - .addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second").setAliases("dps").build()) - // Transform stats info .addCell( "state", TableColumnAttributeBuilder.builder("transform state") @@ -117,11 +105,48 @@ private static Table getTableWithHeader() { .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT) .build() ) - .addCell("reason", TableColumnAttributeBuilder.builder("reason for the current state", false).setAliases("r", "reason").build()) + .addCell("checkpoint", TableColumnAttributeBuilder.builder("checkpoint").setAliases("c").build()) + .addCell( + "documents_processed", + TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed") + .setAliases("docp", "documentsProcessed") + .build() + ) + .addCell( + "checkpoint_progress", + TableColumnAttributeBuilder.builder("progress of the checkpoint").setAliases("cp", "checkpointProgress").build() + ) + .addCell( + "last_search_time", + TableColumnAttributeBuilder.builder("last time transform searched for updates").setAliases("lst", "lastSearchTime").build() + ) .addCell( "changes_last_detection_time", - TableColumnAttributeBuilder.builder("changes last detected time", false).setAliases("cldt").build() + TableColumnAttributeBuilder.builder("changes last detected time").setAliases("cldt").build() + ) + + // optional columns + .addCell( + "create_time", + TableColumnAttributeBuilder.builder("transform creation time", false).setAliases("ct", "createTime").build() + ) + .addCell( + "version", + TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created", false) + .setAliases("v") + .build() ) + .addCell("source_index", TableColumnAttributeBuilder.builder("source index", false).setAliases("si", "sourceIndex").build()) + .addCell("dest_index", TableColumnAttributeBuilder.builder("destination index", false).setAliases("di", "destIndex").build()) + .addCell("pipeline", TableColumnAttributeBuilder.builder("transform pipeline", false).setAliases("p").build()) + .addCell("description", TableColumnAttributeBuilder.builder("description", false).setAliases("d").build()) + .addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform", false).setAliases("tt").build()) + .addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform", false).setAliases("f").build()) + .addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size", false).setAliases("mpsz").build()) + .addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second", false).setAliases("dps").build()) + + .addCell("reason", TableColumnAttributeBuilder.builder("reason for the current state", false).setAliases("r", "reason").build()) + .addCell("search_total", TableColumnAttributeBuilder.builder("total number of search phases", false).setAliases("st").build()) .addCell( "search_failure", @@ -137,15 +162,9 @@ private static Table getTableWithHeader() { "index_time", TableColumnAttributeBuilder.builder("total time spent indexing documents", false).setAliases("itime").build() ) - .addCell( - "documents_processed", - TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed", false) - .setAliases("docp") - .build() - ) .addCell( "documents_indexed", - TableColumnAttributeBuilder.builder("the number of documents index to the destination index", false) + TableColumnAttributeBuilder.builder("the number of documents written to the destination index", false) .setAliases("doci") .build() ) @@ -199,9 +218,30 @@ private Table buildTable(GetTransformAction.Response response, GetTransformStats : config.getPivotConfig().getMaxPageSearchSize() : config.getSettings().getMaxPageSearchSize(); + Double progress = checkpointingInfo == null ? null + : checkpointingInfo.getNext().getCheckpointProgress() == null ? null + : checkpointingInfo.getNext().getCheckpointProgress().getPercentComplete(); + table.startRow() + // default columns .addCell(config.getId()) - .addCell(config.getCreateTime()) + .addCell(stats == null ? null : stats.getState().toString()) + .addCell(checkpointingInfo == null ? null : checkpointingInfo.getLast().getCheckpoint()) + .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumDocuments()) + .addCell(progress == null ? null : String.format(Locale.ROOT, "%.2f", progress)) + .addCell( + checkpointingInfo == null ? null + : checkpointingInfo.getLastSearchTime() == null ? null + : Date.from(checkpointingInfo.getLastSearchTime()) + ) + .addCell( + checkpointingInfo == null ? null + : checkpointingInfo.getChangesLastDetectedAt() == null ? null + : Date.from(checkpointingInfo.getChangesLastDetectedAt()) + ) + + // optional columns + .addCell(config.getCreateTime() == null ? null : Date.from(config.getCreateTime())) .addCell(config.getVersion()) .addCell(String.join(",", config.getSource().getIndex())) .addCell(config.getDestination().getIndex()) @@ -215,9 +255,7 @@ private Table buildTable(GetTransformAction.Response response, GetTransformStats ? "-" : config.getSettings().getDocsPerSecond() ) - .addCell(stats == null ? null : stats.getState()) .addCell(stats == null ? null : stats.getReason()) - .addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt()) .addCell(transformIndexerStats == null ? null : transformIndexerStats.getSearchTotal()) .addCell(transformIndexerStats == null ? null : transformIndexerStats.getSearchFailures()) .addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getSearchTime())) @@ -226,15 +264,26 @@ private Table buildTable(GetTransformAction.Response response, GetTransformStats .addCell(transformIndexerStats == null ? null : transformIndexerStats.getIndexFailures()) .addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getIndexTime())) - .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumDocuments()) .addCell(transformIndexerStats == null ? null : transformIndexerStats.getOutputDocuments()) .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumInvocations()) .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumPages()) .addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getProcessingTime())) - .addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgCheckpointDurationMs()) - .addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgDocumentsIndexed()) - .addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgDocumentsProcessed()) + .addCell( + transformIndexerStats == null + ? null + : String.format(Locale.ROOT, "%.2f", transformIndexerStats.getExpAvgCheckpointDurationMs()) + ) + .addCell( + transformIndexerStats == null + ? null + : String.format(Locale.ROOT, "%.2f", transformIndexerStats.getExpAvgDocumentsIndexed()) + ) + .addCell( + transformIndexerStats == null + ? null + : String.format(Locale.ROOT, "%.2f", transformIndexerStats.getExpAvgDocumentsProcessed()) + ) .endRow(); });