Commit 9814b97

Nathan Howell authored and yhuai committed
[SPARK-8093] [SQL] Remove empty structs inferred from JSON documents
Author: Nathan Howell <[email protected]>

Closes apache#6799 from NathanHowell/spark-8093 and squashes the following commits:

76ac3e8 [Nathan Howell] [SPARK-8093] [SQL] Remove empty structs inferred from JSON documents
1 parent 1fa29c2 commit 9814b97

File tree: 3 files changed (+48, -17 lines)

sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala

Lines changed: 35 additions & 17 deletions
@@ -43,7 +43,7 @@ private[sql] object InferSchema {
     }
 
     // perform schema inference on each row and merge afterwards
-    schemaData.mapPartitions { iter =>
+    val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       iter.map { row =>
         try {
@@ -55,8 +55,13 @@ private[sql] object InferSchema {
             StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
         }
       }
-    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) match {
-      case st: StructType => nullTypeToStringType(st)
+    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+
+    canonicalizeType(rootType) match {
+      case Some(st: StructType) => st
+      case _ =>
+        // canonicalizeType erases all empty structs, including the only one we want to keep
+        StructType(Seq())
     }
   }
 
@@ -116,22 +121,35 @@
     }
   }
 
-  private def nullTypeToStringType(struct: StructType): StructType = {
-    val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable, _) =>
-        val newType = dataType match {
-          case NullType => StringType
-          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
-          case ArrayType(struct: StructType, containsNull) =>
-            ArrayType(nullTypeToStringType(struct), containsNull)
-          case struct: StructType => nullTypeToStringType(struct)
-          case other: DataType => other
-        }
+  /**
+   * Convert NullType to StringType and remove StructTypes with no fields
+   */
+  private def canonicalizeType: DataType => Option[DataType] = {
+    case at@ArrayType(elementType, _) =>
+      for {
+        canonicalType <- canonicalizeType(elementType)
+      } yield {
+        at.copy(canonicalType)
+      }
 
-        StructField(fieldName, newType, nullable)
-    }
+    case StructType(fields) =>
+      val canonicalFields = for {
+        field <- fields
+        if field.name.nonEmpty
+        canonicalType <- canonicalizeType(field.dataType)
+      } yield {
+        field.copy(dataType = canonicalType)
+      }
+
+      if (canonicalFields.nonEmpty) {
+        Some(StructType(canonicalFields))
+      } else {
+        // per SPARK-8093: empty structs should be deleted
+        None
+      }
 
-    StructType(fields)
+    case NullType => Some(StringType)
+    case other => Some(other)
   }
 
   /**
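
The canonicalizeType pass above does the actual cleanup: NullType becomes StringType, fields with empty names are dropped, and a struct that ends up with no fields yields None so its parent erases it. Below is a minimal standalone sketch of that logic for reference, assuming only that org.apache.spark.sql.types is on the classpath; the object name, the main method, and the sample schema are illustrative and not part of the commit.

import org.apache.spark.sql.types._

object CanonicalizeSketch {
  // Mirrors the private canonicalizeType added in InferSchema.scala:
  // NullType widens to StringType; structs with no named fields are erased.
  def canonicalizeType(tpe: DataType): Option[DataType] = tpe match {
    case at @ ArrayType(elementType, _) =>
      canonicalizeType(elementType).map(t => at.copy(elementType = t))

    case StructType(fields) =>
      val canonicalFields = for {
        field <- fields
        if field.name.nonEmpty
        canonicalType <- canonicalizeType(field.dataType)
      } yield field.copy(dataType = canonicalType)

      if (canonicalFields.nonEmpty) Some(StructType(canonicalFields)) else None

    case NullType => Some(StringType)
    case other => Some(other)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical inferred schema for {"a": {}, "b": null}:
    // "a" is an empty struct, "b" is NullType.
    val inferred = StructType(Seq(
      StructField("a", StructType(Seq.empty)),
      StructField("b", NullType)))

    // After canonicalization only "b" survives, widened to string.
    println(canonicalizeType(inferred))
  }
}

Running main should print a Some(...) containing only the b field as a nullable string; the empty struct inferred for a disappears, which is the per-schema version of what the new unit test checks below.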

sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -1103,4 +1103,8 @@ class JsonSuite extends QueryTest with TestJsonData {
     }
   }
 
+  test("SPARK-8093 Erase empty structs") {
+    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    assert(StructType(Seq()) === emptySchema)
+  }
 }
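
The test drives the private[sql] InferSchema object directly. The same behavior should also be visible through the public JSON reader, since it relies on InferSchema for schema inference; a hedged spark-shell style sketch, assuming a Spark 1.4-era SQLContext named sqlContext and a SparkContext named sc (both names and the exact records are illustrative):

import org.apache.spark.sql.types.StructType

val emptyJson = sc.parallelize(
  """{"a": {}}""" ::
  """{"a": {"b": {}}}""" :: Nil)

// Schema inference runs through InferSchema, so the empty structs
// should be erased and the resulting DataFrame schema left empty.
val df = sqlContext.read.json(emptyJson)
assert(df.schema == StructType(Seq()))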

sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala

Lines changed: 9 additions & 0 deletions
@@ -189,5 +189,14 @@ trait TestJsonData {
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
       """]""" :: Nil)
 
+  def emptyRecords: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{""" ::
+      """""" ::
+      """{"a": {}}""" ::
+      """{"a": {"b": {}}}""" ::
+      """{"b": [{"c": {}}]}""" ::
+      """]""" :: Nil)
+
   def empty: RDD[String] = ctx.sparkContext.parallelize(Seq[String]())
 }
