@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils

private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

private[sql] class NotAllowedNullException(msg: String) extends SparkSQLJsonProcessingException(msg)
Contributor

Why do we need this exception?

Member Author

I added this one to differentiate this case from the other SparkSQLJsonProcessingExceptions[1]: when parsing fails because of a null in a non-nullable field, PERMISSIVE parse mode cannot apply, since permissive mode effectively means filling the other fields with nulls.

[1]https://github.com/HyukjinKwon/spark/blob/cb5ece70bc840820697fa73943bc28bd63c96b41/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala#L471-L474
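For context, a minimal sketch of the scenario being distinguished, assuming this PR's behavior (the schema, field name, and file path below are illustrative, not taken from the PR):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// A user-specified schema that marks `id` as non-nullable.
val schema = new StructType().add(StructField("id", LongType, nullable = false))

// Suppose /tmp/test.json contains the record: {"id": null}
// Under PERMISSIVE mode the reader would normally substitute null for a field
// it cannot fill, but that is exactly what a non-nullable field forbids, so
// this case is surfaced through NotAllowedNullException rather than being
// replaced with a null placeholder.
val parsed = spark.read.schema(schema).json("/tmp/test.json")
parsed.show()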


/**
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
@@ -61,46 +63,50 @@ class JacksonParser(
@transient
private[this] var isWarningPrintedForMalformedRecord: Boolean = false

private def dropmalformedModeWarningMessage(record: String): String =
s"""Found at least one malformed records (sample: $record). The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column $columnNameOfCorruptRecord
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin

private def permissiveModeWarningMessage(record: String): String =
s"""Found at least one malformed records (sample: $record). The JSON reader will replace
|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column $columnNameOfCorruptRecord.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin

/**
* This function deals with the cases it fails to parse. This function will be called
* when exceptions are caught during converting. This function also deals with the `mode` option.
*/
private def failedRecord(record: String): Seq[InternalRow] = {
private def nullSafeFailedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
if (options.failFast) {
throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
logWarning(dropmalformedModeWarningMessage(record))
isWarningPrintedForMalformedRecord = true
}
Nil
} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
if (!isWarningPrintedForMalformedRecord) {
logWarning(
s"""Found at least one malformed records (sample: $record). The JSON reader will replace
|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${columnNameOfCorruptRecord}.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin)
logWarning(permissiveModeWarningMessage(record))
isWarningPrintedForMalformedRecord = true
}
emptyRow
@@ -114,6 +120,18 @@ class JacksonParser(
}
}

private def failedRecord(record: String): Seq[InternalRow] = {
if (options.dropMalformed) {
if (!isWarningPrintedForMalformedRecord) {
logWarning(dropmalformedModeWarningMessage(record))
isWarningPrintedForMalformedRecord = true
}
Nil
} else {
throw new NotAllowedNullException(s"Null not allowed: $record")
Contributor

Please explain in the error message why null is not allowed. Is it possible to know which field is not nullable? I am not sure that just printing the record is really helpful here (considering users may often have long JSON records).

Member Author

Sure, let me try to produce a better message.
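One possible shape for that (purely a sketch; the fieldName parameter and the message wording are assumptions on top of this PR, not part of it) is to thread the field name from the schema into the converter so the exception can name the offending field:

// Hypothetical variant of makeNullConverter that also receives the field name,
// so the error explains which non-nullable field held the null.
private def makeNullConverter(nullable: Boolean, fieldName: String): ValueConverter = {
  if (nullable) {
    (parser: JsonParser) => null
  } else {
    (parser: JsonParser) =>
      throw new NotAllowedNullException(
        s"Field '$fieldName' is non-nullable in the specified schema, but a null " +
          s"value was found (current token: ${parser.getCurrentToken}).")
  }
}

The call sites such as makeConverter(f.dataType, f.nullable) would then pass f.name through as well.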

}
}

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -122,7 +140,7 @@
def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
case st: StructType =>
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter)
val fieldConverters = st.map(f => makeConverter(f.dataType, f.nullable))
(parser: JsonParser) => parseJsonToken(parser, dataType) {
case START_OBJECT => convertObject(parser, st, fieldConverters)
// SPARK-3308: support reading top level JSON arrays and take every element
@@ -143,7 +161,7 @@

case ArrayType(st: StructType, _) =>
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter)
val fieldConverters = st.map(f => makeConverter(f.dataType, f.nullable))
(parser: JsonParser) => parseJsonToken(parser, dataType) {
// the business end of SPARK-3308:
// when an object is found but an array is requested just wrap it in a list.
@@ -159,35 +177,37 @@
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema.
*/
private def makeConverter(dataType: DataType): ValueConverter = dataType match {
private def makeConverter(
dataType: DataType,
nullable: Boolean = true): ValueConverter = dataType match {
case BooleanType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_TRUE => true
case VALUE_FALSE => false
}

case ByteType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_NUMBER_INT => parser.getByteValue
}

case ShortType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_NUMBER_INT => parser.getShortValue
}

case IntegerType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_NUMBER_INT => parser.getIntValue
}

case LongType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_NUMBER_INT => parser.getLongValue
}

case FloatType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getFloatValue

@@ -207,7 +227,7 @@
}

case DoubleType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getDoubleValue

@@ -227,7 +247,7 @@
}

case StringType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_STRING =>
UTF8String.fromString(parser.getText)

@@ -241,7 +261,7 @@
}

case TimestampType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_STRING =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -257,7 +277,7 @@
}

case DateType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_STRING =>
val stringValue = parser.getText
// This one will lose microseconds parts.
@@ -276,43 +296,53 @@
}

case BinaryType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case VALUE_STRING => parser.getBinaryValue
}

case dt: DecimalType =>
(parser: JsonParser) => parseJsonToken(parser, dataType) {
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
Decimal(parser.getDecimalValue, dt.precision, dt.scale)
}

case st: StructType =>
val fieldConverters = st.map(_.dataType).map(makeConverter)
(parser: JsonParser) => parseJsonToken(parser, dataType) {
val fieldConverters = st.map(f => makeConverter(f.dataType, f.nullable))
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case START_OBJECT => convertObject(parser, st, fieldConverters)
}

case at: ArrayType =>
val elementConverter = makeConverter(at.elementType)
(parser: JsonParser) => parseJsonToken(parser, dataType) {
val elementConverter = makeConverter(at.elementType, at.containsNull)
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case START_ARRAY => convertArray(parser, elementConverter)
}

case mt: MapType =>
val valueConverter = makeConverter(mt.valueType)
(parser: JsonParser) => parseJsonToken(parser, dataType) {
val valueConverter = makeConverter(mt.valueType, mt.valueContainsNull)
(parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
case START_OBJECT => convertMap(parser, valueConverter)
}

case udt: UserDefinedType[_] =>
makeConverter(udt.sqlType)
makeConverter(udt.sqlType, nullable)

case _ =>
(parser: JsonParser) =>
// Here, we pass empty `PartialFunction` so that this case can be
// handled as a failed conversion. It will throw an exception as
// long as the value is not null.
parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
parseJsonToken(parser, dataType, nullable)(PartialFunction.empty[JsonToken, Any])
}

private def makeNullConverter(nullable: Boolean): ValueConverter = {
if (nullable) {
(parser: JsonParser) => null
} else {
(parser: JsonParser) =>
throw new NotAllowedNullException(
s"Null not allowed (current token: ${parser.getCurrentToken}).")
}
}

/**
@@ -322,16 +352,18 @@
*/
private def parseJsonToken(
parser: JsonParser,
dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
dataType: DataType,
nullable: Boolean = true)(f: PartialFunction[JsonToken, Any]): Any = {
val nullConverter = makeNullConverter(nullable)
parser.getCurrentToken match {
case FIELD_NAME =>
// There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
parser.nextToken()
parseJsonToken(parser, dataType)(f)
parseJsonToken(parser, dataType, nullable)(f)

case null | VALUE_NULL => null
case null | VALUE_NULL => nullConverter.apply(parser)

case other => f.applyOrElse(other, failedConversion(parser, dataType))
case other => f.applyOrElse(other, failedConversion(parser, dataType, nullable))
}
}

@@ -341,18 +373,23 @@
*/
private def failedConversion(
parser: JsonParser,
dataType: DataType): PartialFunction[JsonToken, Any] = {
case VALUE_STRING if parser.getTextLength < 1 =>
// If conversion is failed, this produces `null` rather than throwing exception.
// This will protect the mismatch of types.
null

case token =>
// We cannot parse this token based on the given data type. So, we throw a
// SparkSQLJsonProcessingException and this exception will be caught by
// `parse` method.
throw new SparkSQLJsonProcessingException(
s"Failed to parse a value for data type $dataType (current token: $token).")
dataType: DataType,
nullable: Boolean): PartialFunction[JsonToken, Any] = {
val nullConverter = makeNullConverter(nullable)

{
case VALUE_STRING if parser.getTextLength < 1 =>
// If conversion is failed, this produces `null` rather than throwing exception.
// This will protect the mismatch of types.
nullConverter.apply(parser)

case token =>
// We cannot parse this token based on the given data type. So, we throw a
// SparkSQLJsonProcessingException and this exception will be caught by
// `parse` method.
throw new SparkSQLJsonProcessingException(
s"Failed to parse a value for data type $dataType (current token: $token).")
}
}

/**
Expand Down Expand Up @@ -418,7 +455,6 @@ class JacksonParser(
Utils.tryWithResource(factory.createParser(input)) { parser =>
parser.nextToken()
rootConverter.apply(parser) match {
case null => failedRecord(input)
case row: InternalRow => row :: Nil
case array: ArrayData =>
// Here, as we support reading top level JSON arrays and take every element
@@ -428,15 +464,14 @@
} else {
array.toArray[InternalRow](schema)
}
case _ =>
failedRecord(input)
case _ => nullSafeFailedRecord(input)
}
}
} catch {
case _: JsonProcessingException =>
failedRecord(input)
case _: SparkSQLJsonProcessingException =>
case e: NotAllowedNullException =>
failedRecord(input)
case _: JsonProcessingException | _: SparkSQLJsonProcessingException =>
nullSafeFailedRecord(input)
}
}
}