From c34f030e634018d6c64171bfa9baa75de5ecabec Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 28 Apr 2020 12:10:01 +0800 Subject: [PATCH 1/7] The filter used by Metadata-only queries should not have Unevaluable --- .../execution/OptimizeMetadataOnlyQuery.scala | 4 +++ .../OptimizeMetadataOnlyQuerySuite.scala | 26 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 47284f3448b7b..e3491fbf05872 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -119,6 +119,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic } } + if (normalizedFilters.exists(_.find(_.isInstanceOf[Unevaluable]).isDefined)) { + return child + } + child transform { case plan if plan eq relation => relation match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index afb438e0bbc72..4690fb715f3f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -150,4 +150,30 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-31590 The filter used by Metadata-only queries should not have Unevaluable") { + withTable("test_tbl") { + withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") { + sql("CREATE TABLE test_tbl (a INT,d STRING,h STRING) USING PARQUET PARTITIONED BY (d ,h)") + sql(""" + |INSERT OVERWRITE TABLE test_tbl PARTITION(d,h) + |SELECT 1,'2020-01-01','23' + |UNION ALL + |SELECT 2,'2020-01-02','01' + |UNION ALL + |SELECT 3,'2020-01-02','02' + """.stripMargin) + sql( + s""" + |SELECT d, MAX(h) AS h + |FROM test_tbl + |WHERE d= ( + | SELECT MAX(d) AS d + | FROM test_tbl + |) + |GROUP BY d + """.stripMargin).collect() + } + } + } } From 86f28d51f343becc6a516f726d20efd4d12c7708 Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 1 May 2020 12:24:59 +0800 Subject: [PATCH 2/7] use filterNot(SubqueryExpression.hasSubquery) --- .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index e3491fbf05872..492d177c7c773 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -117,11 +117,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic case a: AttributeReference => a.withName(relation.output.find(_.semanticEquals(a)).get.name) } - } - - if (normalizedFilters.exists(_.find(_.isInstanceOf[Unevaluable]).isDefined)) { - return child - } + }.filterNot(SubqueryExpression.hasSubquery) child transform { case plan if plan eq relation => From c8485086ffee7e5ca2ad7a1e24e10081070d135d Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 1 May 2020 13:28:58 +0800 Subject: [PATCH 3/7] change ut --- .../OptimizeMetadataOnlyQuerySuite.scala | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index 4690fb715f3f2..2306c9f85c6ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -152,27 +152,27 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { } test("SPARK-31590 The filter used by Metadata-only queries should not have Unevaluable") { - withTable("test_tbl") { - withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") { - sql("CREATE TABLE test_tbl (a INT,d STRING,h STRING) USING PARQUET PARTITIONED BY (d ,h)") - sql(""" - |INSERT OVERWRITE TABLE test_tbl PARTITION(d,h) - |SELECT 1,'2020-01-01','23' - |UNION ALL - |SELECT 2,'2020-01-02','01' - |UNION ALL - |SELECT 3,'2020-01-02','02' - """.stripMargin) - sql( - s""" - |SELECT d, MAX(h) AS h - |FROM test_tbl - |WHERE d= ( - | SELECT MAX(d) AS d - | FROM test_tbl - |) - |GROUP BY d - """.stripMargin).collect() + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { + val df = sql( + """ + |SELECT partcol1, MAX(partcol2) AS partcol2 + |FROM srcpart + |WHERE partcol1 = ( + | SELECT MAX(partcol1) + | FROM srcpart + |) + |GROUP BY partcol1 + """.stripMargin) + val localRelations = df.queryExecution.optimizedPlan.collect { + case l@LocalRelation(_, _, _) => l + } + if (enableOptimizeMetadataOnlyQuery) { + assert(localRelations.size == 1) + } else { + assert(localRelations.size == 0) + } + df.collect() } } } From 4abf2f5ac87296497cee68035ad7c0ea91cf3547 Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 1 May 2020 15:07:11 +0800 Subject: [PATCH 4/7] reuse testMetadataOnly --- .../OptimizeMetadataOnlyQuerySuite.scala | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index 2306c9f85c6ab..331aaafbce0cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -103,6 +103,20 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { "select partcol2, min(partcol1) from srcpart where partcol1 = 0 group by partcol2", "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t") + testMetadataOnly( + "SPARK-31590 The filter used by Metadata-only queries should not have Unevaluable", + """ + |SELECT partcol1, MAX(partcol2) AS partcol2 + |FROM srcpart + |WHERE partcol1 = ( + | SELECT MAX(partcol1) + | FROM srcpart + |) + |AND partcol2= 'event' + |GROUP BY partcol1 + |""".stripMargin + ) + testNotMetadataOnly( "Don't optimize metadata only query for non-partition columns", "select col1 from srcpart group by col1", @@ -150,30 +164,4 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { } } } - - test("SPARK-31590 The filter used by Metadata-only queries should not have Unevaluable") { - Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => - withSQLConf(OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { - val df = sql( - """ - |SELECT partcol1, MAX(partcol2) AS partcol2 - |FROM srcpart - |WHERE partcol1 = ( - | SELECT MAX(partcol1) - | FROM srcpart - |) - |GROUP BY partcol1 - """.stripMargin) - val localRelations = df.queryExecution.optimizedPlan.collect { - case l@LocalRelation(_, _, _) => l - } - if (enableOptimizeMetadataOnlyQuery) { - assert(localRelations.size == 1) - } else { - assert(localRelations.size == 0) - } - df.collect() - } - } - } } From 7046db81d1a830c97a20f8e690cfb507c712da82 Mon Sep 17 00:00:00 2001 From: sychen Date: Sat, 2 May 2020 11:18:31 +0800 Subject: [PATCH 5/7] nit --- .../spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index 331aaafbce0cb..d32d22d98d78e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -104,7 +104,8 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t") testMetadataOnly( - "SPARK-31590 The filter used by Metadata-only queries should not have Unevaluable", + "SPARK-31590 " + + "The filter used by Metadata-only queries should filter out all the unevaluable expr", """ |SELECT partcol1, MAX(partcol2) AS partcol2 |FROM srcpart @@ -112,7 +113,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { | SELECT MAX(partcol1) | FROM srcpart |) - |AND partcol2= 'event' + |AND partcol2 = 'event' |GROUP BY partcol1 |""".stripMargin ) From a7638d602fca15911c99fd14703f144902f68f47 Mon Sep 17 00:00:00 2001 From: sychen Date: Sat, 2 May 2020 11:40:12 +0800 Subject: [PATCH 6/7] ut use existing partition value --- .../spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index d32d22d98d78e..0e12ca6974f99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -113,7 +113,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { | SELECT MAX(partcol1) | FROM srcpart |) - |AND partcol2 = 'event' + |AND partcol2 = 'even' |GROUP BY partcol1 |""".stripMargin ) From d601af42e32cd74e99c691b6870eebf17845ae09 Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 4 May 2020 15:46:22 +0800 Subject: [PATCH 7/7] change ut name --- .../spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index 0e12ca6974f99..68691e2f7fdac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -104,8 +104,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession { "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t") testMetadataOnly( - "SPARK-31590 " + - "The filter used by Metadata-only queries should filter out all the unevaluable expr", + "SPARK-31590 Metadata-only queries should not include subquery in partition filters", """ |SELECT partcol1, MAX(partcol2) AS partcol2 |FROM srcpart