From 466e3dda451e99abaef44942118cc40f9e9a6660 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Mon, 28 Jan 2019 12:42:25 +0800
Subject: [PATCH] Revert count optimization in JSON datasource by SPARK-24959

---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 16 +++++++++++-----
 .../catalyst/expressions/csvExpressions.scala      |  3 +--
 .../catalyst/expressions/jsonExpressions.scala     |  3 +--
 .../sql/catalyst/util/FailureSafeParser.scala      | 11 ++---------
 sql/core/benchmarks/JSONBenchmark-results.txt      |  1 -
 .../org/apache/spark/sql/DataFrameReader.scala     |  6 ++----
 .../datasources/json/JsonDataSource.scala          |  6 ++----
 .../datasources/json/JsonBenchmark.scala           |  3 ---
 8 files changed, 19 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 82a5b3c302b18..79dff6fda33d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -188,11 +188,19 @@ class UnivocityParser(
     }
   }
 
+  private val doParse = if (requiredSchema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` enabled and partition attributes scanned only,
+    // `schema` gets empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -282,8 +290,7 @@ private[sql] object UnivocityParser {
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
 
     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)
@@ -336,8 +343,7 @@ private[sql] object UnivocityParser {
       input => Seq(parser.parse(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     filteredLines.flatMap(safeParser.parse)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 83b0299bac440..65b10f36373d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -117,8 +117,7 @@ case class CsvToStructs(
       input => Seq(rawParser.parse(input)),
       mode,
       nullableSchema,
-      parsedOptions.columnNameOfCorruptRecord,
-      parsedOptions.multiLine)
+      parsedOptions.columnNameOfCorruptRecord)
   }
 
   override def dataType: DataType = nullableSchema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 3403349c8974e..655e44e4e4919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -582,8 +582,7 @@ case class JsonToStructs(
       input => rawParser.parse(input, createParser, identity[UTF8String]),
       mode,
       parserSchema,
-      parsedOptions.columnNameOfCorruptRecord,
-      parsedOptions.multiLine)
+      parsedOptions.columnNameOfCorruptRecord)
   }
 
   override def dataType: DataType = nullableSchema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 76745b11c84c9..361c8b29db33d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -27,8 +27,7 @@ class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
     mode: ParseMode,
     schema: StructType,
-    columnNameOfCorruptRecord: String,
-    isMultiLine: Boolean) {
+    columnNameOfCorruptRecord: String) {
 
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -56,15 +55,9 @@ class FailureSafeParser[IN](
     }
   }
 
-  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
-
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      if (skipParsing) {
-        Iterator.single(InternalRow.empty)
-      } else {
-        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-      }
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt
index 947f57da08442..f16e60c333748 100644
--- a/sql/core/benchmarks/JSONBenchmark-results.txt
+++ b/sql/core/benchmarks/JSONBenchmark-results.txt
@@ -41,7 +41,6 @@ Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Ro
 ------------------------------------------------------------------------------------------------
 Select 10 columns + count()                  19539 / 19896          0.5        1953.9       1.0X
 Select 1 column + count()                    16412 / 16445          0.6        1641.2       1.2X
-count()                                       2783 / 2801           3.6         278.3       7.0X
 
 Preparing data for benchmarking ...
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 2b1521730bc07..713c9a9faa742 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -468,8 +468,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
@@ -538,8 +537,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => Seq(rawParser.parse(input)),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 456f08a2a2ee7..7ec2267e3461f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -140,8 +140,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       input => parser.parse(input, textParser, textToUTF8String),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     linesReader.flatMap(safeParser.parse)
   }
 
@@ -225,8 +224,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
       CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index 27f702348c0b7..25f76205f86de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -217,9 +217,6 @@ object JSONBenchmark extends SqlBasedBenchmark {
       benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
        ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", numIters) { _ =>
-        ds.count()
-      }
 
       benchmark.run()
     }
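
For context: the reverted `skipParsing` fast path returned one `InternalRow.empty` per input record whenever no fields were required (the `count()` case, in PERMISSIVE non-multiLine mode), so inputs for which the real parser yields no row at all, such as blank lines in line-delimited JSON, were still counted. The sketch below is a minimal, hypothetical reproduction of that difference, not code from this patch; the object name `JsonCountRepro` and the local-master setup are illustrative assumptions, while the `DataFrameReader` calls are Spark's public API.

```scala
import org.apache.spark.sql.SparkSession

object JsonCountRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-count-repro")
      .getOrCreate()
    import spark.implicits._

    // One well-formed record followed by a blank line. Line-mode JSON
    // parsing produces no row for the blank line.
    val lines = spark.createDataset(Seq("""{"a": 1}""", ""))
    val df = spark.read.schema("a LONG").json(lines)

    // With the optimization, count() skipped parsing and returned one row
    // per input line (2 here); with this revert, every line is parsed and
    // the blank line contributes nothing, giving 1.
    println(df.count())

    spark.stop()
  }
}
```

This also explains why the `count()` benchmark case and its results line are deleted above: without the fast path, a bare `count()` no longer takes a distinct code path worth measuring separately from the `Select 1 column + count()` case.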