From aa2fc5c78bfba9c6c1929a92a5c98f659cff835d Mon Sep 17 00:00:00 2001 From: chenzhx Date: Wed, 25 May 2022 16:16:33 +0800 Subject: [PATCH 01/10] [SPARK-38899][SQL]DS V2 supports push down datetime functions --- .../sql/connector/expressions/Extract.java | 120 ++++++++++++++++++ .../expressions/GeneralScalarExpression.java | 18 +++ .../util/V2ExpressionSQLBuilder.java | 18 +++ .../catalyst/util/V2ExpressionBuilder.scala | 47 ++++++- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 28 +++- .../apache/spark/sql/jdbc/JdbcDialects.scala | 11 ++ .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 70 +++++++++- 7 files changed, 307 insertions(+), 5 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java new file mode 100644 index 0000000000000..062baaa976df4 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions; + +import org.apache.spark.annotation.Evolving; + +import java.io.Serializable; + +/** + * Represent an extract function, which extracts and returns the value of a + * and a source expression where the field should be extracted. + *
+ * The currently supported field names:
+ * <ol>
+ *  <li>Field Name: <code>SECOND</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(SECOND FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>MINUTE</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(MINUTE FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>HOUR</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(HOUR FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>MONTH</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(MONTH FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>QUARTER</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(QUARTER FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>YEAR</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(YEAR FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>ISO_DAY_OF_WEEK</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(ISO_DAY_OF_WEEK FROM source)</code></li>
+ *    <li>Database dialects need to follow ISO semantics when handling ISO_DAY_OF_WEEK.</li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>DAY</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(DAY FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>DOY</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(DOY FROM source)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>WEEK</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(WEEK FROM source)</code></li>
+ *    <li>Database dialects need to follow ISO semantics when handling WEEK.</li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Field Name: <code>YEAR_OF_WEEK</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>EXTRACT(YEAR_OF_WEEK FROM source)</code></li>
+ *    <li>Database dialects need to follow ISO semantics when handling YEAR_OF_WEEK.</li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ * </ol>
+ * + * @since 3.4.0 + */ + +@Evolving +public class Extract implements Expression, Serializable { + + private String field; + private Expression source; + + public Extract(String field, Expression source) { + this.field = field; + this.source = source; + } + + public String field() { return field; } + public Expression source() { return source; } + + @Override + public Expression[] children() { return new Expression[]{ source() }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index ab9e33e86be77..53c511a87f691 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -346,6 +346,24 @@ *
 *    <li>Since version: 3.4.0</li>
 *   </ul>
 *  </li>
+ *  <li>Name: <code>DATE_ADD</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>DATE_DIFF</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>TRUNC</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>TRUNC(date, format)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
  • * * Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off, * including: add, subtract, multiply, divide, remainder, pmod. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java index 9b62fedcc8055..d91a3b72d1d79 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java @@ -23,6 +23,7 @@ import org.apache.spark.sql.connector.expressions.Cast; import org.apache.spark.sql.connector.expressions.Expression; +import org.apache.spark.sql.connector.expressions.Extract; import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.expressions.GeneralScalarExpression; import org.apache.spark.sql.connector.expressions.Literal; @@ -46,6 +47,9 @@ public String build(Expression expr) { } else if (expr instanceof Cast) { Cast cast = (Cast) expr; return visitCast(build(cast.expression()), cast.dataType()); + } else if (expr instanceof Extract) { + Extract extract = (Extract) expr; + return visitExtract(extract.field(), build(extract.source())); } else if (expr instanceof GeneralScalarExpression) { GeneralScalarExpression e = (GeneralScalarExpression) expr; String name = e.name(); @@ -136,6 +140,9 @@ public String build(Expression expr) { case "UPPER": case "LOWER": case "TRANSLATE": + case "DATE_ADD": + case "DATE_DIFF": + case "TRUNC": return visitSQLFunction(name, Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new)); case "CASE_WHEN": { @@ -327,4 +334,15 @@ protected String visitTrim(String direction, String[] inputs) { return "TRIM(" + direction + " " + inputs[1] + " FROM " + inputs[0] + ")"; } } + + protected String visitExtract(String field, String source) { + switch (field) { + case "DAY_OF_WEEK": + return "(EXTRACT(ISO_DAY_OF_WEEK FROM " + source + ") % 7)+ 1"; + case "WEEK_DAY": + return "EXTRACT(ISO_DAY_OF_WEEK FROM " + source + ") -1"; + default: + return "EXTRACT(" + field + " FROM " + source + ")"; + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 163e071f08ead..c361f85e68c51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc} +import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc} import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate} import org.apache.spark.sql.types.BooleanType @@ -344,6 +344,51 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { } else { None } + case date: DateAdd => + val childrenExpressions = date.children.flatMap(generateExpression(_)) + if (childrenExpressions.length == 
date.children.length) { + Some(new GeneralScalarExpression("DATE_ADD", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case date: DateDiff => + val childrenExpressions = date.children.flatMap(generateExpression(_)) + if (childrenExpressions.length == date.children.length) { + Some(new GeneralScalarExpression("DATE_DIFF", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case date: TruncDate => + val childrenExpressions = date.children.flatMap(generateExpression(_)) + if (childrenExpressions.length == date.children.length) { + Some(new GeneralScalarExpression("TRUNC", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case Second(child, _) => + generateExpression(child).map(v => new V2Extract("SECOND", v)) + case Minute(child, _) => + generateExpression(child).map(v => new V2Extract("MINUTE", v)) + case Hour(child, _) => + generateExpression(child).map(v => new V2Extract("HOUR", v)) + case Month(child) => + generateExpression(child).map(v => new V2Extract("MONTH", v)) + case Quarter(child) => + generateExpression(child).map(v => new V2Extract("QUARTER", v)) + case Year(child) => + generateExpression(child).map(v => new V2Extract("YEAR", v)) + case DayOfWeek(child) => + generateExpression(child).map(v => new V2Extract("DAY_OF_WEEK", v)) + case WeekDay(child) => + generateExpression(child).map(v => new V2Extract("WEEK_DAY", v)) + case DayOfMonth(child) => + generateExpression(child).map(v => new V2Extract("DAY", v)) + case DayOfYear(child) => + generateExpression(child).map(v => new V2Extract("DOY", v)) + case WeekOfYear(child) => + generateExpression(child).map(v => new V2Extract("WEEK", v)) + case YearOfWeek(child) => + generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v)) // TODO supports other expressions case ApplyFunctionExpression(function, children) => val childrenExpressions = children.flatMap(generateExpression(_)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 124cb001b5cf3..b368863fec677 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -22,10 +22,12 @@ import java.util.Locale import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.functions.UnboundFunction +import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType} @@ -38,7 +40,9 @@ private[sql] object H2Dialect extends JdbcDialect { Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN", "TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN", - "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM") + "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "SECOND", "MINUTE", + "HOUR", "MONTH", "QUARTER", "YEAR", "DAY", "DOY", "DAY_OF_WEEK", "WEEK_DAY", + "WEEK", "YEAR_OF_WEEK") 
override def isSupportedFunction(funcName: String): Boolean = supportedFunctions.contains(funcName) @@ -123,4 +127,26 @@ private[sql] object H2Dialect extends JdbcDialect { } super.classifyException(message, e) } + + override def compileExpression(expr: Expression): Option[String] = { + val jdbcSQLBuilder = new H2JDBCSQLBuilder() + try { + Some(jdbcSQLBuilder.build(expr)) + } catch { + case NonFatal(e) => + logWarning("Error occurs while compiling V2 expression", e) + None + } + } + + class H2JDBCSQLBuilder extends JDBCSQLBuilder { + + override def visitExtract(field: String, source: String): String = { + field match { + case "WEEK" => s"EXTRACT(ISO_WEEK FROM $source)" + case "YEAR_OF_WEEK" => s"EXTRACT(ISO_WEEK_YEAR FROM $source)" + case _ => super.visitExtract(field, source) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index fa2a45be1852a..a313f627e604e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -270,6 +270,17 @@ abstract class JdbcDialect extends Serializable with Logging{ s"${this.getClass.getSimpleName} does not support function: TRIM") } } + + override def visitExtract(field: String, source: String): String = { + if (isSupportedFunction(field)) { + super.visitExtract(field, source) + } else { + // The framework will catch the error and give up the push-down. + // Please see `JdbcDialect.compileExpression(expr: Expression)` for more details. + throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} does not support function: EXTRACT") + } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 90ab976d9d5e7..c0c39f7b9b2a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -182,6 +182,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "(1, 'bottle', 11111111111111111111.123)").executeUpdate() conn.prepareStatement("INSERT INTO \"test\".\"item\" VALUES " + "(1, 'bottle', 99999999999999999999.123)").executeUpdate() + + conn.prepareStatement( + "CREATE TABLE \"test\".\"datetime\" (name TEXT(32), date1 DATE, time1 TIMESTAMP)") + .executeUpdate() + conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " + + "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate() + conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " + + "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate() } H2Dialect.registerFunction("my_avg", IntegralAverage) H2Dialect.registerFunction("my_strlen", StrLen(CharLength)) @@ -1026,14 +1034,69 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel |AND cast(dept as short) > 1 |AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin) checkFiltersRemoved(df6, ansiMode) - val expectedPlanFragment8 = if (ansiMode) { + val expectedPlanFragment6 = if (ansiMode) { "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " + "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...," } else { "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL]," } - checkPushedInfo(df6, expectedPlanFragment8) + checkPushedInfo(df6, expectedPlanFragment6) checkAnswer(df6, Seq(Row(2, "david", 10000, 1300, true))) + + val df7 
= sql("SELECT name FROM h2.test.datetime WHERE " + + "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ") + checkFiltersRemoved(df7) + val expectedPlanFragment7 = + "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DOY FROM DATE1) > 100, " + + "EXTRACT(DAY FROM DATE1) > 10]" + checkPushedInfo(df7, expectedPlanFragment7) + checkAnswer(df7, Seq(Row("amy"), Row("alex"))) + + val df8 = sql("SELECT name FROM h2.test.datetime WHERE " + + "year(date1) = 2022 AND quarter(date1) = 2 AND month(date1) = 5") + checkFiltersRemoved(df8) + val expectedPlanFragment8 = + "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " + + "EXTRACT(QUARTER FROM DATE1) = 2, EXTRACT(MON...," + checkPushedInfo(df8, expectedPlanFragment8) + checkAnswer(df8, Seq(Row("amy"), Row("alex"))) + + val df9 = sql("SELECT name FROM h2.test.datetime WHERE " + + "hour(time1) = 0 AND minute(time1) = 0 AND second(time1) = 0") + checkFiltersRemoved(df9) + val expectedPlanFragment9 = + "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " + + "EXTRACT(MINUTE FROM TIME1) = 0, EXTRACT(SECOND ..." + checkPushedInfo(df9, expectedPlanFragment9) + checkAnswer(df9, Seq(Row("amy"), Row("alex"))) + + val df10 = sql("SELECT name FROM h2.test.datetime WHERE " + + "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022") + checkFiltersRemoved(df10) + val expectedPlanFragment10 = + "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " + + "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]" + checkPushedInfo(df10, expectedPlanFragment10) + checkAnswer(df10, Seq(Row("alex"), Row("amy"))) + + // H2 does not support + val df11 = sql("SELECT name FROM h2.test.datetime WHERE " + + "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " + + "AND datediff(date1, '2022-05-10') > 0") + checkFiltersRemoved(df11, false) + val expectedPlanFragment11 = + "PushedFilters: [DATE1 IS NOT NULL]" + checkPushedInfo(df11, expectedPlanFragment11) + checkAnswer(df11, Seq(Row("amy"))) + + val df12 = sql("SELECT name FROM h2.test.datetime WHERE " + + "weekday(date1) = 2 AND dayofweek(date1) = 4") + checkFiltersRemoved(df12) + val expectedPlanFragment12 = + "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(ISO_DAY_OF_WEEK FROM DATE1) -1 = 2, " + + "(EXTRACT(ISO_DAY_OF_WEEK FROM DAT..." 
+ checkPushedInfo(df12, expectedPlanFragment12) + checkAnswer(df12, Seq(Row("alex"))) } } } @@ -1116,7 +1179,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people", false), Row("test", "empty_table", false), Row("test", "employee", false), Row("test", "item", false), Row("test", "dept", false), - Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false))) + Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false), + Row("test", "datetime", false))) } test("SQL API: create table as select") { From 778adb6f39e70a9f362db718a71da9bd84ccb29f Mon Sep 17 00:00:00 2001 From: chenzhx Date: Thu, 30 Jun 2022 19:53:42 +0800 Subject: [PATCH 02/10] Update code --- .../sql/connector/expressions/Extract.java | 82 +++---------------- 1 file changed, 12 insertions(+), 70 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java index 062baaa976df4..ad8a155432eb4 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java @@ -25,77 +25,19 @@ * Represent an extract function, which extracts and returns the value of a * and a source expression where the field should be extracted. *
- * The currently supported field names:
+ * The currently supported fields names following the ISO standard:
 * <ol>
- *  <li>Field Name: <code>SECOND</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(SECOND FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>MINUTE</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(MINUTE FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>HOUR</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(HOUR FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>MONTH</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(MONTH FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>QUARTER</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(QUARTER FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>YEAR</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(YEAR FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>ISO_DAY_OF_WEEK</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(ISO_DAY_OF_WEEK FROM source)</code></li>
- *    <li>Database dialects need to follow ISO semantics when handling ISO_DAY_OF_WEEK.</li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>DAY</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(DAY FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>DOY</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(DOY FROM source)</code></li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>WEEK</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(WEEK FROM source)</code></li>
- *    <li>Database dialects need to follow ISO semantics when handling WEEK.</li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
- *  <li>Field Name: <code>YEAR_OF_WEEK</code>
- *   <ul>
- *    <li>SQL semantic: <code>EXTRACT(YEAR_OF_WEEK FROM source)</code></li>
- *    <li>Database dialects need to follow ISO semantics when handling YEAR_OF_WEEK.</li>
- *    <li>Since version: 3.4.0</li>
- *   </ul>
- *  </li>
+ *  <li> <code>SECOND</code> Since 3.4.0 </li>
+ *  <li> <code>MINUTE</code> Since 3.4.0 </li>
+ *  <li> <code>HOUR</code> Since 3.4.0 </li>
+ *  <li> <code>MONTH</code> Since 3.4.0 </li>
+ *  <li> <code>QUARTER</code> Since 3.4.0 </li>
+ *  <li> <code>YEAR</code> Since 3.4.0 </li>
+ *  <li> <code>ISO_DAY_OF_WEEK</code> Since 3.4.0 </li>
+ *  <li> <code>DAY</code> Since 3.4.0 </li>
+ *  <li> <code>DOY</code> Since 3.4.0 </li>
+ *  <li> <code>WEEK</code> Since 3.4.0 </li>
+ *  <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
 * </ol>
    * * @since 3.4.0 From f87296ca229c29a2c311f9bd794a39be4280a77d Mon Sep 17 00:00:00 2001 From: chenzhx Date: Fri, 1 Jul 2022 08:08:57 +0800 Subject: [PATCH 03/10] Update code --- .../org/apache/spark/sql/connector/expressions/Extract.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java index ad8a155432eb4..dd239954b9ec9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java @@ -23,7 +23,7 @@ /** * Represent an extract function, which extracts and returns the value of a - * and a source expression where the field should be extracted. + * specified datetime field from a datetime or interval value expression. *
 * The currently supported fields names following the ISO standard:
 * <ol>
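
Note on the dialect hook before the next revision: the hunks above route the new Extract expression through V2ExpressionSQLBuilder.visitExtract, which individual dialects may override (H2 does so in PATCH 01). Below is a minimal sketch, not part of this patch series, of how another dialect could plug in; MyDialect, its URL prefix, its EXTRACT keywords, and the surrounding imports (mirroring H2Dialect's) are all hypothetical.

    // Hypothetical dialect, for illustration only; assumes the same imports as
    // H2Dialect (JdbcDialect, Expression, NonFatal) and the visitExtract hook
    // added in PATCH 01.
    private[sql] object MyDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

      class MyJDBCSQLBuilder extends JDBCSQLBuilder {
        override def visitExtract(field: String, source: String): String = {
          // Map Spark's ISO-based field names onto this engine's EXTRACT
          // keywords; the keywords on the right are invented examples.
          val dialectField = field match {
            case "WEEK" => "ISO_WEEK"
            case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR"
            case _ => field
          }
          s"EXTRACT($dialectField FROM $source)"
        }
      }

      override def compileExpression(expr: Expression): Option[String] = {
        val jdbcSQLBuilder = new MyJDBCSQLBuilder()
        try {
          Some(jdbcSQLBuilder.build(expr))
        } catch {
          case scala.util.control.NonFatal(_) =>
            None // returning None makes the framework give up the push-down
        }
      }
    }

As with H2Dialect, returning None on a compilation failure simply falls back to evaluating the expression in Spark instead of pushing it to the database.
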
      From fb7c63febc650d2f7163ea2f1714297e25e1bcbb Mon Sep 17 00:00:00 2001 From: chenzhx Date: Fri, 1 Jul 2022 15:55:12 +0800 Subject: [PATCH 04/10] Update code --- .../spark/sql/connector/expressions/Extract.java | 2 +- .../sql/connector/util/V2ExpressionSQLBuilder.java | 9 +-------- .../spark/sql/catalyst/util/V2ExpressionBuilder.scala | 10 +++++++--- .../scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 5 +++-- .../scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 4 ++-- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java index dd239954b9ec9..42e656b33577a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java @@ -33,7 +33,7 @@ *
 *  <li> <code>MONTH</code> Since 3.4.0 </li>
 *  <li> <code>QUARTER</code> Since 3.4.0 </li>
 *  <li> <code>YEAR</code> Since 3.4.0 </li>
- *  <li> <code>ISO_DAY_OF_WEEK</code> Since 3.4.0 </li>
+ *  <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
 *  <li> <code>DAY</code> Since 3.4.0 </li>
 *  <li> <code>DOY</code> Since 3.4.0 </li>
 *  <li> <code>WEEK</code> Since 3.4.0 </li>
    16. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java index d91a3b72d1d79..2a01102614908 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java @@ -336,13 +336,6 @@ protected String visitTrim(String direction, String[] inputs) { } protected String visitExtract(String field, String source) { - switch (field) { - case "DAY_OF_WEEK": - return "(EXTRACT(ISO_DAY_OF_WEEK FROM " + source + ") % 7)+ 1"; - case "WEEK_DAY": - return "EXTRACT(ISO_DAY_OF_WEEK FROM " + source + ") -1"; - default: - return "EXTRACT(" + field + " FROM " + source + ")"; - } + return "EXTRACT(" + field + " FROM " + source + ")"; } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index c361f85e68c51..7f32defadb757 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc} import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate} -import org.apache.spark.sql.types.BooleanType +import org.apache.spark.sql.types.{BooleanType, IntegerType} /** * The builder to generate V2 expressions from catalyst expressions. 
@@ -378,9 +378,13 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { case Year(child) => generateExpression(child).map(v => new V2Extract("YEAR", v)) case DayOfWeek(child) => - generateExpression(child).map(v => new V2Extract("DAY_OF_WEEK", v)) + generateExpression(child).map(v => new GeneralScalarExpression("+", + Array[V2Expression](new GeneralScalarExpression("%", + Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))), + LiteralValue(1, IntegerType)))) case WeekDay(child) => - generateExpression(child).map(v => new V2Extract("WEEK_DAY", v)) + generateExpression(child).map(v => new GeneralScalarExpression("-", + Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType)))) case DayOfMonth(child) => generateExpression(child).map(v => new V2Extract("DAY", v)) case DayOfYear(child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index b368863fec677..9f1becfa55e20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -41,8 +41,8 @@ private[sql] object H2Dialect extends JdbcDialect { "POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN", "TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN", "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "SECOND", "MINUTE", - "HOUR", "MONTH", "QUARTER", "YEAR", "DAY", "DOY", "DAY_OF_WEEK", "WEEK_DAY", - "WEEK", "YEAR_OF_WEEK") + "HOUR", "MONTH", "QUARTER", "YEAR", "DAY", "DOY", "DAY_OF_WEEK", "YEAR_OF_WEEK", + "WEEK") override def isSupportedFunction(funcName: String): Boolean = supportedFunctions.contains(funcName) @@ -143,6 +143,7 @@ private[sql] object H2Dialect extends JdbcDialect { override def visitExtract(field: String, source: String): String = { field match { + case "DAY_OF_WEEK" => s"EXTRACT(ISO_DAY_OF_WEEK FROM $source)" case "WEEK" => s"EXTRACT(ISO_WEEK FROM $source)" case "YEAR_OF_WEEK" => s"EXTRACT(ISO_WEEK_YEAR FROM $source)" case _ => super.visitExtract(field, source) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index c0c39f7b9b2a7..4604ac587b6b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -1093,8 +1093,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "weekday(date1) = 2 AND dayofweek(date1) = 4") checkFiltersRemoved(df12) val expectedPlanFragment12 = - "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(ISO_DAY_OF_WEEK FROM DATE1) -1 = 2, " + - "(EXTRACT(ISO_DAY_OF_WEEK FROM DAT..." + "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2, " + + "((EXTRACT(DAY_OF_WEEK FROM DATE1) ..." 
checkPushedInfo(df12, expectedPlanFragment12) checkAnswer(df12, Seq(Row("alex"))) } From 3117c32f25e5386cb71c8c0e2ded679fd84781a8 Mon Sep 17 00:00:00 2001 From: chenzhx Date: Fri, 1 Jul 2022 19:34:27 +0800 Subject: [PATCH 05/10] Update code --- .../sql/connector/expressions/Extract.java | 2 +- .../catalyst/util/V2ExpressionBuilder.scala | 4 +- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 4 +- .../apache/spark/sql/jdbc/JdbcDialects.scala | 11 --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 67 ++++++++++++------- 5 files changed, 46 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java index 42e656b33577a..a925f1ee31a98 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java @@ -35,7 +35,7 @@ *
 *  <li> <code>YEAR</code> Since 3.4.0 </li>
 *  <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
 *  <li> <code>DAY</code> Since 3.4.0 </li>
- *  <li> <code>DOY</code> Since 3.4.0 </li>
+ *  <li> <code>DAY_OF_YEAR</code> Since 3.4.0 </li>
 *  <li> <code>WEEK</code> Since 3.4.0 </li>
 *  <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
    30. *
    diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 7f32defadb757..f66c80f3bcf47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -377,18 +377,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { generateExpression(child).map(v => new V2Extract("QUARTER", v)) case Year(child) => generateExpression(child).map(v => new V2Extract("YEAR", v)) + // translate the DayOfWeek function in Spark using ISO standards case DayOfWeek(child) => generateExpression(child).map(v => new GeneralScalarExpression("+", Array[V2Expression](new GeneralScalarExpression("%", Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))), LiteralValue(1, IntegerType)))) + // translate the WeekDay function in Spark using ISO standards case WeekDay(child) => generateExpression(child).map(v => new GeneralScalarExpression("-", Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType)))) case DayOfMonth(child) => generateExpression(child).map(v => new V2Extract("DAY", v)) case DayOfYear(child) => - generateExpression(child).map(v => new V2Extract("DOY", v)) + generateExpression(child).map(v => new V2Extract("DAY_OF_YEAR", v)) case WeekOfYear(child) => generateExpression(child).map(v => new V2Extract("WEEK", v)) case YearOfWeek(child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 9f1becfa55e20..ac1de7d5974e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -40,9 +40,7 @@ private[sql] object H2Dialect extends JdbcDialect { Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN", "TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN", - "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "SECOND", "MINUTE", - "HOUR", "MONTH", "QUARTER", "YEAR", "DAY", "DOY", "DAY_OF_WEEK", "YEAR_OF_WEEK", - "WEEK") + "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM") override def isSupportedFunction(funcName: String): Boolean = supportedFunctions.contains(funcName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index a313f627e604e..fa2a45be1852a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -270,17 +270,6 @@ abstract class JdbcDialect extends Serializable with Logging{ s"${this.getClass.getSimpleName} does not support function: TRIM") } } - - override def visitExtract(field: String, source: String): String = { - if (isSupportedFunction(field)) { - super.visitExtract(field, source) - } else { - // The framework will catch the error and give up the push-down. - // Please see `JdbcDialect.compileExpression(expr: Expression)` for more details. 
- throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support function: EXTRACT") - } - } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 4604ac587b6b9..41d0984b06c16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -19,9 +19,7 @@ package org.apache.spark.sql.jdbc import java.sql.{Connection, DriverManager} import java.util.Properties - import scala.util.control.NonFatal - import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, ExplainSuiteHelper, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow @@ -31,9 +29,10 @@ import org.apache.spark.sql.connector.{IntegralAverage, StrLen} import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction} import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, UserDefinedAggregateFunc} +import org.apache.spark.sql.execution.SimpleMode import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog -import org.apache.spark.sql.functions.{abs, acos, asin, atan, atan2, avg, ceil, coalesce, cos, cosh, cot, count, count_distinct, degrees, exp, floor, lit, log => logarithm, log10, not, pow, radians, round, signum, sin, sinh, sqrt, sum, tan, tanh, udf, when} +import org.apache.spark.sql.functions.{abs, acos, asin, atan, atan2, avg, ceil, coalesce, cos, cosh, cot, count, count_distinct, degrees, exp, floor, lit, log10, not, pow, radians, round, signum, sin, sinh, sqrt, sum, tan, tanh, udf, when, log => logarithm} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DataType, IntegerType, StringType} @@ -210,7 +209,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String*): Unit = { df.queryExecution.optimizedPlan.collect { case _: DataSourceV2ScanRelation => - checkKeywordsExistsInExplain(df, expectedPlanFragment: _*) + checkKeywordsExistsInExplain(df, SimpleMode, expectedPlanFragment: _*) } } @@ -1047,56 +1046,72 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ") checkFiltersRemoved(df7) val expectedPlanFragment7 = - "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DOY FROM DATE1) > 100, " + - "EXTRACT(DAY FROM DATE1) > 10]" + "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " + + "EXTRACT(DAY FROM DATE1) > 10]" checkPushedInfo(df7, expectedPlanFragment7) checkAnswer(df7, Seq(Row("amy"), Row("alex"))) val df8 = sql("SELECT name FROM h2.test.datetime WHERE " + - "year(date1) = 2022 AND quarter(date1) = 2 AND month(date1) = 5") + "year(date1) = 2022 AND quarter(date1) = 2") checkFiltersRemoved(df8) val expectedPlanFragment8 = "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " + - "EXTRACT(QUARTER FROM DATE1) = 2, EXTRACT(MON...," + "EXTRACT(QUARTER FROM DATE1) = 2]" checkPushedInfo(df8, expectedPlanFragment8) checkAnswer(df8, Seq(Row("amy"), Row("alex"))) val df9 = sql("SELECT name FROM h2.test.datetime WHERE " + - "hour(time1) = 0 AND 
minute(time1) = 0 AND second(time1) = 0") + "second(time1) = 0 AND month(date1) = 5") checkFiltersRemoved(df9) val expectedPlanFragment9 = - "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " + - "EXTRACT(MINUTE FROM TIME1) = 0, EXTRACT(SECOND ..." + "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, EXTRACT(SECOND FROM TIME1) = 0, " + + "EXTRACT(MONTH FROM DATE1) ..." checkPushedInfo(df9, expectedPlanFragment9) checkAnswer(df9, Seq(Row("amy"), Row("alex"))) val df10 = sql("SELECT name FROM h2.test.datetime WHERE " + - "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022") + "hour(time1) = 0 AND minute(time1) = 0") checkFiltersRemoved(df10) val expectedPlanFragment10 = - "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " + - "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]" + "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " + + "EXTRACT(MINUTE FROM TIME1) = 0]" checkPushedInfo(df10, expectedPlanFragment10) - checkAnswer(df10, Seq(Row("alex"), Row("amy"))) + checkAnswer(df10, Seq(Row("amy"), Row("alex"))) - // H2 does not support val df11 = sql("SELECT name FROM h2.test.datetime WHERE " + - "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " + - "AND datediff(date1, '2022-05-10') > 0") - checkFiltersRemoved(df11, false) + "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022") + checkFiltersRemoved(df11) val expectedPlanFragment11 = - "PushedFilters: [DATE1 IS NOT NULL]" + "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " + + "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]" checkPushedInfo(df11, expectedPlanFragment11) - checkAnswer(df11, Seq(Row("amy"))) + checkAnswer(df11, Seq(Row("alex"), Row("amy"))) + // H2 does not support val df12 = sql("SELECT name FROM h2.test.datetime WHERE " + - "weekday(date1) = 2 AND dayofweek(date1) = 4") - checkFiltersRemoved(df12) + "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " + + "AND datediff(date1, '2022-05-10') > 0") + checkFiltersRemoved(df12, false) val expectedPlanFragment12 = - "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2, " + - "((EXTRACT(DAY_OF_WEEK FROM DATE1) ..." 
+ "PushedFilters: [DATE1 IS NOT NULL]" checkPushedInfo(df12, expectedPlanFragment12) - checkAnswer(df12, Seq(Row("alex"))) + checkAnswer(df12, Seq(Row("amy"))) + + val df13 = sql("SELECT name FROM h2.test.datetime WHERE " + + "weekday(date1) = 2") + checkFiltersRemoved(df13) + val expectedPlanFragment13 = + "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]" + checkPushedInfo(df13, expectedPlanFragment13) + checkAnswer(df13, Seq(Row("alex"))) + + val df14 = sql("SELECT name FROM h2.test.datetime WHERE " + + "dayofweek(date1) = 4") + checkFiltersRemoved(df14) + val expectedPlanFragment14 = + "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]" + checkPushedInfo(df14, expectedPlanFragment14) + checkAnswer(df14, Seq(Row("alex"))) } } } From 0b13a5e28d0af2d24a55695c8360bb7ee03995e3 Mon Sep 17 00:00:00 2001 From: chenzhx Date: Sat, 2 Jul 2022 10:48:04 +0800 Subject: [PATCH 06/10] Update code --- .../spark/sql/catalyst/util/V2ExpressionBuilder.scala | 6 ++++-- .../test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 7 ++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index f66c80f3bcf47..1b69e7812353f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -377,13 +377,15 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { generateExpression(child).map(v => new V2Extract("QUARTER", v)) case Year(child) => generateExpression(child).map(v => new V2Extract("YEAR", v)) - // translate the DayOfWeek function in Spark using ISO standards + // DayOfWeek uses 1 = Sunday, 2 = Monday, ... and ISO standard is Monday=1, ..., + // so we use the formula ((ISO_standard % 7) + 1) to do translation. case DayOfWeek(child) => generateExpression(child).map(v => new GeneralScalarExpression("+", Array[V2Expression](new GeneralScalarExpression("%", Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))), LiteralValue(1, IntegerType)))) - // translate the WeekDay function in Spark using ISO standards + // WeekDay uses 0 = Monday, 1 = Tuesday, ... and ISO standard is Monday=1, ..., + // so we use the formula (ISO_standard - 1) to do translation. 
case WeekDay(child) => generateExpression(child).map(v => new GeneralScalarExpression("-", Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType)))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 41d0984b06c16..28ac822fcc8b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.jdbc import java.sql.{Connection, DriverManager} import java.util.Properties + import scala.util.control.NonFatal + import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, ExplainSuiteHelper, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow @@ -29,10 +31,9 @@ import org.apache.spark.sql.connector.{IntegralAverage, StrLen} import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction} import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, UserDefinedAggregateFunc} -import org.apache.spark.sql.execution.SimpleMode import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog -import org.apache.spark.sql.functions.{abs, acos, asin, atan, atan2, avg, ceil, coalesce, cos, cosh, cot, count, count_distinct, degrees, exp, floor, lit, log10, not, pow, radians, round, signum, sin, sinh, sqrt, sum, tan, tanh, udf, when, log => logarithm} +import org.apache.spark.sql.functions.{abs, acos, asin, atan, atan2, avg, ceil, coalesce, cos, cosh, cot, count, count_distinct, degrees, exp, floor, lit, log => logarithm, log10, not, pow, radians, round, signum, sin, sinh, sqrt, sum, tan, tanh, udf, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DataType, IntegerType, StringType} @@ -209,7 +210,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String*): Unit = { df.queryExecution.optimizedPlan.collect { case _: DataSourceV2ScanRelation => - checkKeywordsExistsInExplain(df, SimpleMode, expectedPlanFragment: _*) + checkKeywordsExistsInExplain(df, expectedPlanFragment: _*) } } From 0e135cdd992248f1a3e431f87508e1678f2cfdc9 Mon Sep 17 00:00:00 2001 From: chenzhx Date: Sat, 2 Jul 2022 15:17:40 +0800 Subject: [PATCH 07/10] Update code --- .../scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index ac1de7d5974e9..e693f12f23881 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -140,12 +140,13 @@ private[sql] object H2Dialect extends JdbcDialect { class H2JDBCSQLBuilder extends JDBCSQLBuilder { override def visitExtract(field: String, source: String): String = { - field match { - case "DAY_OF_WEEK" => s"EXTRACT(ISO_DAY_OF_WEEK FROM $source)" - case "WEEK" => s"EXTRACT(ISO_WEEK FROM $source)" - case "YEAR_OF_WEEK" => s"EXTRACT(ISO_WEEK_YEAR FROM $source)" - case _ => super.visitExtract(field, 
source) + val newField = field match { + case "DAY_OF_WEEK" => "ISO_DAY_OF_WEEK" + case "WEEK" => "ISO_WEEK" + case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR" + case _ => field } + s"EXTRACT($newField FROM $source)" } } } From a3cfef139e8410d3e6fadbd5ec07dbc9252f8471 Mon Sep 17 00:00:00 2001 From: chenzhx Date: Tue, 5 Jul 2022 12:47:24 +0800 Subject: [PATCH 08/10] Update code --- .../catalyst/util/V2ExpressionBuilder.scala | 4 +- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 98 ++++++++++--------- 2 files changed, 55 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 1b69e7812353f..8bb65a8804471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -377,14 +377,14 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { generateExpression(child).map(v => new V2Extract("QUARTER", v)) case Year(child) => generateExpression(child).map(v => new V2Extract("YEAR", v)) - // DayOfWeek uses 1 = Sunday, 2 = Monday, ... and ISO standard is Monday=1, ..., + // DayOfWeek uses Sunday = 1, Monday = 2, ... and ISO standard is Monday = 1, ..., // so we use the formula ((ISO_standard % 7) + 1) to do translation. case DayOfWeek(child) => generateExpression(child).map(v => new GeneralScalarExpression("+", Array[V2Expression](new GeneralScalarExpression("%", Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))), LiteralValue(1, IntegerType)))) - // WeekDay uses 0 = Monday, 1 = Tuesday, ... and ISO standard is Monday=1, ..., + // WeekDay uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ..., // so we use the formula (ISO_standard - 1) to do translation. 
case WeekDay(child) => generateExpression(child).map(v => new GeneralScalarExpression("-", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 28ac822fcc8b0..e8689857de771 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -1042,77 +1042,85 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel } checkPushedInfo(df6, expectedPlanFragment6) checkAnswer(df6, Seq(Row(2, "david", 10000, 1300, true))) + } + } + } - val df7 = sql("SELECT name FROM h2.test.datetime WHERE " + + test("scan with filter push-down with date time functions") { + Seq(false, true).foreach { ansiMode => + withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString, + SQLConf.MAX_METADATA_STRING_LENGTH.key -> "200") { + + val df1 = sql("SELECT name FROM h2.test.datetime WHERE " + "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ") - checkFiltersRemoved(df7) - val expectedPlanFragment7 = + checkFiltersRemoved(df1) + val expectedPlanFragment1 = "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " + - "EXTRACT(DAY FROM DATE1) > 10]" - checkPushedInfo(df7, expectedPlanFragment7) - checkAnswer(df7, Seq(Row("amy"), Row("alex"))) + "EXTRACT(DAY FROM DATE1) > 10]" + checkPushedInfo(df1, expectedPlanFragment1) + checkAnswer(df1, Seq(Row("amy"), Row("alex"))) - val df8 = sql("SELECT name FROM h2.test.datetime WHERE " + + val df2 = sql("SELECT name FROM h2.test.datetime WHERE " + "year(date1) = 2022 AND quarter(date1) = 2") - checkFiltersRemoved(df8) - val expectedPlanFragment8 = + checkFiltersRemoved(df2) + val expectedPlanFragment2 = "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " + - "EXTRACT(QUARTER FROM DATE1) = 2]" - checkPushedInfo(df8, expectedPlanFragment8) - checkAnswer(df8, Seq(Row("amy"), Row("alex"))) + "EXTRACT(QUARTER FROM DATE1) = 2]" + checkPushedInfo(df2, expectedPlanFragment2) + checkAnswer(df2, Seq(Row("amy"), Row("alex"))) - val df9 = sql("SELECT name FROM h2.test.datetime WHERE " + + val df3 = sql("SELECT name FROM h2.test.datetime WHERE " + "second(time1) = 0 AND month(date1) = 5") - checkFiltersRemoved(df9) - val expectedPlanFragment9 = - "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, EXTRACT(SECOND FROM TIME1) = 0, " + - "EXTRACT(MONTH FROM DATE1) ..." 
- checkPushedInfo(df9, expectedPlanFragment9) - checkAnswer(df9, Seq(Row("amy"), Row("alex"))) - - val df10 = sql("SELECT name FROM h2.test.datetime WHERE " + + checkFiltersRemoved(df3) + val expectedPlanFragment3 = + "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " + + "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]" + checkPushedInfo(df3, expectedPlanFragment3) + checkAnswer(df3, Seq(Row("amy"), Row("alex"))) + + val df4 = sql("SELECT name FROM h2.test.datetime WHERE " + "hour(time1) = 0 AND minute(time1) = 0") - checkFiltersRemoved(df10) - val expectedPlanFragment10 = + checkFiltersRemoved(df4) + val expectedPlanFragment4 = "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " + "EXTRACT(MINUTE FROM TIME1) = 0]" - checkPushedInfo(df10, expectedPlanFragment10) - checkAnswer(df10, Seq(Row("amy"), Row("alex"))) + checkPushedInfo(df4, expectedPlanFragment4) + checkAnswer(df4, Seq(Row("amy"), Row("alex"))) - val df11 = sql("SELECT name FROM h2.test.datetime WHERE " + + val df5 = sql("SELECT name FROM h2.test.datetime WHERE " + "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022") - checkFiltersRemoved(df11) - val expectedPlanFragment11 = + checkFiltersRemoved(df5) + val expectedPlanFragment5 = "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " + "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]" - checkPushedInfo(df11, expectedPlanFragment11) - checkAnswer(df11, Seq(Row("alex"), Row("amy"))) + checkPushedInfo(df5, expectedPlanFragment5) + checkAnswer(df5, Seq(Row("alex"), Row("amy"))) // H2 does not support - val df12 = sql("SELECT name FROM h2.test.datetime WHERE " + + val df6 = sql("SELECT name FROM h2.test.datetime WHERE " + "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " + "AND datediff(date1, '2022-05-10') > 0") - checkFiltersRemoved(df12, false) - val expectedPlanFragment12 = + checkFiltersRemoved(df6, false) + val expectedPlanFragment6 = "PushedFilters: [DATE1 IS NOT NULL]" - checkPushedInfo(df12, expectedPlanFragment12) - checkAnswer(df12, Seq(Row("amy"))) + checkPushedInfo(df6, expectedPlanFragment6) + checkAnswer(df6, Seq(Row("amy"))) - val df13 = sql("SELECT name FROM h2.test.datetime WHERE " + + val df7 = sql("SELECT name FROM h2.test.datetime WHERE " + "weekday(date1) = 2") - checkFiltersRemoved(df13) - val expectedPlanFragment13 = + checkFiltersRemoved(df7) + val expectedPlanFragment7 = "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]" - checkPushedInfo(df13, expectedPlanFragment13) - checkAnswer(df13, Seq(Row("alex"))) + checkPushedInfo(df7, expectedPlanFragment7) + checkAnswer(df7, Seq(Row("alex"))) - val df14 = sql("SELECT name FROM h2.test.datetime WHERE " + + val df8 = sql("SELECT name FROM h2.test.datetime WHERE " + "dayofweek(date1) = 4") - checkFiltersRemoved(df14) - val expectedPlanFragment14 = + checkFiltersRemoved(df8) + val expectedPlanFragment8 = "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]" - checkPushedInfo(df14, expectedPlanFragment14) - checkAnswer(df14, Seq(Row("alex"))) + checkPushedInfo(df8, expectedPlanFragment8) + checkAnswer(df8, Seq(Row("alex"))) } } } From ea0e7c2649aa489fd3daa3f8e57158718b54a19c Mon Sep 17 00:00:00 2001 From: chenzhx Date: Tue, 5 Jul 2022 14:04:21 +0800 Subject: [PATCH 09/10] Update code --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 146 +++++++++--------- 1 file changed, 71 insertions(+), 75 deletions(-) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index e8689857de771..01ee8b3fc4b20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -1047,81 +1047,77 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel } test("scan with filter push-down with date time functions") { - Seq(false, true).foreach { ansiMode => - withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString, - SQLConf.MAX_METADATA_STRING_LENGTH.key -> "200") { - - val df1 = sql("SELECT name FROM h2.test.datetime WHERE " + - "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ") - checkFiltersRemoved(df1) - val expectedPlanFragment1 = - "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " + - "EXTRACT(DAY FROM DATE1) > 10]" - checkPushedInfo(df1, expectedPlanFragment1) - checkAnswer(df1, Seq(Row("amy"), Row("alex"))) - - val df2 = sql("SELECT name FROM h2.test.datetime WHERE " + - "year(date1) = 2022 AND quarter(date1) = 2") - checkFiltersRemoved(df2) - val expectedPlanFragment2 = - "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " + - "EXTRACT(QUARTER FROM DATE1) = 2]" - checkPushedInfo(df2, expectedPlanFragment2) - checkAnswer(df2, Seq(Row("amy"), Row("alex"))) - - val df3 = sql("SELECT name FROM h2.test.datetime WHERE " + - "second(time1) = 0 AND month(date1) = 5") - checkFiltersRemoved(df3) - val expectedPlanFragment3 = - "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " + - "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]" - checkPushedInfo(df3, expectedPlanFragment3) - checkAnswer(df3, Seq(Row("amy"), Row("alex"))) - - val df4 = sql("SELECT name FROM h2.test.datetime WHERE " + - "hour(time1) = 0 AND minute(time1) = 0") - checkFiltersRemoved(df4) - val expectedPlanFragment4 = - "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " + - "EXTRACT(MINUTE FROM TIME1) = 0]" - checkPushedInfo(df4, expectedPlanFragment4) - checkAnswer(df4, Seq(Row("amy"), Row("alex"))) - - val df5 = sql("SELECT name FROM h2.test.datetime WHERE " + - "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022") - checkFiltersRemoved(df5) - val expectedPlanFragment5 = - "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " + - "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]" - checkPushedInfo(df5, expectedPlanFragment5) - checkAnswer(df5, Seq(Row("alex"), Row("amy"))) - - // H2 does not support - val df6 = sql("SELECT name FROM h2.test.datetime WHERE " + - "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " + - "AND datediff(date1, '2022-05-10') > 0") - checkFiltersRemoved(df6, false) - val expectedPlanFragment6 = - "PushedFilters: [DATE1 IS NOT NULL]" - checkPushedInfo(df6, expectedPlanFragment6) - checkAnswer(df6, Seq(Row("amy"))) - - val df7 = sql("SELECT name FROM h2.test.datetime WHERE " + - "weekday(date1) = 2") - checkFiltersRemoved(df7) - val expectedPlanFragment7 = - "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]" - checkPushedInfo(df7, expectedPlanFragment7) - checkAnswer(df7, Seq(Row("alex"))) - - val df8 = sql("SELECT name FROM h2.test.datetime WHERE " + - "dayofweek(date1) = 4") - checkFiltersRemoved(df8) - val expectedPlanFragment8 = - "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]" - checkPushedInfo(df8, 
From 2600d7f6717e616e5ff22eb777cddc748e855f7f Mon Sep 17 00:00:00 2001
From: chenzhx
Date: Tue, 5 Jul 2022 15:37:30 +0800
Subject: [PATCH 10/10] Update code

---
 .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 200 ++++++++++--------
 1 file changed, 109 insertions(+), 91 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 01ee8b3fc4b20..c6e6d239dfd9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -208,9 +208,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   }
 
   private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String*): Unit = {
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
+    withSQLConf(SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") {
+      df.queryExecution.optimizedPlan.collect {
+        case _: DataSourceV2ScanRelation =>
+          checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
+      }
     }
   }
 
@@ -753,8 +755,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkSortRemoved(df9)
     checkLimitRemoved(df9)
     checkPushedInfo(df9, "PushedFilters: [], " +
-      "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " +
-      "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ")
+      "PushedTopN: " +
+      "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+      "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,")
     checkAnswer(df9,
       Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0)))
 
@@ -771,8 +774,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkSortRemoved(df10, false)
     checkLimitRemoved(df10, false)
    checkPushedInfo(df10, "PushedFilters: [], " +
-      "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " +
-      "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ")
+      "PushedTopN: " +
+      "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+      "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,")
     checkAnswer(df10,
       Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0)))
   }
@@ -889,8 +893,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(floor($"bonus") === 1200)
       .filter(ceil($"bonus") === 1200)
     checkFiltersRemoved(df13)
-    checkPushedInfo(df13, "PushedFilters: [BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, " +
-      "(POWER(BONUS, 2.0)) = 1440000.0, SQRT(BONU...,")
+    checkPushedInfo(df13, "PushedFilters: " +
+      "[BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, (POWER(BONUS, 2.0)) = 1440000.0, " +
+      "SQRT(BONUS) > 34.0, FLOOR(BONUS) = 1200, CEIL(BONUS) = 1200],")
     checkAnswer(df13, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
 
@@ -912,8 +917,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(radians($"bonus") > 20)
       .filter(signum($"bonus") === 1)
     checkFiltersRemoved(df15)
-    checkPushedInfo(df15, "PushedFilters: [BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, " +
-      "LOG10(BONUS) > 3.0, (ROUND(BONUS, 0)) = 1200.0, DEG...,")
+    checkPushedInfo(df15, "PushedFilters: " +
+      "[BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, LOG10(BONUS) > 3.0, " +
+      "(ROUND(BONUS, 0)) = 1200.0, DEGREES(BONUS) > 68754.0, RADIANS(BONUS) > 20.0, " +
+      "SIGN(BONUS) = 1.0],")
     checkAnswer(df15, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
 
@@ -930,8 +937,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(atan($"bonus") > 1.4)
      .filter(atan2($"bonus", $"bonus") > 0.7)
     checkFiltersRemoved(df16)
-    checkPushedInfo(df16, "PushedFilters: [BONUS IS NOT NULL, SIN(BONUS) < -0.08, " +
-      "SINH(BONUS) > 200.0, COS(BONUS) > 0.9, COSH(BONUS) > 200....,")
+    checkPushedInfo(df16, "PushedFilters: [" +
+      "BONUS IS NOT NULL, SIN(BONUS) < -0.08, SINH(BONUS) > 200.0, COS(BONUS) > 0.9, " +
+      "COSH(BONUS) > 200.0, TAN(BONUS) < -0.08, TANH(BONUS) = 1.0, COT(BONUS) < -11.0, " +
+      "ASIN(BONUS) > 0.1, ACOS(BONUS) > 1.4, ATAN(BONUS) > 1.4, (ATAN2(BONUS, BONUS)) > 0.7],")
     checkAnswer(df16, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
 
@@ -1036,7 +1045,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       checkFiltersRemoved(df6, ansiMode)
       val expectedPlanFragment6 = if (ansiMode) {
         "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " +
-          "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...,"
+          "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, " +
+          "CAST(DEPT AS short) > 1, CAST(BONUS AS decimal(20,2)) > 1200.00]"
       } else {
         "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL],"
       }
@@ -1047,78 +1057,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   }
 
   test("scan with filter push-down with date time functions") {
-    withSQLConf(SQLConf.MAX_METADATA_STRING_LENGTH.key -> "200") {
-      val df1 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
-      checkFiltersRemoved(df1)
-      val expectedPlanFragment1 =
-        "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " +
-        "EXTRACT(DAY FROM DATE1) > 10]"
-      checkPushedInfo(df1, expectedPlanFragment1)
-      checkAnswer(df1, Seq(Row("amy"), Row("alex")))
-
-      val df2 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "year(date1) = 2022 AND quarter(date1) = 2")
-      checkFiltersRemoved(df2)
-      val expectedPlanFragment2 =
-        "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " +
-        "EXTRACT(QUARTER FROM DATE1) = 2]"
-      checkPushedInfo(df2, expectedPlanFragment2)
-      checkAnswer(df2, Seq(Row("amy"), Row("alex")))
-
-      val df3 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "second(time1) = 0 AND month(date1) = 5")
-      checkFiltersRemoved(df3)
-      val expectedPlanFragment3 =
-        "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " +
-        "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]"
-      checkPushedInfo(df3, expectedPlanFragment3)
-      checkAnswer(df3, Seq(Row("amy"), Row("alex")))
-
-      val df4 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "hour(time1) = 0 AND minute(time1) = 0")
-      checkFiltersRemoved(df4)
-      val expectedPlanFragment4 =
-        "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " +
-        "EXTRACT(MINUTE FROM TIME1) = 0]"
-      checkPushedInfo(df4, expectedPlanFragment4)
-      checkAnswer(df4, Seq(Row("amy"), Row("alex")))
-
-      val df5 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
-      checkFiltersRemoved(df5)
-      val expectedPlanFragment5 =
-        "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " +
-        "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]"
-      checkPushedInfo(df5, expectedPlanFragment5)
-      checkAnswer(df5, Seq(Row("alex"), Row("amy")))
-
-      // H2 does not support
-      val df6 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " +
-        "AND datediff(date1, '2022-05-10') > 0")
-      checkFiltersRemoved(df6, false)
-      val expectedPlanFragment6 =
-        "PushedFilters: [DATE1 IS NOT NULL]"
-      checkPushedInfo(df6, expectedPlanFragment6)
-      checkAnswer(df6, Seq(Row("amy")))
-
-      val df7 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "weekday(date1) = 2")
-      checkFiltersRemoved(df7)
-      val expectedPlanFragment7 =
-        "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]"
-      checkPushedInfo(df7, expectedPlanFragment7)
-      checkAnswer(df7, Seq(Row("alex")))
-
-      val df8 = sql("SELECT name FROM h2.test.datetime WHERE " +
-        "dayofweek(date1) = 4")
-      checkFiltersRemoved(df8)
-      val expectedPlanFragment8 =
-        "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]"
-      checkPushedInfo(df8, expectedPlanFragment8)
-      checkAnswer(df8, Seq(Row("alex")))
-    }
+    val df1 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
+    checkFiltersRemoved(df1)
+    val expectedPlanFragment1 =
+      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " +
+      "EXTRACT(DAY FROM DATE1) > 10]"
+    checkPushedInfo(df1, expectedPlanFragment1)
+    checkAnswer(df1, Seq(Row("amy"), Row("alex")))
+
+    val df2 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "year(date1) = 2022 AND quarter(date1) = 2")
+    checkFiltersRemoved(df2)
+    val expectedPlanFragment2 =
+      "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " +
+      "EXTRACT(QUARTER FROM DATE1) = 2]"
+    checkPushedInfo(df2, expectedPlanFragment2)
+    checkAnswer(df2, Seq(Row("amy"), Row("alex")))
+
+    val df3 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "second(time1) = 0 AND month(date1) = 5")
+    checkFiltersRemoved(df3)
+    val expectedPlanFragment3 =
+      "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " +
+      "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]"
+    checkPushedInfo(df3, expectedPlanFragment3)
+    checkAnswer(df3, Seq(Row("amy"), Row("alex")))
+
+    val df4 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "hour(time1) = 0 AND minute(time1) = 0")
+    checkFiltersRemoved(df4)
+    val expectedPlanFragment4 =
+      "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " +
+      "EXTRACT(MINUTE FROM TIME1) = 0]"
+    checkPushedInfo(df4, expectedPlanFragment4)
+    checkAnswer(df4, Seq(Row("amy"), Row("alex")))
+
+    val df5 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+    checkFiltersRemoved(df5)
+    val expectedPlanFragment5 =
+      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " +
+      "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]"
+    checkPushedInfo(df5, expectedPlanFragment5)
+    checkAnswer(df5, Seq(Row("alex"), Row("amy")))
+
+    // H2 does not support
+    val df6 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " +
+      "AND datediff(date1, '2022-05-10') > 0")
+    checkFiltersRemoved(df6, false)
+    val expectedPlanFragment6 =
+      "PushedFilters: [DATE1 IS NOT NULL]"
+    checkPushedInfo(df6, expectedPlanFragment6)
+    checkAnswer(df6, Seq(Row("amy")))
+
+    val df7 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "weekday(date1) = 2")
+    checkFiltersRemoved(df7)
+    val expectedPlanFragment7 =
+      "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]"
+    checkPushedInfo(df7, expectedPlanFragment7)
+    checkAnswer(df7, Seq(Row("alex")))
+
+    val df8 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "dayofweek(date1) = 4")
+    checkFiltersRemoved(df8)
+    val expectedPlanFragment8 =
+      "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]"
+    checkPushedInfo(df8, expectedPlanFragment8)
+    checkAnswer(df8, Seq(Row("alex")))
   }
 
   test("scan with filter push-down with UDF") {
@@ -1298,7 +1306,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkFiltersRemoved(df2)
     val expectedPlanFragment2 =
       "PushedFilters: [NAME IS NOT NULL, TRIM(BOTH FROM NAME) = 'jen', " +
-        "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NA..."
+        "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NAME, 'e', '1')) = 'j1n']"
     checkPushedInfo(df2, expectedPlanFragment2)
     checkAnswer(df2, Seq(Row(6, "jen", 12000, 1200, true)))
 
@@ -1814,10 +1822,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       """.stripMargin)
     checkAggregateRemoved(df)
     checkPushedInfo(df,
-      "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
-        " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
-        "PushedFilters: [], " +
-        "PushedGroupByExpressions: [DEPT], ")
+      "PushedAggregates: " +
+        "[COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " +
+        "COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY <= 13000.00) THEN SALARY ELSE 0.00 END), " +
+        "COUNT(CASE WHEN (SALARY > 11000.00) OR (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " +
+        "COUNT(CASE WHEN (SALARY >= 12000.00) OR (SALARY < 9000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY <= 10000.00) AND (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY <= 9000.00) OR (SALARY > 10000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY = 0.00) OR (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY <= 8000.00) OR (SALARY >= 10000.00) THEN 0.00 ELSE SALARY END), " +
+        "MIN(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NOT NULL) THEN SALARY ELSE 0.00 END), " +
+        "SUM(CASE WHEN SALARY > 10000.00 THEN 2 WHEN SALARY > 8000.00 THEN 1 END), " +
+        "AVG(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NULL) THEN SALARY ELSE 0.00 END)], " +
+        "PushedFilters: [], " +
+        "PushedGroupByExpressions: [DEPT],")
     checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 0d, 0d, 2, 0d),
      Row(2, 2, 2, 2, 2, 10000d, 12000d, 10000d, 12000d, 0d, 0d, 3, 0d),
      Row(2, 2, 2, 2, 2, 10000d, 9000d, 10000d, 10000d, 9000d, 0d, 2, 0d)))
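The final hunk spells out the PushedAggregates for a test whose SQL sits above this excerpt. The shape of that query can be read back from the fragments: aggregates whose arguments are CASE WHEN expressions, grouped by DEPT, all compiled to H2 SQL and evaluated in the database. An illustrative reconstruction of that shape, not the test's verbatim query, against the suite's h2.test.employee table:

    // Sketch: CASE WHEN arguments inside aggregates are pushed down wholesale
    // once the dialect can compile the CASE WHEN expression itself.
    val df = spark.sql(
      """SELECT
        |  COUNT(CASE WHEN SALARY > 8000 AND SALARY < 10000 THEN SALARY ELSE 0 END),
        |  MAX(CASE WHEN SALARY <= 10000 AND SALARY >= 8000 THEN SALARY ELSE 0 END),
        |  SUM(CASE WHEN SALARY > 10000 THEN 2 WHEN SALARY > 8000 THEN 1 END)
        |FROM h2.test.employee
        |GROUP BY DEPT""".stripMargin)
    df.explain()
    // PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND
    //   (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), ...],
    // PushedGroupByExpressions: [DEPT]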