diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 4baf052bfe564..5ad954cd2a07a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -28,7 +28,8 @@ class FailureSafeParser[IN](
     mode: ParseMode,
     schema: StructType,
     columnNameOfCorruptRecord: String,
-    isMultiLine: Boolean) {
+    isMultiLine: Boolean,
+    unparsedRecordIsNonEmpty: IN => Boolean = (_: IN) => true) {
 
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -55,11 +56,15 @@ class FailureSafeParser[IN](
 
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      if (skipParsing) {
-        Iterator.single(InternalRow.empty)
-      } else {
-        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-      }
+      if (skipParsing) {
+        if (unparsedRecordIsNonEmpty(input)) {
+          Iterator.single(InternalRow.empty)
+        } else {
+          Iterator.empty
+        }
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      }
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 456f08a2a2ee7..810824f428385 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -18,7 +18,11 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.InputStream
+import java.lang.Character.isWhitespace
 import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.collection.Iterator.continually
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
@@ -125,6 +129,18 @@ object TextInputJsonDataSource extends JsonDataSource {
       .select("value").as(Encoders.STRING)
   }
 
+  private def textLineHasNonWhitespace(rowText: Text): Boolean = {
+    val isAllWhitespace: Boolean =
+      rowText.getLength == 0 || {
+        val rowTextBuffer = ByteBuffer.wrap(rowText.getBytes)
+        continually(Text.bytesToCodePoint(rowTextBuffer))
+          .takeWhile(_ >= 0)
+          .take(rowText.getLength)
+          .forall(isWhitespace)
+      }
+    !isAllWhitespace
+  }
+
   override def readFile(
       conf: Configuration,
       file: PartitionedFile,
@@ -141,7 +157,8 @@ object TextInputJsonDataSource extends JsonDataSource {
       parser.options.parseMode,
      schema,
       parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.multiLine,
+      textLineHasNonWhitespace)
     linesReader.flatMap(safeParser.parse)
   }
 
diff --git a/sql/core/src/test/resources/test-data/with-empty-line.json b/sql/core/src/test/resources/test-data/with-empty-line.json
new file mode 100644
index 0000000000000..41573aa02a59e
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/with-empty-line.json
@@ -0,0 +1,7 @@
+{ "a" : 1 , "b" : 2 , "c" : 3 }
+
+   { "a" : 4 , "b" : 5 , "c" : 6 }
+
+{ "a" : 7 , "b" : 8 , "c" : 9 }
+
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 49dd9c22e831b..c9b289b963098 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2426,6 +2426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     countForMalformedJSON(0, Seq(""))
   }
 
+  test("count() for non-multiline input with empty lines") {
+    val withEmptyLineData = Array(Map("a" -> 1, "b" -> 2, "c" -> 3),
+      Map("a" -> 4, "b" -> 5, "c" -> 6),
+      Map("a" -> 7, "b" -> 8, "c" -> 9))
+    val df = spark.read.json("src/test/resources/test-data/with-empty-line.json")
+    // important to do this .count() first, prior to caching/persisting/computing/collecting, to
+    // test the non-parsed-count pathway
+    assert(df.count() === withEmptyLineData.length,
+      "JSON DataFrame unparsed-count should exclude whitespace-only lines")
+    // cache and collect to check that count stays stable under those operations
+    df.cache()
+    assert(df.count() === withEmptyLineData.length,
+      "JSON DataFrame parsed-count should exclude whitespace-only lines")
+    val collected = df.collect().map(_.getValuesMap(Seq("a", "b", "c")))
+    assert(collected === withEmptyLineData)
+  }
+
   test("SPARK-25040: empty strings should be disallowed") {
     def failedOnEmptyString(dataType: DataType): Unit = {
       val df = spark.read.schema(s"a ${dataType.catalogString}")
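
For context (not part of the patch): a minimal standalone sketch of the behavior the new test locks in. It assumes a local SparkSession, that the with-empty-line.json fixture added above is readable at the given relative path (run from the Spark repo root), and that the FailureSafeParser change is in place so the count-only fast path skips whitespace-only lines; the object and app names here are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver, only to illustrate the fixed behavior.
object EmptyLineCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("empty-line-count-sketch")
      .getOrCreate()

    // Default (non-multiline) JSON read: each physical line is a record candidate,
    // including the blank/whitespace-only lines in the fixture.
    val df = spark.read.json("sql/core/src/test/resources/test-data/with-empty-line.json")

    // With the patched count-only fast path, whitespace-only lines no longer produce
    // empty rows, so count() should agree with the number of rows actually collected.
    val counted = df.count()
    val collected = df.collect().length
    assert(counted == collected, s"count() = $counted, collect().length = $collected")

    spark.stop()
  }
}
```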