SPARK-29444: add an `ignoreNullFields` option to the JSON generator.

When the option is false, JacksonGenerator writes an explicit `null` for a null
struct field instead of omitting the field. The per-call option
"ignoreNullFields" takes precedence; when it is absent the session conf
spark.sql.jsonGenerator.ignoreNullFields (default: true) is used.

Review notes:
  * The session conf is declared with booleanConf and exposed as a Boolean,
    like the neighbouring SQLConf entries, instead of being carried as a
    String and parsed with toBoolean at every use. Only the per-call option
    string is parsed in JSONOptions.
  * This patch was rebuilt from a whitespace-collapsed copy, so the indentation
    of context lines is reconstructed. If a hunk is rejected for whitespace
    reasons, retry with `git apply --ignore-whitespace`.
  * The post-image blob hashes on the `index` lines of JSONOptions.scala and
    SQLConf.scala predate the booleanConf change and are therefore stale.

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index dc26a28c74f11..e7bfb77e46c26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for parsing JSON data into Spark SQL rows.
@@ -76,6 +77,10 @@ private[sql] class JSONOptions(
   // Whether to ignore column of all null values or empty array/struct during schema inference
   val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)
 
+  // Whether to omit null fields when generating JSON; falls back to the session conf when unset
+  val ignoreNullFields: Boolean = parameters.get("ignoreNullFields").map(_.toBoolean)
+    .getOrElse(SQLConf.get.jsonGeneratorIgnoreNullFields)
+
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 3ee7e484690d5..aaf2ecf7923ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -184,6 +184,9 @@ private[sql] class JacksonGenerator(
       if (!row.isNullAt(i)) {
         gen.writeFieldName(field.name)
         fieldWriters(i).apply(row, i)
+      } else if (!options.ignoreNullFields) {
+        gen.writeFieldName(field.name)
+        gen.writeNull()
       }
       i += 1
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0ec661fc16c88..69d5a552c611c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1153,6 +1153,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val JSON_GENERATOR_IGNORE_NULL_FIELDS =
+    buildConf("spark.sql.jsonGenerator.ignoreNullFields")
+      .doc("If false, JacksonGenerator will generate null for null fields in Struct.")
+      .booleanConf
+      .createWithDefault(true)
+
   val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
@@ -2323,6 +2329,8 @@ class SQLConf extends Serializable with Logging {
 
   def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
 
+  def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)
+
   def parallelFileListingInStatsComputation: Boolean =
     getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
index 9b27490ed0e35..2bb948ec24fb3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
@@ -39,6 +39,33 @@ class JacksonGeneratorSuite extends SparkFunSuite {
     assert(writer.toString === """{"a":1}""")
   }
 
+  test("SPARK-29444: initial with StructType and write out an empty row " +
+    "with ignoreNullFields=false") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = InternalRow(null)
+    val writer = new CharArrayWriter()
+    val allowNullOption =
+      new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId)
+    val gen = new JacksonGenerator(dataType, writer, allowNullOption)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":null}""")
+  }
+
+  test("SPARK-29444: initial with StructType field and write out a row " +
+    "with ignoreNullFields=false and struct inner null") {
+    val fieldType = StructType(StructField("b", IntegerType) :: Nil)
+    val dataType = StructType(StructField("a", fieldType) :: Nil)
+    val input = InternalRow(InternalRow(null))
+    val writer = new CharArrayWriter()
+    val allowNullOption =
+      new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId)
+    val gen = new JacksonGenerator(dataType, writer, allowNullOption)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":{"b":null}}""")
+  }
+
   test("initial with StructType and write out rows") {
     val dataType = StructType(StructField("a", IntegerType) :: Nil)
     val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)