From eabd2ffcfe2941a9c6245c481a0a4907b1956abc Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Mon, 17 Jun 2019 15:24:41 -0500 Subject: [PATCH 1/8] [ML][Data Frame] Adding bwc tests for pivot transform --- .../core/dataframe/transforms/DestConfig.java | 4 +- x-pack/qa/rolling-upgrade/build.gradle | 3 +- .../mixed_cluster/80_data_frame_jobs_crud.yml | 192 ++++++++++++ .../old_cluster/80_data_frame_jobs_crud.yml | 140 +++++++++ .../80_data_frame_jobs_crud.yml | 278 ++++++++++++++++++ 5 files changed, 614 insertions(+), 3 deletions(-) create mode 100644 x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml create mode 100644 x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml create mode 100644 x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java index 282a3f9a04484..00d03066b6fff 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java @@ -50,7 +50,7 @@ public DestConfig(String index, String pipeline) { public DestConfig(final StreamInput in) throws IOException { index = in.readString(); - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_7_3_0)) { pipeline = in.readOptionalString(); } else { pipeline = null; @@ -72,7 +72,7 @@ public boolean isValid() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeOptionalString(pipeline); } } diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 6fb89ab47a658..b3347fd126c54 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -240,7 +240,8 @@ for (Version version : bwcVersions.wireCompatible) { 'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data', 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster', 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster', - 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster' + 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster', + 'mixed_cluster/80_data_frame_jobs_crud/Test put mixed cluser data frame transform' ].join(',') finalizedBy "${baseName}#oldClusterTestCluster#node1.stop" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml new file mode 100644 index 0000000000000..a1179c3558725 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -0,0 +1,192 @@ +--- +"Test put mixed cluser data frame transform": + - do: + indices.create: + index: mixed-dataframe-transform-airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + data_frame.put_data_frame_transform: + transform_id: "mixed-simple-transform" + body: > + { + "source": { "index": "mixed-dataframe-transform-airline-data" }, + "dest": { "index": "mixed-simple-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.put_data_frame_transform: + transform_id: "mixed-complex-transform" + body: > + { + "source": { + "index": "mixed-dataframe-transform-airline-data", + "query": { + "bool": { + "filter": {"term": {"airline": "ElasticAir"}} + } + } + }, + "dest": { + "index": "mixed-complex-transform-idx", + "pipeline": "data_frame_simple_pipeline" + }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}}, + "every_50": {"histogram": {"field": "responsetime", "interval": 50}} + }, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + cluster.health: + index: ".data-frame-internal-1" + wait_for_status: green +--- +"Test GET, start, and stop old cluster transforms": + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-simple-transform-idx" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-complex-transform-idx" } + - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" } + - match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml new file mode 100644 index 0000000000000..bd4999f3ff4b3 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -0,0 +1,140 @@ +--- +"Test put data frame transforms on old cluster": + - do: + indices.create: + index: old-dataframe-transform-airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + ingest.put_pipeline: + id: "data_frame_simple_pipeline" + body: > + { + "processors": [ + { + "set" : { + "field" : "my_field", + "value": 42 + } + } + ] + } + - do: + data_frame.put_data_frame_transform: + transform_id: "old-simple-transform" + body: > + { + "source": { "index": "old-dataframe-transform-airline-data" }, + "dest": { "index": "old-simple-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.put_data_frame_transform: + transform_id: "old-complex-transform" + body: > + { + "source": { + "index": "old-dataframe-transform-airline-data", + "query": { + "bool": { + "filter": {"term": {"airline": "ElasticAir"}} + } + } + }, + "dest": { + "index": "old-complex-transform-idx", + "pipeline": "data_frame_simple_pipeline" + }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}}, + "every_50": {"histogram": {"field": "responsetime", "interval": 50}} + }, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + cluster.health: + index: ".data-frame-internal-1" + wait_for_status: green diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml new file mode 100644 index 0000000000000..fbf6c1108b07e --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -0,0 +1,278 @@ +--- +"Test put new cluster data frame transforms": + - do: + indices.create: + index: new-dataframe-transform-airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + data_frame.put_data_frame_transform: + transform_id: "new-simple-transform" + body: > + { + "source": { "index": "new-dataframe-transform-airline-data" }, + "dest": { "index": "new-simple-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.start_data_frame_transform: + transform_id: "new-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "new-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "new-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "new-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.put_data_frame_transform: + transform_id: "new-complex-transform" + body: > + { + "source": { + "index": "new-dataframe-transform-airline-data", + "query": { + "bool": { + "filter": {"term": {"airline": "ElasticAir"}} + } + } + }, + "dest": { + "index": "new-complex-transform-idx", + "pipeline": "data_frame_simple_pipeline" + }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}}, + "every_50": {"histogram": {"field": "responsetime", "interval": 50}} + }, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "new-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-complex-transform" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "new-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "new-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "new-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "new-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + cluster.health: + index: ".data-frame-internal-1" + wait_for_status: green +--- +"Get start, stop, and delete old and mixed cluster data frame transforms": + # Simple and complex OLD transforms + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-simple-transform-idx" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + - do: + data_frame.get_data_frame_transform: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-complex-transform-idx" } + - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" } + - match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + # Simple and complex Mixed cluster transforms + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.source.index.0: "mixed-dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "mixed-simple-transform-idx" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.source.index.0: "mixed-dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "mixed-complex-transform-idx" } + - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" } + - match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + +# Delete all old and mixed transforms + - do: + data_frame.delete_data_frame_transform: + transform_id: "old-simple-transform" + + - do: + data_frame.delete_data_frame_transform: + transform_id: "mixed-simple-transform" + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform,mixed-simple-transform" + - match: { count: 0 } From c7e20b1375c0db59545b69e06f39f69a17ef336d Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 25 Jun 2019 08:18:00 -0500 Subject: [PATCH 2/8] adding continuous transforms --- .../mixed_cluster/80_data_frame_jobs_crud.yml | 13 +++- .../old_cluster/80_data_frame_jobs_crud.yml | 61 ++++++++++++++++++- .../80_data_frame_jobs_crud.yml | 13 +++- 3 files changed, 82 insertions(+), 5 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml index a1179c3558725..e444df3032a73 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -1,5 +1,5 @@ --- -"Test put mixed cluser data frame transform": +"Test put batch data frame transforms on mixed cluster": - do: indices.create: index: mixed-dataframe-transform-airline-data @@ -14,6 +14,10 @@ type: float event_rate: type: integer + - do: + cluster.health: + index: "mixed-dataframe-transform-airline-data" + wait_for_status: green - do: data_frame.put_data_frame_transform: transform_id: "mixed-simple-transform" @@ -119,7 +123,12 @@ index: ".data-frame-internal-1" wait_for_status: green --- -"Test GET, start, and stop old cluster transforms": +"Test GET, start, and stop old cluster batch transforms": + - do: + cluster.health: + index: "old-dataframe-transform-airline-data" + wait_for_status: green + - do: data_frame.get_data_frame_transform: transform_id: "old-simple-transform" diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml index bd4999f3ff4b3..04556738f9749 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -1,5 +1,5 @@ --- -"Test put data frame transforms on old cluster": +"Test put batch data frame transforms on old cluster": - do: indices.create: index: old-dataframe-transform-airline-data @@ -14,6 +14,11 @@ type: float event_rate: type: integer + - do: + cluster.health: + index: "old-dataframe-transform-airline-data" + wait_for_status: green + - do: ingest.put_pipeline: id: "data_frame_simple_pipeline" @@ -138,3 +143,57 @@ cluster.health: index: ".data-frame-internal-1" wait_for_status: green + +--- +"Test put continuous data frame transform on old cluster": + - do: + indices.create: + index: old-dataframe-transform-airline-data-cont + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + cluster.health: + index: "old-dataframe-transform-airline-data-cont" + wait_for_status: green + + - do: + data_frame.put_data_frame_transform: + transform_id: "old-simple-continuous-transform" + body: > + { + "source": { "index": "old-dataframe-transform-airline-data-cont" }, + "dest": { "index": "old-simple-continuous-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "/started/" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index fbf6c1108b07e..74abb385fc7c0 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -1,5 +1,5 @@ --- -"Test put new cluster data frame transforms": +"Test put batch data frame transforms on new cluster": - do: indices.create: index: new-dataframe-transform-airline-data @@ -14,6 +14,10 @@ type: float event_rate: type: integer + - do: + cluster.health: + index: "new-dataframe-transform-airline-data" + wait_for_status: green - do: data_frame.put_data_frame_transform: transform_id: "new-simple-transform" @@ -119,7 +123,12 @@ index: ".data-frame-internal-1" wait_for_status: green --- -"Get start, stop, and delete old and mixed cluster data frame transforms": +"Get start, stop, and delete old and mixed cluster batch data frame transforms": + - do: + cluster.health: + index: "mixed-dataframe-transform-airline-data,old-dataframe-transform-airline-data" + wait_for_status: green + # Simple and complex OLD transforms - do: data_frame.get_data_frame_transform: From eef5c069ab5179c592faba858dc4640f8cf73493 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 25 Jun 2019 09:29:02 -0500 Subject: [PATCH 3/8] adding continuous dataframes to bwc --- .../mixed_cluster/80_data_frame_jobs_crud.yml | 112 +++++++++++- .../old_cluster/80_data_frame_jobs_crud.yml | 14 +- .../80_data_frame_jobs_crud.yml | 170 +++++++++++++++++- 3 files changed, 290 insertions(+), 6 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml index e444df3032a73..d4e1f90867e1b 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -123,10 +123,71 @@ index: ".data-frame-internal-1" wait_for_status: green --- +"Test put continuous data frame transform on mixed cluster": + - do: + indices.create: + index: mixed-dataframe-transform-airline-data-cont + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + cluster.health: + index: ".data-frame-internal-1,mixed-dataframe-transform-airline-data-cont" + wait_for_status: green + + - do: + data_frame.put_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + body: > + { + "source": { "index": "mixed-dataframe-transform-airline-data-cont" }, + "dest": { "index": "mixed-simple-continuous-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } +--- "Test GET, start, and stop old cluster batch transforms": - do: cluster.health: - index: "old-dataframe-transform-airline-data" + index: ".data-frame-internal-1,old-dataframe-transform-airline-data" wait_for_status: green - do: @@ -199,3 +260,52 @@ - match: { transforms.0.id: "old-complex-transform" } - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.task_state: "stopped" } + +--- +"Test GET, stop, start, old continuous transforms": + - do: + cluster.health: + index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont" + wait_for_status: green + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml index 04556738f9749..4a639b9b2eda5 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -16,7 +16,7 @@ type: integer - do: cluster.health: - index: "old-dataframe-transform-airline-data" + index: ".data-frame-internal-1,old-dataframe-transform-airline-data" wait_for_status: green - do: @@ -162,7 +162,7 @@ type: integer - do: cluster.health: - index: "old-dataframe-transform-airline-data-cont" + index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont" wait_for_status: green - do: @@ -175,6 +175,12 @@ "pivot": { "group_by": { "airline": {"terms": {"field": "airline"}}}, "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { + "time": { + "field": "time", + "delay": "90m" + } } } - match: { acknowledged: true } @@ -184,6 +190,8 @@ transform_id: "old-simple-continuous-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } - is_true: transforms.0.version - is_true: transforms.0.create_time @@ -196,4 +204,4 @@ transform_id: "old-simple-continuous-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-continuous-transform" } - - match: { transforms.0.state.task_state: "/started/" } + - match: { transforms.0.state.task_state: "started" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index 74abb385fc7c0..e9c191488fa3e 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -1,3 +1,10 @@ +setup: + - do: + cluster.health: + wait_for_status: green + wait_for_nodes: 3 + # wait for long enough that we give delayed unassigned shards to stop being delayed + timeout: 70s --- "Test put batch data frame transforms on new cluster": - do: @@ -118,15 +125,97 @@ - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.task_state: "stopped" } + - do: + data_frame.delete_data_frame_transform: + transform_id: "new-complex-transform" + + - do: + data_frame.delete_data_frame_transform: + transform_id: "new-simple-transform" +--- +"Test put continuous data frame transform on new cluster": + - do: + indices.create: + index: new-dataframe-transform-airline-data-cont + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer - do: cluster.health: - index: ".data-frame-internal-1" + index: "new-dataframe-transform-airline-data-cont" wait_for_status: green + + - do: + data_frame.put_data_frame_transform: + transform_id: "new-simple-continuous-transform" + body: > + { + "source": { "index": "new-dataframe-transform-airline-data-cont" }, + "dest": { "index": "new-simple-continuous-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "new-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "new-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "new-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "new-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "new-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "new-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "new-simple-continuous-transform" --- "Get start, stop, and delete old and mixed cluster batch data frame transforms": - do: cluster.health: - index: "mixed-dataframe-transform-airline-data,old-dataframe-transform-airline-data" + index: ".data-frame-internal-1,mixed-dataframe-transform-airline-data,old-dataframe-transform-airline-data" wait_for_status: green # Simple and complex OLD transforms @@ -285,3 +374,80 @@ data_frame.get_data_frame_transform_stats: transform_id: "old-simple-transform,mixed-simple-transform" - match: { count: 0 } + +--- +"Test GET, stop, delete, old and mixed continuous transforms": + - do: + cluster.health: + index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont,mixed-dataframe-transform-airline-data-cont" + wait_for_status: green + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "old-simple-continuous-transform" + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" From 3baa393bcfe4e66ac97d8828ff418ef51e6a13ad Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 25 Jun 2019 15:27:09 -0500 Subject: [PATCH 4/8] adding continuous data frame tests --- .../test/mixed_cluster/80_data_frame_jobs_crud.yml | 8 ++++++-- .../test/old_cluster/80_data_frame_jobs_crud.yml | 6 +++++- .../test/upgraded_cluster/80_data_frame_jobs_crud.yml | 10 ---------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml index d4e1f90867e1b..a0908f20ea209 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -18,6 +18,7 @@ cluster.health: index: "mixed-dataframe-transform-airline-data" wait_for_status: green + - do: data_frame.put_data_frame_transform: transform_id: "mixed-simple-transform" @@ -140,7 +141,7 @@ type: integer - do: cluster.health: - index: ".data-frame-internal-1,mixed-dataframe-transform-airline-data-cont" + index: "mixed-dataframe-transform-airline-data-cont" wait_for_status: green - do: @@ -183,6 +184,10 @@ - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-continuous-transform" } - match: { transforms.0.state.task_state: "started" } + - do: + cluster.health: + index: ".data-frame-internal-1" + wait_for_status: green --- "Test GET, start, and stop old cluster batch transforms": - do: @@ -265,7 +270,6 @@ "Test GET, stop, start, old continuous transforms": - do: cluster.health: - index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont" wait_for_status: green - do: diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml index 4a639b9b2eda5..30a7fb4748237 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -16,7 +16,7 @@ type: integer - do: cluster.health: - index: ".data-frame-internal-1,old-dataframe-transform-airline-data" + index: "old-dataframe-transform-airline-data" wait_for_status: green - do: @@ -205,3 +205,7 @@ - match: { count: 1 } - match: { transforms.0.id: "old-simple-continuous-transform" } - match: { transforms.0.state.task_state: "started" } + - do: + cluster.health: + index: ".data-frame-internal-1" + wait_for_status: green diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index e9c191488fa3e..40c3c2cedbb08 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -213,11 +213,6 @@ setup: transform_id: "new-simple-continuous-transform" --- "Get start, stop, and delete old and mixed cluster batch data frame transforms": - - do: - cluster.health: - index: ".data-frame-internal-1,mixed-dataframe-transform-airline-data,old-dataframe-transform-airline-data" - wait_for_status: green - # Simple and complex OLD transforms - do: data_frame.get_data_frame_transform: @@ -377,11 +372,6 @@ setup: --- "Test GET, stop, delete, old and mixed continuous transforms": - - do: - cluster.health: - index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont,mixed-dataframe-transform-airline-data-cont" - wait_for_status: green - - do: data_frame.get_data_frame_transform: transform_id: "old-simple-continuous-transform" From c86b581101833fe7c1881ac55f88d715842c591d Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 26 Jun 2019 07:55:27 -0500 Subject: [PATCH 5/8] Adding rolling upgrade tests for continuous df --- .../upgrades/DataFrameSurvivesUpgradeIT.java | 251 ++++++++++++++++++ .../mixed_cluster/80_data_frame_jobs_crud.yml | 44 +-- .../old_cluster/80_data_frame_jobs_crud.yml | 27 +- .../80_data_frame_jobs_crud.yml | 228 ++-------------- .../test/rest/XPackRestTestConstants.java | 6 + 5 files changed, 317 insertions(+), 239 deletions(-) create mode 100644 x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java new file mode 100644 index 0000000000000..6c0dfce5a4796 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -0,0 +1,251 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.client.dataframe.transforms.DestConfig; +import org.elasticsearch.client.dataframe.transforms.SourceConfig; +import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; +import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig; +import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig; +import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.xpack.test.rest.XPackRestTestConstants; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { + + private static final String DATAFRAME_ENDPOINT = "/_data_frame/transforms/"; + private static final String CONTINUOUS_DATA_FRAME_ID = "continuous-data-frame-upgrade-job"; + private static final String CONTINUOUS_DATA_FRAME_SOURCE = "data-frame-upgrade-continuous-source"; + private static final List ENTITIES = Stream.iterate(1, n -> n + 1) + .limit(5) + .map(v -> "user_" + v) + .collect(Collectors.toList()); + private static final List BUCKETS = Stream.iterate(1, n -> n + 1) + .limit(5) + .map(TimeValue::timeValueSeconds) + .collect(Collectors.toList()); + + @Override + protected Collection templatesToWaitFor() { + return Stream.concat(XPackRestTestConstants.DATA_FRAME_TEMPLATES.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } + + protected static void waitForPendingDataFrameTasks() throws Exception { + waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("data_frame/transforms") == false); + } + + /** + * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results + * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade + */ + public void testDataFramesRollingUpgrade() throws Exception { + + switch (CLUSTER_TYPE) { + case OLD: + createAndStartContinuousDataFrame(); + break; + case MIXED: + verifyContinuousDataFrameHandlesData(); + break; + case UPGRADED: + verifyContinuousDataFrameHandlesData(); + cleanUpTransforms(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void cleanUpTransforms() throws Exception { + stopTransform(CONTINUOUS_DATA_FRAME_ID); + deleteTransform(CONTINUOUS_DATA_FRAME_ID); + waitForPendingDataFrameTasks(); + } + + private void createAndStartContinuousDataFrame() throws Exception { + createIndex(CONTINUOUS_DATA_FRAME_SOURCE); + long totalDocsWritten = 0; + for (TimeValue bucket : BUCKETS) { + int docs = randomIntBetween(1, 25); + putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES); + totalDocsWritten += docs * ENTITIES.size(); + } + + DataFrameTransformConfig config = DataFrameTransformConfig.builder() + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .setPivotConfig(PivotConfig.builder() + .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars"))) + .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build()) + .build()) + .setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build()) + .setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build()) + .setId(CONTINUOUS_DATA_FRAME_ID) + .build(); + putTransform(CONTINUOUS_DATA_FRAME_ID, config); + + startTransform(CONTINUOUS_DATA_FRAME_ID); + waitUntilCheckpoint(CONTINUOUS_DATA_FRAME_ID, 1); + + DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + + assertThat(stateAndStats.getTransformStats().getOutputDocuments(), equalTo(ENTITIES.size())); + assertThat(stateAndStats.getTransformStats().getNumDocuments(), equalTo(totalDocsWritten)); + assertThat(stateAndStats.getTransformState().getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); + } + + private void verifyContinuousDataFrameHandlesData() throws Exception { + // A continuous data frame should automatically become started when it gets assigned to a node + // if it was assigned to the node that was removed from the cluster + assertBusy(() -> assertThat(getTransformStats(CONTINUOUS_DATA_FRAME_ID).getTransformState().getTaskState(), + equalTo(DataFrameTransformTaskState.STARTED)), + 60, + TimeUnit.SECONDS); + + DataFrameTransformStateAndStats previousStateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + + // Add a new user and write data to it and all the old users + List entities = new ArrayList<>(ENTITIES); + entities.add("user_" + ENTITIES.size() + 1); + int docs = randomIntBetween(1, 25); + putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities); + long totalDocsWritten = docs * entities.size(); + + waitUntilCheckpoint(CONTINUOUS_DATA_FRAME_ID, 2); + + DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + + assertThat(stateAndStats.getTransformStats().getOutputDocuments(), equalTo(entities.size())); + assertThat(stateAndStats.getTransformStats().getNumDocuments(), + equalTo(totalDocsWritten + previousStateAndStats.getTransformStats().getNumDocuments())); + assertThat(stateAndStats.getTransformState().getTaskState(), + equalTo(DataFrameTransformTaskState.STARTED)); + } + + private void putTransform(String id, DataFrameTransformConfig config) throws IOException { + final Request createDataframeTransformRequest = new Request("PUT", DATAFRAME_ENDPOINT + id); + createDataframeTransformRequest.setJsonEntity(Strings.toString(config)); + Response response = client().performRequest(createDataframeTransformRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void deleteTransform(String id) throws IOException { + Response response = client().performRequest(new Request("DELETE", DATAFRAME_ENDPOINT + id)); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void startTransform(String id) throws IOException { + final Request startDataframeTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + id + "/_start"); + Response response = client().performRequest(startDataframeTransformRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void stopTransform(String id) throws IOException { + final Request stopDataframeTransformRequest = new Request("POST", + DATAFRAME_ENDPOINT + id + "/_stop?wait_for_completion=true"); + Response response = client().performRequest(stopDataframeTransformRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private DataFrameTransformStateAndStats getTransformStats(String id) throws IOException { + final Request getStats = new Request("GET", DATAFRAME_ENDPOINT + id + "/stats"); + Response response = client().performRequest(getStats); + assertEquals(200, response.getStatusLine().getStatusCode()); + XContentType xContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue()); + try (XContentParser parser = xContentType.xContent().createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.getEntity().getContent())) { + GetDataFrameTransformStatsResponse resp = GetDataFrameTransformStatsResponse.fromXContent(parser); + assertThat(resp.getTransformsStateAndStats(), hasSize(1)); + return resp.getTransformsStateAndStats().get(0); + } + } + + private void waitUntilCheckpoint(String id, long checkpoint) throws Exception { + assertBusy(() -> assertThat(getTransformStats(id).getTransformState().getCheckpoint(), equalTo(checkpoint)), + 60, TimeUnit.SECONDS); + } + + private void createIndex(String indexName) throws IOException { + // create mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("mappings") + .startObject("properties") + .startObject("timestamp") + .field("type", "date") + .endObject() + .startObject("user_id") + .field("type", "keyword") + .endObject() + .startObject("stars") + .field("type", "integer") + .endObject() + .endObject() + .endObject(); + } + builder.endObject(); + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", indexName); + req.setEntity(entity); + client().performRequest(req); + } + } + + private void putData(String indexName, int numDocs, TimeValue fromTime, List entityIds) throws IOException { + long timeStamp = Instant.now().toEpochMilli() - fromTime.getMillis(); + + // create index + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + for (String entity : entityIds) { + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n") + .append("{\"user_id\":\"") + .append(entity) + .append("\",\"stars\":") + .append(randomLongBetween(0, 5)) + .append("\",\"timestamp\":\"") + .append(timeStamp) + .append("\"}\n"); + } + } + bulk.append("\r\n"); + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + } +} diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml index a0908f20ea209..c1a2ba63e6d59 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -17,7 +17,7 @@ - do: cluster.health: index: "mixed-dataframe-transform-airline-data" - wait_for_status: green + wait_for_status: yellow - do: data_frame.put_data_frame_transform: @@ -119,10 +119,6 @@ - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.task_state: "stopped" } - - do: - cluster.health: - index: ".data-frame-internal-1" - wait_for_status: green --- "Test put continuous data frame transform on mixed cluster": - do: @@ -142,7 +138,7 @@ - do: cluster.health: index: "mixed-dataframe-transform-airline-data-cont" - wait_for_status: green + wait_for_status: yellow - do: data_frame.put_data_frame_transform: @@ -184,16 +180,26 @@ - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-continuous-transform" } - match: { transforms.0.state.task_state: "started" } + - do: - cluster.health: - index: ".data-frame-internal-1" - wait_for_status: green + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + --- "Test GET, start, and stop old cluster batch transforms": - do: cluster.health: index: ".data-frame-internal-1,old-dataframe-transform-airline-data" - wait_for_status: green + wait_for_status: yellow - do: data_frame.get_data_frame_transform: @@ -270,7 +276,8 @@ "Test GET, stop, start, old continuous transforms": - do: cluster.health: - wait_for_status: green + index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont" + wait_for_status: yellow - do: data_frame.get_data_frame_transform: @@ -282,6 +289,10 @@ - is_true: transforms.0.version - is_true: transforms.0.create_time + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } - do: data_frame.get_data_frame_transform_stats: transform_id: "old-simple-continuous-transform" @@ -302,14 +313,3 @@ - match: { transforms.0.id: "old-simple-continuous-transform" } - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.task_state: "stopped" } - - - do: - data_frame.start_data_frame_transform: - transform_id: "old-simple-continuous-transform" - - match: { acknowledged: true } - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "old-simple-continuous-transform" - - match: { count: 1 } - - match: { transforms.0.id: "old-simple-continuous-transform" } - - match: { transforms.0.state.task_state: "started" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml index 30a7fb4748237..a98f209a75bc2 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -17,7 +17,7 @@ - do: cluster.health: index: "old-dataframe-transform-airline-data" - wait_for_status: green + wait_for_status: yellow - do: ingest.put_pipeline: @@ -139,11 +139,6 @@ - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.task_state: "stopped" } - - do: - cluster.health: - index: ".data-frame-internal-1" - wait_for_status: green - --- "Test put continuous data frame transform on old cluster": - do: @@ -162,8 +157,8 @@ type: integer - do: cluster.health: - index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont" - wait_for_status: green + index: "old-dataframe-transform-airline-data-cont" + wait_for_status: yellow - do: data_frame.put_data_frame_transform: @@ -205,7 +200,17 @@ - match: { count: 1 } - match: { transforms.0.id: "old-simple-continuous-transform" } - match: { transforms.0.state.task_state: "started" } + - do: - cluster.health: - index: ".data-frame-internal-1" - wait_for_status: green + data_frame.stop_data_frame_transform: + transform_id: "old-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index 40c3c2cedbb08..1efa2e1eb0b2b 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -6,212 +6,6 @@ setup: # wait for long enough that we give delayed unassigned shards to stop being delayed timeout: 70s --- -"Test put batch data frame transforms on new cluster": - - do: - indices.create: - index: new-dataframe-transform-airline-data - body: - mappings: - properties: - time: - type: date - airline: - type: keyword - responsetime: - type: float - event_rate: - type: integer - - do: - cluster.health: - index: "new-dataframe-transform-airline-data" - wait_for_status: green - - do: - data_frame.put_data_frame_transform: - transform_id: "new-simple-transform" - body: > - { - "source": { "index": "new-dataframe-transform-airline-data" }, - "dest": { "index": "new-simple-transform-idx" }, - "pivot": { - "group_by": { "airline": {"terms": {"field": "airline"}}}, - "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} - } - } - - match: { acknowledged: true } - - - do: - data_frame.start_data_frame_transform: - transform_id: "new-simple-transform" - - match: { acknowledged: true } - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "new-simple-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-simple-transform" } - - match: { transforms.0.state.task_state: "/started|stopped/" } - - - do: - data_frame.stop_data_frame_transform: - transform_id: "new-simple-transform" - wait_for_completion: true - - match: { acknowledged: true } - - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "new-simple-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-simple-transform" } - - match: { transforms.0.state.indexer_state: "stopped" } - - match: { transforms.0.state.task_state: "stopped" } - - - do: - data_frame.put_data_frame_transform: - transform_id: "new-complex-transform" - body: > - { - "source": { - "index": "new-dataframe-transform-airline-data", - "query": { - "bool": { - "filter": {"term": {"airline": "ElasticAir"}} - } - } - }, - "dest": { - "index": "new-complex-transform-idx", - "pipeline": "data_frame_simple_pipeline" - }, - "pivot": { - "group_by": { - "airline": {"terms": {"field": "airline"}}, - "day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}}, - "every_50": {"histogram": {"field": "responsetime", "interval": 50}} - }, - "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} - } - } - - match: { acknowledged: true } - - - do: - data_frame.get_data_frame_transform: - transform_id: "new-complex-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-complex-transform" } - - is_true: transforms.0.version - - is_true: transforms.0.create_time - - - do: - data_frame.start_data_frame_transform: - transform_id: "new-complex-transform" - - match: { acknowledged: true } - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "new-complex-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-complex-transform" } - - match: { transforms.0.state.task_state: "/started|stopped/" } - - - do: - data_frame.stop_data_frame_transform: - transform_id: "new-complex-transform" - wait_for_completion: true - - match: { acknowledged: true } - - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "new-complex-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-complex-transform" } - - match: { transforms.0.state.indexer_state: "stopped" } - - match: { transforms.0.state.task_state: "stopped" } - - - do: - data_frame.delete_data_frame_transform: - transform_id: "new-complex-transform" - - - do: - data_frame.delete_data_frame_transform: - transform_id: "new-simple-transform" ---- -"Test put continuous data frame transform on new cluster": - - do: - indices.create: - index: new-dataframe-transform-airline-data-cont - body: - mappings: - properties: - time: - type: date - airline: - type: keyword - responsetime: - type: float - event_rate: - type: integer - - do: - cluster.health: - index: "new-dataframe-transform-airline-data-cont" - wait_for_status: green - - - do: - data_frame.put_data_frame_transform: - transform_id: "new-simple-continuous-transform" - body: > - { - "source": { "index": "new-dataframe-transform-airline-data-cont" }, - "dest": { "index": "new-simple-continuous-transform-idx" }, - "pivot": { - "group_by": { "airline": {"terms": {"field": "airline"}}}, - "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} - }, - "sync": { - "time": { - "field": "time", - "delay": "90m" - } - } - } - - match: { acknowledged: true } - - - do: - data_frame.get_data_frame_transform: - transform_id: "new-simple-continuous-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-simple-continuous-transform" } - - match: { transforms.0.sync.time.field: "time" } - - match: { transforms.0.sync.time.delay: "90m" } - - is_true: transforms.0.version - - is_true: transforms.0.create_time - - - do: - data_frame.start_data_frame_transform: - transform_id: "new-simple-continuous-transform" - - match: { acknowledged: true } - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "new-simple-continuous-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-simple-continuous-transform" } - - match: { transforms.0.state.task_state: "started" } - - - do: - data_frame.stop_data_frame_transform: - transform_id: "new-simple-continuous-transform" - wait_for_completion: true - - match: { acknowledged: true } - - - do: - data_frame.get_data_frame_transform_stats: - transform_id: "new-simple-continuous-transform" - - match: { count: 1 } - - match: { transforms.0.id: "new-simple-continuous-transform" } - - match: { transforms.0.state.indexer_state: "stopped" } - - match: { transforms.0.state.task_state: "stopped" } - - - do: - data_frame.delete_data_frame_transform: - transform_id: "new-simple-continuous-transform" ---- "Get start, stop, and delete old and mixed cluster batch data frame transforms": # Simple and complex OLD transforms - do: @@ -382,6 +176,17 @@ setup: - is_true: transforms.0.version - is_true: transforms.0.create_time + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + - do: data_frame.get_data_frame_transform_stats: transform_id: "old-simple-continuous-transform" @@ -417,6 +222,17 @@ setup: - is_true: transforms.0.version - is_true: transforms.0.create_time + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + - do: data_frame.get_data_frame_transform_stats: transform_id: "mixed-simple-continuous-transform" diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java index bfdf051a29235..5b4a6addc3ec5 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java @@ -30,6 +30,12 @@ public final class XPackRestTestConstants { public static final List ML_POST_V660_TEMPLATES = List.of(AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX, CONFIG_INDEX); + // Data Frame constants: + public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-1"; + public static final String DATA_FRAME_NOTIFICATIONS_INDEX = ".data-frame-notifications-1"; + + public static final List DATA_FRAME_TEMPLATES = List.of(DATA_FRAME_INTERNAL_INDEX, DATA_FRAME_NOTIFICATIONS_INDEX); + private XPackRestTestConstants() { } } From afcd67b5b4a4436fab4c4e4d796add2d748059a1 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 26 Jun 2019 16:11:52 -0500 Subject: [PATCH 6/8] Fixing test --- .../upgrades/DataFrameSurvivesUpgradeIT.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java index af86537b4cf9f..10971808a91ac 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig; import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig; import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; @@ -42,6 +43,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { @@ -84,11 +87,15 @@ public void testDataFramesRollingUpgrade() throws Exception { break; case MIXED: client().performRequest(waitForYellow); - verifyContinuousDataFrameHandlesData(); + long lastCheckpoint = 1; + if (Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) { + lastCheckpoint = 2; + } + verifyContinuousDataFrameHandlesData(lastCheckpoint); break; case UPGRADED: client().performRequest(waitForYellow); - verifyContinuousDataFrameHandlesData(); + verifyContinuousDataFrameHandlesData(3); cleanUpTransforms(); break; default: @@ -124,7 +131,7 @@ private void createAndStartContinuousDataFrame() throws Exception { putTransform(CONTINUOUS_DATA_FRAME_ID, config); startTransform(CONTINUOUS_DATA_FRAME_ID); - waitUntilCheckpoint(CONTINUOUS_DATA_FRAME_ID, 1); + waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L); DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); @@ -133,7 +140,7 @@ private void createAndStartContinuousDataFrame() throws Exception { assertThat(stateAndStats.getTransformState().getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); } - private void verifyContinuousDataFrameHandlesData() throws Exception { + private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception { // A continuous data frame should automatically become started when it gets assigned to a node // if it was assigned to the node that was removed from the cluster @@ -149,19 +156,24 @@ private void verifyContinuousDataFrameHandlesData() throws Exception { // Add a new user and write data to it and all the old users List entities = new ArrayList<>(ENTITIES); entities.add("user_" + ENTITIES.size() + 1); - int docs = randomIntBetween(1, 25); + int docs = 5; putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities); long totalDocsWritten = docs * entities.size(); - waitUntilCheckpoint(CONTINUOUS_DATA_FRAME_ID, 2); + waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint); + assertBusy(() ->assertThat( + getTransformStats(CONTINUOUS_DATA_FRAME_ID).getTransformStats().getNumDocuments(), + greaterThanOrEqualTo(totalDocsWritten + previousStateAndStats.getTransformStats().getNumDocuments())) + ); DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); - assertThat(stateAndStats.getTransformStats().getOutputDocuments(), equalTo((long)entities.size())); - assertThat(stateAndStats.getTransformStats().getNumDocuments(), - equalTo(totalDocsWritten + previousStateAndStats.getTransformStats().getNumDocuments())); assertThat(stateAndStats.getTransformState().getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); + assertThat(stateAndStats.getTransformStats().getOutputDocuments(), + greaterThan(previousStateAndStats.getTransformStats().getOutputDocuments())); + assertThat(stateAndStats.getTransformStats().getNumDocuments(), + greaterThanOrEqualTo(totalDocsWritten + previousStateAndStats.getTransformStats().getNumDocuments())); } private void putTransform(String id, DataFrameTransformConfig config) throws IOException { @@ -203,8 +215,8 @@ private DataFrameTransformStateAndStats getTransformStats(String id) throws IOEx } } - private void waitUntilCheckpoint(String id, long checkpoint) throws Exception { - assertBusy(() -> assertThat(getTransformStats(id).getTransformState().getCheckpoint(), equalTo(checkpoint)), + private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception { + assertBusy(() -> assertThat(getTransformStats(id).getTransformState().getCheckpoint(), greaterThan(currentCheckpoint)), 60, TimeUnit.SECONDS); } From f3ea05a2fc64925b29fa8ee33bbf4a26d4ee6564 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 27 Jun 2019 09:44:14 -0500 Subject: [PATCH 7/8] Adjusting indices used in BWC, and handling NPE for seq_no_stats --- .../DataFrameTransformsCheckpointService.java | 12 ++++ .../mixed_cluster/80_data_frame_jobs_crud.yml | 58 ++++++------------- .../old_cluster/80_data_frame_jobs_crud.yml | 20 ++++--- .../80_data_frame_jobs_crud.yml | 8 +-- 4 files changed, 44 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java index fad9836b760d8..d028d3248c8f6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; @@ -222,6 +224,16 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml index efbd1ac4b4a4a..9454423e98953 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -1,30 +1,17 @@ --- "Test put batch data frame transforms on mixed cluster": - - do: - indices.create: - index: mixed-dataframe-transform-airline-data - body: - mappings: - properties: - time: - type: date - airline: - type: keyword - responsetime: - type: float - event_rate: - type: integer - do: cluster.health: - index: "mixed-dataframe-transform-airline-data" - wait_for_status: yellow + index: "dataframe-transform-airline-data" + wait_for_status: green + timeout: 70s - do: data_frame.put_data_frame_transform: transform_id: "mixed-simple-transform" body: > { - "source": { "index": "mixed-dataframe-transform-airline-data" }, + "source": { "index": "dataframe-transform-airline-data" }, "dest": { "index": "mixed-simple-transform-idx" }, "pivot": { "group_by": { "airline": {"terms": {"field": "airline"}}}, @@ -64,7 +51,7 @@ body: > { "source": { - "index": "mixed-dataframe-transform-airline-data", + "index": "dataframe-transform-airline-data", "query": { "bool": { "filter": {"term": {"airline": "ElasticAir"}} @@ -121,31 +108,18 @@ --- "Test put continuous data frame transform on mixed cluster": - - do: - indices.create: - index: mixed-dataframe-transform-airline-data-cont - body: - mappings: - properties: - time: - type: date - airline: - type: keyword - responsetime: - type: float - event_rate: - type: integer - do: cluster.health: - index: "mixed-dataframe-transform-airline-data-cont" - wait_for_status: yellow + index: "dataframe-transform-airline-data-cont" + wait_for_status: green + timeout: 70s - do: data_frame.put_data_frame_transform: transform_id: "mixed-simple-continuous-transform" body: > { - "source": { "index": "mixed-dataframe-transform-airline-data-cont" }, + "source": { "index": "dataframe-transform-airline-data-cont" }, "dest": { "index": "mixed-simple-continuous-transform-idx" }, "pivot": { "group_by": { "airline": {"terms": {"field": "airline"}}}, @@ -199,15 +173,16 @@ "Test GET, start, and stop old cluster batch transforms": - do: cluster.health: - index: ".data-frame-internal-1,old-dataframe-transform-airline-data" - wait_for_status: yellow + index: "dataframe-transform-airline-data" + wait_for_status: green + timeout: 70s - do: data_frame.get_data_frame_transform: transform_id: "old-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-transform" } - - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } - match: { transforms.0.dest.index: "old-simple-transform-idx" } - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } @@ -241,7 +216,7 @@ transform_id: "old-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "old-complex-transform" } - - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } - match: { transforms.0.dest.index: "old-complex-transform-idx" } - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } @@ -277,8 +252,9 @@ "Test GET, stop, start, old continuous transforms": - do: cluster.health: - index: ".data-frame-internal-1,old-dataframe-transform-airline-data-cont" - wait_for_status: yellow + index: "dataframe-transform-airline-data-cont" + wait_for_status: green + timeout: 70s - do: data_frame.get_data_frame_transform: diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml index a98f209a75bc2..7b666c2caa35f 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -2,7 +2,7 @@ "Test put batch data frame transforms on old cluster": - do: indices.create: - index: old-dataframe-transform-airline-data + index: dataframe-transform-airline-data body: mappings: properties: @@ -16,8 +16,9 @@ type: integer - do: cluster.health: - index: "old-dataframe-transform-airline-data" - wait_for_status: yellow + index: "dataframe-transform-airline-data" + wait_for_status: green + timeout: 70s - do: ingest.put_pipeline: @@ -38,7 +39,7 @@ transform_id: "old-simple-transform" body: > { - "source": { "index": "old-dataframe-transform-airline-data" }, + "source": { "index": "dataframe-transform-airline-data" }, "dest": { "index": "old-simple-transform-idx" }, "pivot": { "group_by": { "airline": {"terms": {"field": "airline"}}}, @@ -86,7 +87,7 @@ body: > { "source": { - "index": "old-dataframe-transform-airline-data", + "index": "dataframe-transform-airline-data", "query": { "bool": { "filter": {"term": {"airline": "ElasticAir"}} @@ -143,7 +144,7 @@ "Test put continuous data frame transform on old cluster": - do: indices.create: - index: old-dataframe-transform-airline-data-cont + index: dataframe-transform-airline-data-cont body: mappings: properties: @@ -157,15 +158,16 @@ type: integer - do: cluster.health: - index: "old-dataframe-transform-airline-data-cont" - wait_for_status: yellow + index: "dataframe-transform-airline-data-cont" + wait_for_status: green + timeout: 70s - do: data_frame.put_data_frame_transform: transform_id: "old-simple-continuous-transform" body: > { - "source": { "index": "old-dataframe-transform-airline-data-cont" }, + "source": { "index": "dataframe-transform-airline-data-cont" }, "dest": { "index": "old-simple-continuous-transform-idx" }, "pivot": { "group_by": { "airline": {"terms": {"field": "airline"}}}, diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index 1efa2e1eb0b2b..ea63950d7fc64 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -13,7 +13,7 @@ setup: transform_id: "old-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-transform" } - - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } - match: { transforms.0.dest.index: "old-simple-transform-idx" } - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } @@ -46,7 +46,7 @@ setup: transform_id: "old-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "old-complex-transform" } - - match: { transforms.0.source.index.0: "old-dataframe-transform-airline-data" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } - match: { transforms.0.dest.index: "old-complex-transform-idx" } - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } @@ -84,7 +84,7 @@ setup: transform_id: "mixed-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-transform" } - - match: { transforms.0.source.index.0: "mixed-dataframe-transform-airline-data" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } - match: { transforms.0.dest.index: "mixed-simple-transform-idx" } - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } @@ -118,7 +118,7 @@ setup: transform_id: "mixed-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-complex-transform" } - - match: { transforms.0.source.index.0: "mixed-dataframe-transform-airline-data" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } - match: { transforms.0.dest.index: "mixed-complex-transform-idx" } - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } From 2949df79bbc356e4bbdfa397eb9f12f6c2716a26 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 3 Jul 2019 07:53:47 -0500 Subject: [PATCH 8/8] updating and muting specific bwc test --- .../upgrades/DataFrameSurvivesUpgradeIT.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java index 10971808a91ac..1aee9130ddc50 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -7,6 +7,7 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -47,6 +48,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662") public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version")); @@ -119,7 +121,7 @@ private void createAndStartContinuousDataFrame() throws Exception { } DataFrameTransformConfig config = DataFrameTransformConfig.builder() - .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(30))) .setPivotConfig(PivotConfig.builder() .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars"))) .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build()) @@ -148,24 +150,28 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); assertThat(stateAndStats.getTransformState().getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); }, - 60, + 120, TimeUnit.SECONDS); DataFrameTransformStateAndStats previousStateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); - // Add a new user and write data to it and all the old users - List entities = new ArrayList<>(ENTITIES); - entities.add("user_" + ENTITIES.size() + 1); + // Add a new user and write data to it + // This is so we can have more reliable data counts, as writing to existing entities requires + // rescanning the past data + List entities = new ArrayList<>(1); + entities.add("user_" + ENTITIES.size() + expectedLastCheckpoint); int docs = 5; + // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin + // wait later. putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities); - long totalDocsWritten = docs * entities.size(); waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint); - assertBusy(() ->assertThat( + assertBusy(() -> assertThat( getTransformStats(CONTINUOUS_DATA_FRAME_ID).getTransformStats().getNumDocuments(), - greaterThanOrEqualTo(totalDocsWritten + previousStateAndStats.getTransformStats().getNumDocuments())) - ); + greaterThanOrEqualTo(docs + previousStateAndStats.getTransformStats().getNumDocuments())), + 120, + TimeUnit.SECONDS); DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); assertThat(stateAndStats.getTransformState().getTaskState(), @@ -173,7 +179,7 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t assertThat(stateAndStats.getTransformStats().getOutputDocuments(), greaterThan(previousStateAndStats.getTransformStats().getOutputDocuments())); assertThat(stateAndStats.getTransformStats().getNumDocuments(), - greaterThanOrEqualTo(totalDocsWritten + previousStateAndStats.getTransformStats().getNumDocuments())); + greaterThanOrEqualTo(docs + previousStateAndStats.getTransformStats().getNumDocuments())); } private void putTransform(String id, DataFrameTransformConfig config) throws IOException {