Skip to content

Commit e42724b

Browse files
sureshthalamati authored and rxin committed
[SPARK-13167][SQL] Include rows with null values for partition column when reading from JDBC datasources.
Rows with null values in partition column are not included in the results because none of the partition where clause specify is null predicate on the partition column. This fix adds is null predicate on the partition column to the first JDBC partition where clause. Example: JDBCPartition(THEID < 1 or THEID is null, 0),JDBCPartition(THEID >= 1 AND THEID < 2,1), JDBCPartition(THEID >= 2, 2) Author: sureshthalamati <[email protected]> Closes #11063 from sureshthalamati/nullable_jdbc_part_col_spark-13167.
1 parent a640c5b commit e42724b

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ private[sql] object JDBCRelation {
4444
* exactly once. The parameters minValue and maxValue are advisory in that
4545
* incorrect values may cause the partitioning to be poor, but no data
4646
* will fail to be represented.
47+
*
48+
* Null value predicate is added to the first partition where clause to include
49+
* the rows with null value for the partitions column.
50+
*
51+
* @param partitioning partition information to generate the where clause for each partition
52+
* @return an array of partitions with where clause for each partition
4753
*/
4854
def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
4955
if (partitioning == null) return Array[Partition](JDBCPartition(null, 0))
@@ -66,7 +72,7 @@ private[sql] object JDBCRelation {
6672
if (upperBound == null) {
6773
lowerBound
6874
} else if (lowerBound == null) {
69-
upperBound
75+
s"$upperBound or $column is null"
7076
} else {
7177
s"$lowerBound AND $upperBound"
7278
}

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,27 @@ class JDBCSuite extends SparkFunSuite
171171
|OPTIONS (url '$url', dbtable 'TEST.NULLTYPES', user 'testUser', password 'testPass')
172172
""".stripMargin.replaceAll("\n", " "))
173173

174+
conn.prepareStatement(
175+
"create table test.emp(name TEXT(32) NOT NULL," +
176+
" theid INTEGER, \"Dept\" INTEGER)").executeUpdate()
177+
conn.prepareStatement(
178+
"insert into test.emp values ('fred', 1, 10)").executeUpdate()
179+
conn.prepareStatement(
180+
"insert into test.emp values ('mary', 2, null)").executeUpdate()
181+
conn.prepareStatement(
182+
"insert into test.emp values ('joe ''foo'' \"bar\"', 3, 30)").executeUpdate()
183+
conn.prepareStatement(
184+
"insert into test.emp values ('kathy', null, null)").executeUpdate()
185+
conn.commit()
186+
187+
sql(
188+
s"""
189+
|CREATE TEMPORARY TABLE nullparts
190+
|USING org.apache.spark.sql.jdbc
191+
|OPTIONS (url '$url', dbtable 'TEST.EMP', user 'testUser', password 'testPass',
192+
|partitionColumn '"Dept"', lowerBound '1', upperBound '4', numPartitions '4')
193+
""".stripMargin.replaceAll("\n", " "))
194+
174195
// Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
175196
}
176197

@@ -338,6 +359,23 @@ class JDBCSuite extends SparkFunSuite
338359
.collect().length === 3)
339360
}
340361

362+
test("Partioning on column that might have null values.") {
363+
assert(
364+
sqlContext.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties)
365+
.collect().length === 4)
366+
assert(
367+
sqlContext.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties)
368+
.collect().length === 4)
369+
// partitioning on a nullable quoted column
370+
assert(
371+
sqlContext.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties)
372+
.collect().length === 4)
373+
}
374+
375+
test("SELECT * on partitioned table with a nullable partioncolumn") {
376+
assert(sql("SELECT * FROM nullparts").collect().size == 4)
377+
}
378+
341379
test("H2 integral types") {
342380
val rows = sql("SELECT * FROM inttypes WHERE A IS NOT NULL").collect()
343381
assert(rows.length === 1)

0 commit comments

Comments
 (0)