Skip to content

Commit d9996c6

Browse files
huaxingaodongjoon-hyun
authored andcommitted
[SPARK-41990][SQL] Use FieldReference.column instead of apply in V1 to V2 filter conversion
### What changes were proposed in this pull request? Use `FieldReference.column` instead of `FieldReference.apply` in V1 to V2 filter conversion ### Why are the changes needed? Previously, filtering by composite field name doesn't work ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes apache#39564 from huaxingao/field_reference. Authored-by: huaxingao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 2878cd8 commit d9996c6

File tree

2 files changed

+42
-13
lines changed

2 files changed

+42
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ package org.apache.spark.sql.sources
1919

2020
import org.apache.spark.annotation.{Evolving, Stable}
2121
import org.apache.spark.sql.catalyst.expressions.Literal
22+
import org.apache.spark.sql.catalyst.parser.ParseException
2223
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
23-
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
24+
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue, NamedReference}
2425
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate}
2526
import org.apache.spark.sql.types.StringType
2627
import org.apache.spark.unsafe.types.UTF8String
@@ -74,6 +75,15 @@ sealed abstract class Filter {
7475
* Converts V1 filter to V2 filter
7576
*/
7677
private[sql] def toV2: Predicate
78+
79+
protected def toV2Column(attribute: String): NamedReference = {
80+
try {
81+
FieldReference(attribute)
82+
} catch {
83+
case _: ParseException =>
84+
FieldReference.column(attribute)
85+
}
86+
}
7787
}
7888

7989
/**
@@ -91,7 +101,7 @@ case class EqualTo(attribute: String, value: Any) extends Filter {
91101
override def toV2: Predicate = {
92102
val literal = Literal(value)
93103
new Predicate("=",
94-
Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
104+
Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
95105
}
96106
}
97107

@@ -111,7 +121,7 @@ case class EqualNullSafe(attribute: String, value: Any) extends Filter {
111121
override def toV2: Predicate = {
112122
val literal = Literal(value)
113123
new Predicate("<=>",
114-
Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
124+
Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
115125
}
116126
}
117127

@@ -130,7 +140,7 @@ case class GreaterThan(attribute: String, value: Any) extends Filter {
130140
override def toV2: Predicate = {
131141
val literal = Literal(value)
132142
new Predicate(">",
133-
Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
143+
Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
134144
}
135145
}
136146

@@ -149,7 +159,7 @@ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
149159
override def toV2: Predicate = {
150160
val literal = Literal(value)
151161
new Predicate(">=",
152-
Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
162+
Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
153163
}
154164
}
155165

@@ -168,7 +178,7 @@ case class LessThan(attribute: String, value: Any) extends Filter {
168178
override def toV2: Predicate = {
169179
val literal = Literal(value)
170180
new Predicate("<",
171-
Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
181+
Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
172182
}
173183
}
174184

@@ -187,7 +197,7 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
187197
override def toV2: Predicate = {
188198
val literal = Literal(value)
189199
new Predicate("<=",
190-
Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
200+
Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
191201
}
192202
}
193203

@@ -230,7 +240,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
230240
val literal = Literal(value)
231241
LiteralValue(literal.value, literal.dataType)
232242
}
233-
new Predicate("IN", FieldReference(attribute) +: literals)
243+
new Predicate("IN", toV2Column(attribute) +: literals)
234244
}
235245
}
236246

@@ -245,7 +255,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
245255
@Stable
246256
case class IsNull(attribute: String) extends Filter {
247257
override def references: Array[String] = Array(attribute)
248-
override def toV2: Predicate = new Predicate("IS_NULL", Array(FieldReference(attribute)))
258+
override def toV2: Predicate = new Predicate("IS_NULL", Array(toV2Column(attribute)))
249259
}
250260

251261
/**
@@ -259,7 +269,7 @@ case class IsNull(attribute: String) extends Filter {
259269
@Stable
260270
case class IsNotNull(attribute: String) extends Filter {
261271
override def references: Array[String] = Array(attribute)
262-
override def toV2: Predicate = new Predicate("IS_NOT_NULL", Array(FieldReference(attribute)))
272+
override def toV2: Predicate = new Predicate("IS_NOT_NULL", Array(toV2Column(attribute)))
263273
}
264274

265275
/**
@@ -308,7 +318,7 @@ case class Not(child: Filter) extends Filter {
308318
case class StringStartsWith(attribute: String, value: String) extends Filter {
309319
override def references: Array[String] = Array(attribute)
310320
override def toV2: Predicate = new Predicate("STARTS_WITH",
311-
Array(FieldReference(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
321+
Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
312322
}
313323

314324
/**
@@ -324,7 +334,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter {
324334
case class StringEndsWith(attribute: String, value: String) extends Filter {
325335
override def references: Array[String] = Array(attribute)
326336
override def toV2: Predicate = new Predicate("ENDS_WITH",
327-
Array(FieldReference(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
337+
Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
328338
}
329339

330340
/**
@@ -340,7 +350,7 @@ case class StringEndsWith(attribute: String, value: String) extends Filter {
340350
case class StringContains(attribute: String, value: String) extends Filter {
341351
override def references: Array[String] = Array(attribute)
342352
override def toV2: Predicate = new Predicate("CONTAINS",
343-
Array(FieldReference(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
353+
Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
344354
}
345355

346356
/**

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
276276
"INSERT INTO test.datetime VALUES ('2018-07-12', '2018-07-12 09:51:15.0')").executeUpdate()
277277
conn.commit()
278278

279+
conn.prepareStatement(
280+
"CREATE TABLE test.composite_name (`last name` TEXT(32) NOT NULL, id INTEGER NOT NULL)")
281+
.executeUpdate()
282+
conn.prepareStatement("INSERT INTO test.composite_name VALUES ('smith', 1)").executeUpdate()
283+
conn.prepareStatement("INSERT INTO test.composite_name VALUES ('jones', 2)").executeUpdate()
284+
conn.commit()
285+
286+
sql(
287+
s"""
288+
|CREATE OR REPLACE TEMPORARY VIEW composite_name
289+
|USING org.apache.spark.sql.jdbc
290+
|OPTIONS (url '$url', dbtable 'TEST.COMPOSITE_NAME', user 'testUser', password 'testPass')
291+
""".stripMargin.replaceAll("\n", " "))
292+
279293
// Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
280294
}
281295

@@ -1963,4 +1977,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
19631977
}
19641978
}
19651979
}
1980+
1981+
test("SPARK-41990: Filter with composite name") {
1982+
val df = sql("SELECT * FROM composite_name WHERE `last name` = 'smith'")
1983+
assert(df.collect.toSet === Set(Row("smith", 1)))
1984+
}
19661985
}

0 commit comments

Comments
 (0)