From 88241865ac192a04270ab86b1be65f1b29466707 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 3 Oct 2016 16:49:54 +0900
Subject: [PATCH 1/2] JacksonParser silently parses null as 0 when the field is not nullable

---
 .../sql/catalyst/json/JacksonParser.scala     | 175 +++++++++++-------
 .../datasources/json/JsonSuite.scala          |  46 ++++-
 2 files changed, 149 insertions(+), 72 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f80e6373d2f89..1415bef40fb34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
 
 private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
 
+private[sql] class NotAllowedNullException(msg: String) extends SparkSQLJsonProcessingException(msg)
+
 /**
  * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
  */
@@ -61,46 +63,50 @@ class JacksonParser(
   @transient
   private[this] var isWarningPrintedForMalformedRecord: Boolean = false
 
+  private def dropMalformedModeWarningMessage(record: String): String =
+    s"""Found at least one malformed record (sample: $record). The JSON reader will drop
+       |all malformed records in the current $DROP_MALFORMED_MODE parser mode. To find out which
+       |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+       |mode and use the default inferred schema.
+       |
+       |Code example to print all malformed records (scala):
+       |===================================================
+       |// The corrupted record exists in column $columnNameOfCorruptRecord
+       |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+       |
+     """.stripMargin
+
+  private def permissiveModeWarningMessage(record: String): String =
+    s"""Found at least one malformed record (sample: $record). The JSON reader will replace
+       |all malformed records with placeholder null in the current $PERMISSIVE_MODE parser mode.
+       |To find out which corrupted records have been replaced with null, please use the
+       |default inferred schema instead of providing a custom schema.
+       |
+       |Code example to print all malformed records (scala):
+       |===================================================
+       |// The corrupted record exists in column $columnNameOfCorruptRecord.
+       |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+       |
+     """.stripMargin
+
   /**
    * This function deals with the cases it fails to parse. This function will be called
    * when exceptions are caught during converting. This function also deals with `mode` option.
    */
-  private def failedRecord(record: String): Seq[InternalRow] = {
+  private def nullSafeFailedRecord(record: String): Seq[InternalRow] = {
     // create a row even if no corrupt record column is present
     if (options.failFast) {
       throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
       if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed records (sample: $record). The JSON reader will drop
-             |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
-             |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
-             |mode and use the default inferred schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===================================================
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
+        logWarning(dropMalformedModeWarningMessage(record))
         isWarningPrintedForMalformedRecord = true
       }
       Nil
     } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
       if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed records (sample: $record). The JSON reader will replace
-             |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
-             |To find out which corrupted records have been replaced with null, please use the
-             |default inferred schema instead of providing a custom schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===================================================
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
+        logWarning(permissiveModeWarningMessage(record))
         isWarningPrintedForMalformedRecord = true
       }
       emptyRow
@@ -114,6 +120,18 @@ class JacksonParser(
     }
   }
 
+  private def failedRecord(record: String): Seq[InternalRow] = {
+    if (options.dropMalformed) {
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(dropMalformedModeWarningMessage(record))
+        isWarningPrintedForMalformedRecord = true
+      }
+      Nil
+    } else {
+      throw new NotAllowedNullException(s"Null not allowed: $record")
+    }
+  }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -122,7 +140,7 @@ class JacksonParser(
   def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
     case st: StructType =>
       val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
+      val fieldConverters = st.map(f => makeConverter(f.dataType, f.nullable))
       (parser: JsonParser) => parseJsonToken(parser, dataType) {
         case START_OBJECT => convertObject(parser, st, fieldConverters)
         // SPARK-3308: support reading top level JSON arrays and take every element
@@ -143,7 +161,7 @@ class JacksonParser(
 
     case ArrayType(st: StructType, _) =>
       val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
+      val fieldConverters = st.map(f => makeConverter(f.dataType, f.nullable))
       (parser: JsonParser) => parseJsonToken(parser, dataType) {
         // the business end of SPARK-3308:
         // when an object is found but an array is requested just wrap it in a list.
@@ -159,35 +177,37 @@ class JacksonParser(
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema.
    */
-  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+  private def makeConverter(
+      dataType: DataType,
+      nullable: Boolean = true): ValueConverter = dataType match {
     case BooleanType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_TRUE => true
         case VALUE_FALSE => false
       }
 
     case ByteType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_NUMBER_INT => parser.getByteValue
       }
 
     case ShortType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_NUMBER_INT => parser.getShortValue
       }
 
     case IntegerType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_NUMBER_INT => parser.getIntValue
       }
 
     case LongType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
        case VALUE_NUMBER_INT => parser.getLongValue
       }
 
     case FloatType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
           parser.getFloatValue
 
@@ -207,7 +227,7 @@ class JacksonParser(
       }
 
     case DoubleType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
           parser.getDoubleValue
 
@@ -227,7 +247,7 @@ class JacksonParser(
       }
 
     case StringType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_STRING =>
           UTF8String.fromString(parser.getText)
 
@@ -241,7 +261,7 @@ class JacksonParser(
       }
 
     case TimestampType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_STRING =>
           // This one will lose microseconds parts.
           // See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -257,7 +277,7 @@ class JacksonParser(
       }
 
     case DateType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_STRING =>
           val stringValue = parser.getText
           // This one will lose microseconds parts.
@@ -276,43 +296,53 @@ class JacksonParser(
       }
 
     case BinaryType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case VALUE_STRING => parser.getBinaryValue
       }
 
     case dt: DecimalType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
           Decimal(parser.getDecimalValue, dt.precision, dt.scale)
       }
 
     case st: StructType =>
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      val fieldConverters = st.map(f => makeConverter(f.dataType, f.nullable))
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case START_OBJECT => convertObject(parser, st, fieldConverters)
       }
 
     case at: ArrayType =>
-      val elementConverter = makeConverter(at.elementType)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      val elementConverter = makeConverter(at.elementType, at.containsNull)
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case START_ARRAY => convertArray(parser, elementConverter)
       }
 
     case mt: MapType =>
-      val valueConverter = makeConverter(mt.valueType)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+      val valueConverter = makeConverter(mt.valueType, mt.valueContainsNull)
+      (parser: JsonParser) => parseJsonToken(parser, dataType, nullable) {
         case START_OBJECT => convertMap(parser, valueConverter)
       }
 
     case udt: UserDefinedType[_] =>
-      makeConverter(udt.sqlType)
+      makeConverter(udt.sqlType, nullable)
 
     case _ =>
       (parser: JsonParser) =>
         // Here, we pass empty `PartialFunction` so that this case can be
         // handled as a failed conversion. It will throw an exception as
        // long as the value is not null.
-        parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
+        parseJsonToken(parser, dataType, nullable)(PartialFunction.empty[JsonToken, Any])
+  }
+
+  private def makeNullConverter(nullable: Boolean): ValueConverter = {
+    if (nullable) {
+      (parser: JsonParser) => null
+    } else {
+      (parser: JsonParser) =>
+        throw new NotAllowedNullException(
+          s"Null not allowed (current token: ${parser.getCurrentToken}).")
+    }
   }
 
   /**
@@ -322,16 +352,18 @@ class JacksonParser(
    */
   private def parseJsonToken(
       parser: JsonParser,
-      dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
+      dataType: DataType,
+      nullable: Boolean = true)(f: PartialFunction[JsonToken, Any]): Any = {
+    val nullParser = makeNullConverter(nullable)
     parser.getCurrentToken match {
       case FIELD_NAME =>
         // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
         parser.nextToken()
-        parseJsonToken(parser, dataType)(f)
+        parseJsonToken(parser, dataType, nullable)(f)
 
-      case null | VALUE_NULL => null
+      case null | VALUE_NULL => nullParser.apply(parser)
 
-      case other => f.applyOrElse(other, failedConversion(parser, dataType))
+      case other => f.applyOrElse(other, failedConversion(parser, dataType, nullable))
     }
   }
 
@@ -341,18 +373,23 @@ class JacksonParser(
    */
   private def failedConversion(
       parser: JsonParser,
-      dataType: DataType): PartialFunction[JsonToken, Any] = {
-    case VALUE_STRING if parser.getTextLength < 1 =>
-      // If conversion is failed, this produces `null` rather than throwing exception.
-      // This will protect the mismatch of types.
-      null
-
-    case token =>
-      // We cannot parse this token based on the given data type. So, we throw a
-      // SparkSQLJsonProcessingException and this exception will be caught by
-      // `parse` method.
-      throw new SparkSQLJsonProcessingException(
-        s"Failed to parse a value for data type $dataType (current token: $token).")
+      dataType: DataType,
+      nullable: Boolean): PartialFunction[JsonToken, Any] = {
+    val nullParser = makeNullConverter(nullable)
+
+    {
+      case VALUE_STRING if parser.getTextLength < 1 =>
+        // If conversion fails, this produces `null` rather than throwing an exception.
+        // This protects against type mismatches.
+        nullParser.apply(parser)
+
+      case token =>
+        // We cannot parse this token based on the given data type. So, we throw a
+        // SparkSQLJsonProcessingException and this exception will be caught by
+        // the `parse` method.
+        throw new SparkSQLJsonProcessingException(
+          s"Failed to parse a value for data type $dataType (current token: $token).")
+    }
   }
 
   /**
@@ -418,7 +455,6 @@ class JacksonParser(
     Utils.tryWithResource(factory.createParser(input)) { parser =>
       parser.nextToken()
       rootConverter.apply(parser) match {
-        case null => failedRecord(input)
         case row: InternalRow => row :: Nil
         case array: ArrayData =>
           // Here, as we support reading top level JSON arrays and take every element
@@ -428,15 +464,14 @@ class JacksonParser(
           } else {
             array.toArray[InternalRow](schema)
           }
-        case _ =>
-          failedRecord(input)
+        case _ => nullSafeFailedRecord(input)
       }
     }
   } catch {
-      case _: JsonProcessingException =>
-        failedRecord(input)
-      case _: SparkSQLJsonProcessingException =>
+      case _: NotAllowedNullException =>
        failedRecord(input)
+      case _: JsonProcessingException | _: SparkSQLJsonProcessingException =>
+        nullSafeFailedRecord(input)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 456052f79afcc..5910212c184c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -50,8 +50,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(expected.getClass == actual.getClass,
       s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
     assert(expected == actual,
-      s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " +
-        s"${expected}(${expected.getClass}).")
+      s"Promoted value $actual(${actual.getClass}) does not equal the expected value " +
+        s"$expected(${expected.getClass}).")
   }
 
   val factory = new JsonFactory()
@@ -1749,4 +1749,46 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
     }
   }
+
+  test("Do not silently parse null values for non-nullable fields") {
+    // JSON having null.
+    val testJson = """{"nullInt":null}""" :: Nil
+    val testSchema = StructType(StructField("nullInt", IntegerType, false) :: Nil)
+    val data = spark.sparkContext.parallelize(testJson)
+
+    val exceptionOne = intercept[SparkException] {
+      spark.read.schema(testSchema).option("mode", "PERMISSIVE").json(data).collect()
+    }
+    assert(exceptionOne.getMessage.contains("Null not allowed: {"))
+
+    val exceptionTwo = intercept[SparkException] {
+      spark.read.schema(testSchema).option("mode", "FAILFAST").json(data).collect()
+    }
+    assert(exceptionTwo.getMessage.contains("Null not allowed: {"))
+
+    val dropMalformedDf = spark.read.schema(testSchema).option("mode", "DROPMALFORMED").json(data)
+    assert(dropMalformedDf.collect().length == 0)
+
+    // Nested JSON having null.
+    val testNestedJson = """{"nullStruct": { "e" : null }}""" :: Nil
+    val testNestedSchema =
+      StructType(
+        StructField("nullStruct",
+          StructType(StructField("e", StringType, false) :: Nil), true) :: Nil)
+    val nestedData = spark.sparkContext.parallelize(testNestedJson)
+
+    val exceptionThree = intercept[SparkException] {
+      spark.read.schema(testNestedSchema).option("mode", "PERMISSIVE").json(nestedData).collect()
+    }
+    assert(exceptionThree.getMessage.contains("Null not allowed: {"))
+
+    val exceptionFour = intercept[SparkException] {
+      spark.read.schema(testNestedSchema).option("mode", "FAILFAST").json(nestedData).collect()
+    }
+    assert(exceptionFour.getMessage.contains("Null not allowed: {"))
+
+    val dropMalformedNestedDf =
+      spark.read.schema(testNestedSchema).option("mode", "DROPMALFORMED").json(nestedData)
+    assert(dropMalformedNestedDf.collect().length == 0)
+  }
 }

From cb5ece70bc840820697fa73943bc28bd63c96b41 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 3 Oct 2016 19:59:37 +0900
Subject: [PATCH 2/2] Consistent naming

---
 .../apache/spark/sql/catalyst/json/JacksonParser.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 1415bef40fb34..9de79d4fdcd85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -354,14 +354,14 @@ class JacksonParser(
       parser: JsonParser,
       dataType: DataType,
       nullable: Boolean = true)(f: PartialFunction[JsonToken, Any]): Any = {
-    val nullParser = makeNullConverter(nullable)
+    val nullConverter = makeNullConverter(nullable)
     parser.getCurrentToken match {
       case FIELD_NAME =>
         // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
         parser.nextToken()
         parseJsonToken(parser, dataType, nullable)(f)
 
-      case null | VALUE_NULL => nullParser.apply(parser)
+      case null | VALUE_NULL => nullConverter.apply(parser)
 
       case other => f.applyOrElse(other, failedConversion(parser, dataType, nullable))
     }
@@ -375,13 +375,13 @@ class JacksonParser(
       parser: JsonParser,
       dataType: DataType,
       nullable: Boolean): PartialFunction[JsonToken, Any] = {
-    val nullParser = makeNullConverter(nullable)
+    val nullConverter = makeNullConverter(nullable)
 
     {
       case VALUE_STRING if parser.getTextLength < 1 =>
         // If conversion fails, this produces `null` rather than throwing an exception.
         // This protects against type mismatches.
-        nullParser.apply(parser)
+        nullConverter.apply(parser)
 
       case token =>
         // We cannot parse this token based on the given data type. So, we throw a
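Reviewer note, not part of the patches above: a minimal sketch of how the change surfaces to users, mirroring the new JsonSuite test. The local SparkSession setup is an assumed harness; the snippet is illustrative rather than code from this PR.

    import org.apache.spark.SparkException
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val spark = SparkSession.builder().master("local[2]").appName("nullability").getOrCreate()

    // A user-supplied schema marking the field as non-nullable.
    val schema = StructType(StructField("nullInt", IntegerType, nullable = false) :: Nil)
    val data = spark.sparkContext.parallelize("""{"nullInt":null}""" :: Nil)

    // Before these patches the null was silently read back as 0. With them,
    // PERMISSIVE and FAILFAST modes raise a SparkException ("Null not allowed: ...").
    try {
      spark.read.schema(schema).option("mode", "PERMISSIVE").json(data).collect()
    } catch {
      case e: SparkException => assert(e.getMessage.contains("Null not allowed"))
    }

    // DROPMALFORMED drops such records instead of failing.
    assert(spark.read.schema(schema).option("mode", "DROPMALFORMED").json(data).collect().isEmpty)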