
Commit 1c7f0ab

yhuai authored and marmbrus committed
[SPARK-3339][SQL] Support for skipping json lines that fail to parse
This PR aims to provide a way to skip/query corrupt JSON records. To do so, we introduce an internal column to hold corrupt records (the default name is `_corrupt_record`; this name can be changed by setting the value of `spark.sql.columnNameOfCorruptRecord`). When there is a parsing error, we put the corrupt record, in its unparsed form, into the internal column. Users can skip/query this column through SQL.

* To query those corrupt records:

  ```sql
  -- For Hive parser
  SELECT `_corrupt_record` FROM jsonTable WHERE `_corrupt_record` IS NOT NULL
  -- For our SQL parser
  SELECT _corrupt_record FROM jsonTable WHERE _corrupt_record IS NOT NULL
  ```

* To skip corrupt records and query regular records:

  ```sql
  -- For Hive parser
  SELECT field1, field2 FROM jsonTable WHERE `_corrupt_record` IS NULL
  -- For our SQL parser
  SELECT field1, field2 FROM jsonTable WHERE _corrupt_record IS NULL
  ```

Generally, it is not recommended to change the name of the internal column. If the name has to be changed to avoid possible name conflicts, you can use `sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, <new column name>)` or `sqlContext.sql("SET spark.sql.columnNameOfCorruptRecord=<new column name>")`.

Author: Yin Huai <[email protected]>

Closes #2680 from yhuai/corruptJsonRecord and squashes the following commits:

4c9828e [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
309616a [Yin Huai] Change the default name of corrupt record to "_corrupt_record".
b4a3632 [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
9375ae9 [Yin Huai] Set the column name of corrupt json record back to the default one after the unit test.
ee584c0 [Yin Huai] Provide a way to query corrupt json records as unparsed strings.
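For an end-to-end picture of the workflow this enables, here is a minimal Scala sketch. It is illustrative only: the `sqlContext` and `jsonLines` (`RDD[String]`) names and the `jsonTable` table are assumptions, not part of the patch.

```scala
import org.apache.spark.sql.SQLConf

// Optional: rename the internal column *before* loading any JSON
// (only needed if the data really contains a "_corrupt_record" field).
sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")

// Schema inference proceeds as usual; lines that fail to parse surface
// under the internal column instead of failing the job.
val jsonSchemaRDD = sqlContext.jsonRDD(jsonLines)
jsonSchemaRDD.registerTempTable("jsonTable")

// Regular records only.
val clean = sqlContext.sql("SELECT * FROM jsonTable WHERE _unparsed IS NULL")

// The raw, unparsed text of every corrupt record.
val corrupt = sqlContext.sql("SELECT _unparsed FROM jsonTable WHERE _unparsed IS NOT NULL")
```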
1 parent 1faa113 commit 1c7f0ab

File tree: 6 files changed (+116, −19 lines)


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -35,6 +35,7 @@ private[spark] object SQLConf {
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"

   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -131,6 +132,9 @@ private[sql] trait SQLConf {
   private[spark] def inMemoryPartitionPruning: Boolean =
     getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean

+  private[spark] def columnNameOfCorruptRecord: String =
+    getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
```
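Both the constant and the accessor go through the ordinary `getConf`/`setConf` machinery, so the column name can be changed programmatically or via SQL. A small sketch of the two equivalent forms (assuming a `SQLContext` named `sqlContext`):

```scala
import org.apache.spark.sql.SQLConf

// Programmatic form, using the constant introduced above.
sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")

// Equivalent SQL form.
sqlContext.sql("SET spark.sql.columnNameOfCorruptRecord=_unparsed")

// Reading the effective value; the default is "_corrupt_record".
val corruptColumn = sqlContext.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
```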

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 10 additions & 4 deletions
```diff
@@ -195,9 +195,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
     val appliedSchema =
-      Option(schema).getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, 1.0)))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+      Option(schema).getOrElse(
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
     applySchema(rowRDD, appliedSchema)
   }

@@ -206,8 +209,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val appliedSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val appliedSchema =
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
     applySchema(rowRDD, appliedSchema)
   }
```
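Note that both overloads copy `columnNameOfCorruptRecord` into a local `val` before passing it to `JsonRDD`. The commit message does not explain this, but it matches the usual Spark idiom of capturing a field into a local so that the closures shipped to executors serialize a plain `String` instead of the enclosing `SQLContext`. A generic sketch of that idiom, with made-up names:

```scala
import org.apache.spark.rdd.RDD

class Tagger {                 // imagine this class is not serializable
  def prefix: String = "tag"   // a method on the enclosing object

  def tagAll(data: RDD[String]): RDD[String] = {
    val localPrefix = prefix            // capture the value locally...
    data.map(s => s"$localPrefix:$s")   // ...so the closure only drags in a String
  }
}
```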

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 12 additions & 4 deletions
```diff
@@ -148,8 +148,12 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * It goes through the entire dataset once to determine the schema.
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
-    val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
-    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val appliedScalaSchema =
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
+    val scalaRowRDD =
+      JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
     val logicalPlan =
       LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
     new JavaSchemaRDD(sqlContext, logicalPlan)
@@ -162,10 +166,14 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
+    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
     val appliedScalaSchema =
       Option(asScalaDataType(schema)).getOrElse(
-        JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[SStructType]
-    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(
+            json.rdd, 1.0, columnNameOfCorruptJsonRecord))).asInstanceOf[SStructType]
+    val scalaRowRDD = JsonRDD.jsonStringToRow(
+      json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
     val logicalPlan =
       LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
     new JavaSchemaRDD(sqlContext, logicalPlan)
```

sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala

Lines changed: 20 additions & 10 deletions
```diff
@@ -22,6 +22,7 @@ import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 import scala.math.BigDecimal
 import java.sql.Timestamp

+import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper

 import org.apache.spark.rdd.RDD
@@ -35,16 +36,19 @@ private[sql] object JsonRDD extends Logging {

   private[sql] def jsonStringToRow(
       json: RDD[String],
-      schema: StructType): RDD[Row] = {
-    parseJson(json).map(parsed => asRow(parsed, schema))
+      schema: StructType,
+      columnNameOfCorruptRecords: String): RDD[Row] = {
+    parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
   }

   private[sql] def inferSchema(
       json: RDD[String],
-      samplingRatio: Double = 1.0): StructType = {
+      samplingRatio: Double = 1.0,
+      columnNameOfCorruptRecords: String): StructType = {
     require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
     val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
-    val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+    val allKeys =
+      parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
     createSchema(allKeys)
   }
@@ -274,7 +278,9 @@ private[sql] object JsonRDD extends Logging {
     case atom => atom
   }

-  private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+  private def parseJson(
+      json: RDD[String],
+      columnNameOfCorruptRecords: String): RDD[Map[String, Any]] = {
     // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
     // ObjectMapper will not return BigDecimal when
     // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
@@ -289,12 +295,16 @@ private[sql] object JsonRDD extends Logging {
       // For example: for {"key": 1, "key":2}, we will get "key"->2.
       val mapper = new ObjectMapper()
       iter.flatMap { record =>
-        val parsed = mapper.readValue(record, classOf[Object]) match {
-          case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
-          case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
-        }
+        try {
+          val parsed = mapper.readValue(record, classOf[Object]) match {
+            case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
+            case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
+          }

-        parsed
+          parsed
+        } catch {
+          case e: JsonProcessingException => Map(columnNameOfCorruptRecords -> record) :: Nil
+        }
       }
     })
   }
```
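The heart of the patch is the `try`/`catch` above: a `JsonProcessingException` turns the raw line into a single-entry map keyed by the corrupt-record column, which then flows through `asRow` like any parsed record. A self-contained sketch of the same idea outside Spark (the `ParseOrCapture` name is invented for illustration):

```scala
import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper

object ParseOrCapture {
  private val mapper = new ObjectMapper()

  /** Parse one JSON object, or capture the raw text under `corruptColumn`. */
  def apply(record: String, corruptColumn: String): Map[String, Any] =
    try {
      mapper.readValue(record, classOf[Object]) match {
        case map: java.util.Map[_, _] =>
          map.asScala.map { case (k, v) => k.toString -> (v: Any) }.toMap
        case _ =>
          // Top-level arrays and scalars are out of scope for this sketch.
          Map(corruptColumn -> record)
      }
    } catch {
      case _: JsonProcessingException => Map(corruptColumn -> record)
    }
}

// ParseOrCapture("""{"a":1}""", "_corrupt_record")  // Map("a" -> 1)
// ParseOrCapture("""{"a":1,""", "_corrupt_record")  // Map("_corrupt_record" -> "{\"a\":1,")
```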

sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

Lines changed: 61 additions & 1 deletion
```diff
@@ -21,6 +21,8 @@ import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._

 import java.sql.Timestamp
@@ -644,7 +646,65 @@ class JsonSuite extends QueryTest {
       ("str_a_1", null, null) ::
       ("str_a_2", null, null) ::
       (null, "str_b_3", null) ::
-      ("str_a_4", "str_b_4", "str_c_4") ::Nil
+      ("str_a_4", "str_b_4", "str_c_4") :: Nil
     )
   }
+
+  test("Corrupt records") {
+    // Test if we can query corrupt records.
+    val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
+
+    val jsonSchemaRDD = jsonRDD(corruptRecords)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    val schema = StructType(
+      StructField("_unparsed", StringType, true) ::
+      StructField("a", StringType, true) ::
+      StructField("b", StringType, true) ::
+      StructField("c", StringType, true) :: Nil)
+
+    assert(schema === jsonSchemaRDD.schema)
+
+    // In HiveContext, backticks should be used to access columns starting with a underscore.
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c, _unparsed
+          |FROM jsonTable
+        """.stripMargin),
+      (null, null, null, "{") ::
+      (null, null, null, "") ::
+      (null, null, null, """{"a":1, b:2}""") ::
+      (null, null, null, """{"a":{, b:3}""") ::
+      ("str_a_4", "str_b_4", "str_c_4", null) ::
+      (null, null, null, "]") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c
+          |FROM jsonTable
+          |WHERE _unparsed IS NULL
+        """.stripMargin),
+      ("str_a_4", "str_b_4", "str_c_4") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT _unparsed
+          |FROM jsonTable
+          |WHERE _unparsed IS NOT NULL
+        """.stripMargin),
+      Seq("{") ::
+      Seq("") ::
+      Seq("""{"a":1, b:2}""") ::
+      Seq("""{"a":{, b:3}""") ::
+      Seq("]") :: Nil
+    )
+
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+  }
 }
```
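One detail the test relies on: `_unparsed` appears in the inferred schema because, during inference, each unparsable line contributes exactly one key (the corrupt-record column) with a `String` value, which `allKeysWithValueTypes` folds in like any ordinary field. A sketch of checking this interactively, reusing the `corruptRecords` fixture added in `TestJsonData.scala` below (the commented output is the expected shape, not captured from a run):

```scala
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.json.TestJsonData
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._

TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")

val rdd = jsonRDD(TestJsonData.corruptRecords)
rdd.printSchema()
// root
//  |-- _unparsed: string (nullable = true)
//  |-- a: string (nullable = true)
//  |-- b: string (nullable = true)
//  |-- c: string (nullable = true)
```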

sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -143,4 +143,13 @@ object TestJsonData {
     """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
     """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
     """[]""" :: Nil)
+
+  val corruptRecords =
+    TestSQLContext.sparkContext.parallelize(
+      """{""" ::
+      """""" ::
+      """{"a":1, b:2}""" ::
+      """{"a":{, b:3}""" ::
+      """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
+      """]""" :: Nil)
 }
```
