From 70cdc0e6efbc596bebe0f5d4d982591ae3b8693d Mon Sep 17 00:00:00 2001 From: andrej-db Date: Wed, 18 Sep 2024 12:18:13 +0200 Subject: [PATCH 01/16] PostgresDialect: add PostgresSqlBuilder, add override for xor push-down --- .../spark/sql/jdbc/PostgresDialect.scala | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 03fefd82802e..5fb31d8bd07d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -23,6 +23,7 @@ import java.util import java.util.Locale import scala.util.Using +import scala.util.control.NonFatal import org.apache.spark.internal.LogKeys.COLUMN_NAME import org.apache.spark.internal.MDC @@ -30,7 +31,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException} import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.connector.expressions.NamedReference +import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, NamedReference} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo @@ -379,4 +380,26 @@ private case class PostgresDialect() case _ => } } + + override def compileExpression(expr: Expression): Option[String] = { + val builder = new PostgresSQLBuilder() + try { + Some(builder.build(expr)) + } catch { + case NonFatal(e) => + logWarning("Error occurs while compiling V2 expression", e) + None + } + } + + private class PostgresSQLBuilder extends JDBCSQLBuilder { + override def build(expr: Expression): String = { + expr match { + // Postgres uses '#' for xor, rather then '^'. + case e: GeneralScalarExpression if e.name() == "^" => + visitBinaryArithmetic("#", inputToSQL(e.children().head), inputToSQL(e.children()(1))) + case _ => super.build(expr) + } + } + } } From 7e815768f41fce090b545db601a0cf96af814d30 Mon Sep 17 00:00:00 2001 From: andrej-db Date: Sun, 22 Sep 2024 16:34:26 +0200 Subject: [PATCH 02/16] V2JDBCTest: add test --- .../scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 54635f69f8b6..386cba7c54b5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -986,4 +986,13 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("scan with filter push-down with date time functions") { testDatetime(s"$catalogAndNamespace.${caseConvert("datetime")}") } + + test("xor operator push-down") { + val df1 = spark.sql( + s"""SELECT * FROM $catalogAndNamespace.${caseConvert("pattern_testing_table")} + |WHERE id ^ 3 = 0""".stripMargin) + val rows1 = df1.collect() + assert(rows1.length === 1) + assert(rows1(0).getInt(0) === 3) + } } From 02cf3d341bc84805be94cb758bf6d9edd9b438a1 Mon Sep 17 00:00:00 2001 From: andrej-db Date: Mon, 23 Sep 2024 15:38:13 +0200 Subject: [PATCH 03/16] V2JDBCTest: remove test PostgresIntegrationSuite: add test --- .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 8 ++++++++ .../scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 9 --------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 3076b599ef4e..07500ef046dc 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -629,4 +629,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { checkAnswer(df6, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33))) } } + + test("SPARK-49695: Postgres fix xor push-down") { + val df = spark.sql("select c0, c1 from bar where c1 ^ 42 = 0") + val rows = df.collect() + assert(rows.length == 1) + assert(rows(0).getInt(2) === 42) + assert(rows(0).getString(1) === "hello") + } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 386cba7c54b5..54635f69f8b6 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -986,13 +986,4 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("scan with filter push-down with date time functions") { testDatetime(s"$catalogAndNamespace.${caseConvert("datetime")}") } - - test("xor operator push-down") { - val df1 = spark.sql( - s"""SELECT * FROM $catalogAndNamespace.${caseConvert("pattern_testing_table")} - |WHERE id ^ 3 = 0""".stripMargin) - val rows1 = df1.collect() - assert(rows1.length === 1) - assert(rows1(0).getInt(0) === 3) - } } From 6df7f8174a203c09e624e89d4c5dc44992f8aa82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Thu, 24 Oct 2024 00:37:28 +0200 Subject: [PATCH 04/16] Update PostgresIntegrationSuite.scala --- .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 07500ef046dc..3076b599ef4e 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -629,12 +629,4 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { checkAnswer(df6, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33))) } } - - test("SPARK-49695: Postgres fix xor push-down") { - val df = spark.sql("select c0, c1 from bar where c1 ^ 42 = 0") - val rows = df.collect() - assert(rows.length == 1) - assert(rows(0).getInt(2) === 42) - assert(rows(0).getString(1) === "hello") - } } From f2982aef46899fa1f1598aa2b9d199e1cd03e881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Thu, 24 Oct 2024 00:39:18 +0200 Subject: [PATCH 05/16] Update PostgresIntegrationSuite.scala --- .../spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 850391e8dc33..436964c06fd1 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -123,4 +123,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT ) } } + + test("SPARK-49695: Postgres fix xor push-down") { + val df = spark.sql("select id, name from employee where id ^ 6 = 0") + val rows = df.collect() + assert(rows.length == 1) + assert(rows(0).getInt(1) === 6) + assert(rows(0).getString(2) === "jen") + } } From f7aadd75f5c7dad58ba2a1c1bc474eb41cf7cd5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Thu, 24 Oct 2024 10:23:38 +0200 Subject: [PATCH 06/16] Update PostgresIntegrationSuite.scala From 6d18ec63cf020b8683fd93874f7dbcfc6181fca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Thu, 24 Oct 2024 11:54:42 +0200 Subject: [PATCH 07/16] Update PostgresIntegrationSuite.scala --- .../apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 58cc7a784a22..d9c2856a730b 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -233,6 +233,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT test("SPARK-49695: Postgres fix xor push-down") { val df = spark.sql("select id, name from employee where id ^ 6 = 0") val rows = df.collect() + assert(!df.queryExecution.optimizedPlan.exists { plan => + plan.isInstanceOf[Filter] + }) assert(rows.length == 1) assert(rows(0).getInt(1) === 6) assert(rows(0).getString(2) === "jen") From 7b956592f66755157822b31e0f9d8cfc3b53f42c Mon Sep 17 00:00:00 2001 From: andrej-db Date: Thu, 24 Oct 2024 12:17:07 +0200 Subject: [PATCH 08/16] PostgresIntegrationSuite: override --- .../spark/sql/jdbc/PostgresDialect.scala | 26 +++---------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index ebf61b184985..dca03978c253 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -310,6 +310,10 @@ private case class PostgresDialect() case _ => super.visitExtract(field, source) } } + + override def visitBinaryArithmetic(name: String, l: String, r: String): String = { + l + " " + name.replace('^', '#') + " " + r + } } override def compileExpression(expr: Expression): Option[String] = { @@ -404,26 +408,4 @@ private case class PostgresDialect() case _ => } } - - override def compileExpression(expr: Expression): Option[String] = { - val builder = new PostgresSQLBuilder() - try { - Some(builder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error occurs while compiling V2 expression", e) - None - } - } - - private class PostgresSQLBuilder extends JDBCSQLBuilder { - override def build(expr: Expression): String = { - expr match { - // Postgres uses '#' for xor, rather then '^'. - case e: GeneralScalarExpression if e.name() == "^" => - visitBinaryArithmetic("#", inputToSQL(e.children().head), inputToSQL(e.children()(1))) - case _ => super.build(expr) - } - } - } } From 485a48d5cce2f44c94fed659e1f51e6b064fd189 Mon Sep 17 00:00:00 2001 From: andrej-db Date: Thu, 24 Oct 2024 12:17:32 +0200 Subject: [PATCH 09/16] PostgresIntegrationSuite: import --- .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index dca03978c253..91750917f92e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException} import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, NamedReference} +import org.apache.spark.sql.connector.expressions.{Expression, NamedReference} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo From 309c130a46f17f7f5927751ead620d22f7b080a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Mon, 25 Nov 2024 14:44:06 +0100 Subject: [PATCH 10/16] Update PostgresIntegrationSuite.scala --- .../spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index d9c2856a730b..684e56dc9143 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -22,6 +22,7 @@ import java.sql.Connection import org.apache.spark.{SparkConf, SparkSQLException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.DatabaseOnDocker import org.apache.spark.sql.types._ @@ -233,9 +234,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT test("SPARK-49695: Postgres fix xor push-down") { val df = spark.sql("select id, name from employee where id ^ 6 = 0") val rows = df.collect() - assert(!df.queryExecution.optimizedPlan.exists { plan => - plan.isInstanceOf[Filter] - }) + assert(df.queryExecution.sparkPlan.collectFirst { + case f: FilterExec => f + }.isEmpty) assert(rows.length == 1) assert(rows(0).getInt(1) === 6) assert(rows(0).getString(2) === "jen") From dc784659a8eac6ef08112f1734df5a43edfe0435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Mon, 25 Nov 2024 18:59:11 +0100 Subject: [PATCH 11/16] Update PostgresIntegrationSuite.scala --- .../org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 684e56dc9143..68ef09ce1755 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -232,7 +232,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("SPARK-49695: Postgres fix xor push-down") { - val df = spark.sql("select id, name from employee where id ^ 6 = 0") + val df = spark.sql(s"select id, name from $catalogAndNamespace.employee where id ^ 6 = 0") val rows = df.collect() assert(df.queryExecution.sparkPlan.collectFirst { case f: FilterExec => f From 823cf7c9a56396e9cce1f8c5d5fa6efc24b206fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Tue, 26 Nov 2024 10:53:52 +0100 Subject: [PATCH 12/16] Update PostgresIntegrationSuite.scala --- .../org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 07e0cc1910d2..73b139f7534e 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -254,7 +254,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("SPARK-49695: Postgres fix xor push-down") { - val df = spark.sql(s"select id, name from $catalogAndNamespace.employee where id ^ 6 = 0") + val df = spark.sql(s"select id, name from $catalogName.employee where id ^ 6 = 0") val rows = df.collect() assert(df.queryExecution.sparkPlan.collectFirst { case f: FilterExec => f From 9a9a6a28eb2b2498cf3c2a20f11dc184c2523506 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Tue, 3 Dec 2024 11:55:32 +0100 Subject: [PATCH 13/16] Update PostgresIntegrationSuite.scala --- .../org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 73b139f7534e..44bed64e2fa4 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -254,7 +254,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("SPARK-49695: Postgres fix xor push-down") { - val df = spark.sql(s"select id, name from $catalogName.employee where id ^ 6 = 0") + val df = spark.sql(s"select dept, name from $catalogName.employee where dept ^ 6 = 0") val rows = df.collect() assert(df.queryExecution.sparkPlan.collectFirst { case f: FilterExec => f From b843186053b7e82f16a307008794912884f7160b Mon Sep 17 00:00:00 2001 From: andrej-gobeljic_data Date: Tue, 3 Dec 2024 17:05:17 +0100 Subject: [PATCH 14/16] fix --- .../apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 44bed64e2fa4..89a3e1aa5e32 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -260,8 +260,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT case f: FilterExec => f }.isEmpty) assert(rows.length == 1) - assert(rows(0).getInt(1) === 6) - assert(rows(0).getString(2) === "jen") + assert(rows(0).getInt(0) === 6) + assert(rows(0).getString(1) === "jen") } override def testDatetime(tbl: String): Unit = { From 574b7fcc85355048d6bb8e957c51724c03889c11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Wed, 4 Dec 2024 11:22:49 +0100 Subject: [PATCH 15/16] Update PostgresIntegrationSuite.scala From cdf372445f8466ad985e23fd5ffba947283076c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Gobelji=C4=87?= Date: Wed, 4 Dec 2024 17:07:10 +0100 Subject: [PATCH 16/16] Update connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala Co-authored-by: Maxim Gekk --- .../apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 89a3e1aa5e32..fdd4a72bbbcf 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -256,9 +256,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT test("SPARK-49695: Postgres fix xor push-down") { val df = spark.sql(s"select dept, name from $catalogName.employee where dept ^ 6 = 0") val rows = df.collect() - assert(df.queryExecution.sparkPlan.collectFirst { - case f: FilterExec => f - }.isEmpty) + assert(!df.queryExecution.sparkPlan.exists(_.isInstanceOf[FilterExec])) assert(rows.length == 1) assert(rows(0).getInt(0) === 6) assert(rows(0).getString(1) === "jen")