Skip to content

Commit e4f397c

Browse files
committed
Unit tests.
1 parent b2c06f8 commit e4f397c

File tree

2 files changed

+76
-3
lines changed

2 files changed

+76
-3
lines changed

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
595595
}
596596
}
597597

598-
test("Pre insert nullability check") {
598+
test("Pre insert nullability check (ArrayType)") {
599599
val df1 =
600600
createDataFrame(Tuple1(Seq(Int.box(1), null.asInstanceOf[Integer])) :: Nil).toDF("a")
601601
val expectedSchema1 =
@@ -610,11 +610,55 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
610610
StructType(
611611
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
612612
assert(df2.schema === expectedSchema2)
613-
df2.saveAsTable("arrayInParquet", SaveMode.Append)
613+
df2.insertInto("arrayInParquet", overwrite = false)
614+
createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
615+
.saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls insertInto on this new DataFrame.
616+
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a")
617+
.saveAsTable("arrayInParquet", "parquet", SaveMode.Append)
618+
refreshTable("arrayInParquet")
614619

615620
checkAnswer(
616621
sql("SELECT a FROM arrayInParquet"),
617-
Row(ArrayBuffer(1, null)) :: Row(ArrayBuffer(2, 3)) :: Nil)
622+
Row(ArrayBuffer(1, null)) ::
623+
Row(ArrayBuffer(2, 3)) ::
624+
Row(ArrayBuffer(4, 5)) ::
625+
Row(ArrayBuffer(6, null)) :: Nil)
626+
627+
sql("DROP TABLE arrayInParquet")
628+
}
629+
630+
test("Pre insert nullability check (MapType)") {
631+
val df1 =
632+
createDataFrame(Tuple1(Map(1 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
633+
val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = true)
634+
val expectedSchema1 =
635+
StructType(
636+
StructField("a", mapType1, nullable = true) :: Nil)
637+
assert(df1.schema === expectedSchema1)
638+
df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite)
639+
640+
val df2 =
641+
createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
642+
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = false)
643+
val expectedSchema2 =
644+
StructType(
645+
StructField("a", mapType2, nullable = true) :: Nil)
646+
assert(df2.schema === expectedSchema2)
647+
df2.insertInto("mapInParquet", overwrite = false)
648+
createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
649+
.saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls insertInto on this new DataFrame.
650+
createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
651+
.saveAsTable("mapInParquet", "parquet", SaveMode.Append)
652+
refreshTable("mapInParquet")
653+
654+
checkAnswer(
655+
sql("SELECT a FROM mapInParquet"),
656+
Row(Map(1 -> null)) ::
657+
Row(Map(2 -> 3)) ::
658+
Row(Map(4 -> 5)) ::
659+
Row(Map(6 -> null)) :: Nil)
660+
661+
sql("DROP TABLE mapInParquet")
618662
}
619663

620664
test("SPARK-6024 wide schema support") {

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive._
3030
import org.apache.spark.sql.hive.test.TestHive.implicits._
3131
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
3232
import org.apache.spark.sql.SaveMode
33+
import org.apache.spark.sql.types._
3334

3435
// The data where the partitioning key exists only in the directory structure.
3536
case class ParquetData(intField: Int, stringField: String)
@@ -450,6 +451,34 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
450451
super.afterAll()
451452
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
452453
}
454+
455+
test("values in arrays and maps stored in parquet are always nullable") {
456+
val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
457+
val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
458+
val arrayType1 = ArrayType(IntegerType, containsNull = false)
459+
val expectedSchema1 =
460+
StructType(
461+
StructField("m", mapType1, nullable = true) ::
462+
StructField("a", arrayType1, nullable = true) :: Nil)
463+
assert(df.schema === expectedSchema1)
464+
465+
df.saveAsTable("alwaysNullable", "parquet")
466+
467+
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
468+
val arrayType2 = ArrayType(IntegerType, containsNull = true)
469+
val expectedSchema2 =
470+
StructType(
471+
StructField("m", mapType2, nullable = true) ::
472+
StructField("a", arrayType2, nullable = true) :: Nil)
473+
474+
assert(table("alwaysNullable").schema === expectedSchema2)
475+
476+
checkAnswer(
477+
sql("SELECT m, a FROM alwaysNullable"),
478+
Row(Map(2 -> 3), Seq(4, 5, 6)))
479+
480+
sql("DROP TABLE alwaysNullable")
481+
}
453482
}
454483

455484
class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {

0 commit comments

Comments (0)