From 446ae98e1f61d189bee5d192531229088bcb9219 Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Mon, 21 Jan 2019 06:06:09 +0000 Subject: [PATCH 1/5] [WIP] filter out empty/whitespace JSON lines when skipping parsing --- .../sql/catalyst/util/FailureSafeParser.scala | 17 ++++++++---- .../datasources/json/JsonDataSource.scala | 26 ++++++++++++++++--- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 4baf052bfe564..dfeb76c2f168c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -55,11 +55,18 @@ class FailureSafeParser[IN]( def parse(input: IN): Iterator[InternalRow] = { try { - if (skipParsing) { - Iterator.single(InternalRow.empty) - } else { - rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) - } + Thread.dumpStack() + if (skipParsing) { + // scalastyle:off println + println("!!!! NOT PARSING !!!!") + // scalastyle:on println + Iterator.single(InternalRow.empty) + } else { + // scalastyle:off println + println("!!!! PARSING !!!!") + // scalastyle:on println + rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) + } } catch { case e: BadRecordException => mode match { case PermissiveMode => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 456f08a2a2ee7..002392e0da3ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.datasources.json import java.io.InputStream +import java.lang.Character.isWhitespace import java.net.URI +import java.nio.ByteBuffer import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import com.google.common.io.ByteStreams @@ -27,13 +29,12 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat - import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions} +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JSONOptions, JacksonParser, JsonInferSchema} import org.apache.spark.sql.catalyst.util.FailureSafeParser import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ @@ -42,6 +43,8 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils +import scala.collection.Iterator.continually + /** * Common functions for parsing JSON files */ @@ -125,6 +128,23 @@ object TextInputJsonDataSource extends JsonDataSource { .select("value").as(Encoders.STRING) } + private def isAllWhitespace(rowText: Text): Boolean = { + val afdsafdsa = + rowText.getLength == 0 || { 
+ val rowTextBuffer = ByteBuffer.wrap(rowText.getBytes) + continually { + val cp = Text.bytesToCodePoint(rowTextBuffer) + print(s"[${Character.getName(cp)}]") + cp + } .takeWhile(_ >= 0).take(rowText.getLength).forall(isWhitespace) + } + // scalastyle:off println + println() + println(new String(rowText.getBytes)) + // scalastyle:on println + afdsafdsa + } + override def readFile( conf: Configuration, file: PartitionedFile, @@ -142,7 +162,7 @@ object TextInputJsonDataSource extends JsonDataSource { schema, parser.options.columnNameOfCorruptRecord, parser.options.multiLine) - linesReader.flatMap(safeParser.parse) + linesReader.filterNot(isAllWhitespace).flatMap(safeParser.parse) } private def textToUTF8String(value: Text): UTF8String = { From 236227fab6d5c9bedf696be8a8a071e5df6a3380 Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Mon, 21 Jan 2019 16:39:33 +0000 Subject: [PATCH 2/5] remove println/dumpStack --- .../sql/catalyst/util/FailureSafeParser.scala | 7 ------- .../datasources/json/JsonDataSource.scala | 15 ++++----------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index dfeb76c2f168c..fa6d8cc4ca913 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -55,16 +55,9 @@ class FailureSafeParser[IN]( def parse(input: IN): Iterator[InternalRow] = { try { - Thread.dumpStack() if (skipParsing) { - // scalastyle:off println - println("!!!! NOT PARSING !!!!") - // scalastyle:on println Iterator.single(InternalRow.empty) } else { - // scalastyle:off println - println("!!!! 
PARSING !!!!") - // scalastyle:on println rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) } } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 002392e0da3ce..2f1306b5dc9f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -129,20 +129,13 @@ object TextInputJsonDataSource extends JsonDataSource { } private def isAllWhitespace(rowText: Text): Boolean = { - val afdsafdsa = rowText.getLength == 0 || { val rowTextBuffer = ByteBuffer.wrap(rowText.getBytes) - continually { - val cp = Text.bytesToCodePoint(rowTextBuffer) - print(s"[${Character.getName(cp)}]") - cp - } .takeWhile(_ >= 0).take(rowText.getLength).forall(isWhitespace) + continually(Text.bytesToCodePoint(rowTextBuffer)) + .takeWhile(_ >= 0) + .take(rowText.getLength) + .forall(isWhitespace) } - // scalastyle:off println - println() - println(new String(rowText.getBytes)) - // scalastyle:on println - afdsafdsa } override def readFile( From e4d9052a4530b0a57b32ad141e606a834f518694 Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Mon, 21 Jan 2019 18:57:59 +0000 Subject: [PATCH 3/5] add test for non-parsed JSON count --- .../resources/test-data/with-empty-line.json | 7 +++++++ .../execution/datasources/json/JsonSuite.scala | 17 +++++++++++++++++ 2 files changed, 24 insertions(+) create mode 100644 sql/core/src/test/resources/test-data/with-empty-line.json diff --git a/sql/core/src/test/resources/test-data/with-empty-line.json b/sql/core/src/test/resources/test-data/with-empty-line.json new file mode 100644 index 0000000000000..41573aa02a59e --- /dev/null +++ b/sql/core/src/test/resources/test-data/with-empty-line.json @@ -0,0 +1,7 @@ +{ "a" : 1 , "b" : 2 , "c" : 3 } + + { "a" : 4 , "b" : 5 , "c" : 6 } + +{ "a" : 7 , "b" : 8 , "c" : 9 } + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 49dd9c22e831b..c9b289b963098 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2426,6 +2426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { countForMalformedJSON(0, Seq("")) } + test("count() for non-multiline input with empty lines") { + val withEmptyLineData = Array(Map("a" -> 1, "b" -> 2, "c" -> 3), + Map("a" -> 4, "b" -> 5, "c" -> 6), + Map("a" -> 7, "b" -> 8, "c" -> 9)) + val df = spark.read.json("src/test/resources/test-data/with-empty-line.json") + // important to do this .count() first, prior to caching/persisting/computing/collecting, to + // test the non-parsed-count pathway + assert(df.count() === withEmptyLineData.length, + "JSON DataFrame unparsed-count should exclude whitespace-only lines") + // cache and collect to check that count stays stable under those operations + df.cache() + assert(df.count() === withEmptyLineData.length, + "JSON DataFrame parsed-count should exclude whitespace-only lines") + val collected = df.collect().map(_.getValuesMap(Seq("a", "b", "c"))) + assert(collected === withEmptyLineData) + } + test("SPARK-25040: empty strings should be 
disallowed") { def failedOnEmptyString(dataType: DataType): Unit = { val df = spark.read.schema(s"a ${dataType.catalogString}") From 13942b8da17d7e58e690b7a8b27c1a3fe20be135 Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Tue, 22 Jan 2019 05:35:08 +0000 Subject: [PATCH 4/5] fix scala import style errors --- .../sql/execution/datasources/json/JsonDataSource.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 2f1306b5dc9f3..7b9964aeecb9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -22,6 +22,8 @@ import java.lang.Character.isWhitespace import java.net.URI import java.nio.ByteBuffer +import scala.collection.Iterator.continually + import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import com.google.common.io.ByteStreams import org.apache.hadoop.conf.Configuration @@ -29,12 +31,13 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JSONOptions, JacksonParser, JsonInferSchema} +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions} import org.apache.spark.sql.catalyst.util.FailureSafeParser import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ @@ -43,8 +46,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils -import scala.collection.Iterator.continually - /** * Common functions for parsing JSON files */ From 051d84a624413cdf7cd5285b88072ca32663d3ad Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Wed, 23 Jan 2019 05:33:04 +0000 Subject: [PATCH 5/5] push down non-parsed json record filter into FailureSafeParser --- .../sql/catalyst/util/FailureSafeParser.scala | 9 ++++++-- .../datasources/json/JsonDataSource.scala | 23 +++++++++++-------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index fa6d8cc4ca913..5ad954cd2a07a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -28,7 +28,8 @@ class FailureSafeParser[IN]( mode: ParseMode, schema: StructType, columnNameOfCorruptRecord: String, - isMultiLine: Boolean) { + isMultiLine: Boolean, + unparsedRecordIsNonEmpty: IN => Boolean = (_: IN) => true) { private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) @@ -56,7 +57,11 @@ class FailureSafeParser[IN]( def parse(input: IN): 
Iterator[InternalRow] = { try { if (skipParsing) { - Iterator.single(InternalRow.empty) + if (unparsedRecordIsNonEmpty(input)) { + Iterator.single(InternalRow.empty) + } else { + Iterator.empty + } } else { rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 7b9964aeecb9b..810824f428385 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -129,14 +129,16 @@ object TextInputJsonDataSource extends JsonDataSource { .select("value").as(Encoders.STRING) } - private def isAllWhitespace(rowText: Text): Boolean = { - rowText.getLength == 0 || { - val rowTextBuffer = ByteBuffer.wrap(rowText.getBytes) - continually(Text.bytesToCodePoint(rowTextBuffer)) - .takeWhile(_ >= 0) - .take(rowText.getLength) - .forall(isWhitespace) - } + private def textLineHasNonWhitespace(rowText: Text): Boolean = { + val isAllWhitespace: Boolean = + rowText.getLength == 0 || { + val rowTextBuffer = ByteBuffer.wrap(rowText.getBytes) + continually(Text.bytesToCodePoint(rowTextBuffer)) + .takeWhile(_ >= 0) + .take(rowText.getLength) + .forall(isWhitespace) + } + !isAllWhitespace } override def readFile( @@ -155,8 +157,9 @@ object TextInputJsonDataSource extends JsonDataSource { parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) - linesReader.filterNot(isAllWhitespace).flatMap(safeParser.parse) + parser.options.multiLine, + textLineHasNonWhitespace) + linesReader.flatMap(safeParser.parse) } private def textToUTF8String(value: Text): UTF8String = {
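
A minimal usage sketch (not part of the patches above) of the behavior this series converges on, assuming a standard SparkSession bound to `spark`; the input file and expected row count are taken from the test added in PATCH 3/5:

    // Sketch only: with the whitespace filter pushed down into FailureSafeParser,
    // count() on a non-multiline JSON source skips blank/whitespace-only lines both
    // on the optimized (unparsed) path and after the rows are actually parsed.
    val df = spark.read.json("src/test/resources/test-data/with-empty-line.json")
    assert(df.count() == 3)   // unparsed count: whitespace-only lines are not counted as rows
    df.cache()
    assert(df.count() == 3)   // parsed count after caching agrees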