Skip to content

Commit 4248397

Browse files
andrej-dbMaxGekk
authored andcommitted
[SPARK-49695][SQL] Postgres fix xor push-down
### What changes were proposed in this pull request? This PR fixes the pushdown of ^ operator (XOR operator) for Postgres. Those two databases use this as exponent, rather then bitwise xor. Fix is consisted of overriding the SQLExpressionBuilder to replace the '^' character with '#'. ### Why are the changes needed? Result is incorrect. ### Does this PR introduce _any_ user-facing change? Yes. The user will now have a proper translation of the ^ operator. ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#48144 from andrej-db/SPARK-49695-PostgresXOR. Lead-authored-by: Andrej Gobeljić <[email protected]> Co-authored-by: andrej-db <[email protected]> Co-authored-by: andrej-gobeljic_data <[email protected]> Signed-off-by: Max Gekk <[email protected]>
1 parent f1eecd3 commit 4248397

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.sql.Connection
2222
import org.apache.spark.{SparkConf, SparkSQLException}
2323
import org.apache.spark.sql.AnalysisException
2424
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
25+
import org.apache.spark.sql.execution.FilterExec
2526
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
2627
import org.apache.spark.sql.jdbc.PostgresDatabaseOnDocker
2728
import org.apache.spark.sql.types._
@@ -243,6 +244,15 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
243244
}
244245
}
245246

247+
test("SPARK-49695: Postgres fix xor push-down") {
248+
val df = spark.sql(s"select dept, name from $catalogName.employee where dept ^ 6 = 0")
249+
val rows = df.collect()
250+
assert(!df.queryExecution.sparkPlan.exists(_.isInstanceOf[FilterExec]))
251+
assert(rows.length == 1)
252+
assert(rows(0).getInt(0) === 6)
253+
assert(rows(0).getString(1) === "jen")
254+
}
255+
246256
override def testDatetime(tbl: String): Unit = {
247257
val df1 = sql(s"SELECT name FROM $tbl WHERE " +
248258
"dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")

sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,10 @@ private case class PostgresDialect()
310310
case _ => super.visitExtract(field, source)
311311
}
312312
}
313+
314+
override def visitBinaryArithmetic(name: String, l: String, r: String): String = {
315+
l + " " + name.replace('^', '#') + " " + r
316+
}
313317
}
314318

315319
override def compileExpression(expr: Expression): Option[String] = {

0 commit comments

Comments
 (0)