From 84cb0686aaa05c9f8710efdc5d119ebfa47f986c Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Thu, 31 Jan 2019 15:47:57 +0800
Subject: [PATCH] Revert count optimization in JSON datasource by SPARK-24959

---
 .../org/apache/spark/sql/DataFrameReader.scala |  6 ++----
 .../datasources/FailureSafeParser.scala        | 11 ++---------
 .../datasources/csv/UnivocityParser.scala      | 16 +++++++++++-----
 .../datasources/json/JsonDataSource.scala      |  6 ++----
 .../datasources/json/JsonBenchmarks.scala      |  4 ----
 5 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 869c584aed909..e9278a0964935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -450,8 +450,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
@@ -526,8 +525,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => Seq(rawParser.parse(input)),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 90e81661bae7a..e618f17236784 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -29,8 +29,7 @@ class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
     mode: ParseMode,
     schema: StructType,
-    columnNameOfCorruptRecord: String,
-    isMultiLine: Boolean) {
+    columnNameOfCorruptRecord: String) {
 
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -58,15 +57,9 @@ class FailureSafeParser[IN](
     }
   }
 
-  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
-
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      if (skipParsing) {
-        Iterator.single(InternalRow.empty)
-      } else {
-        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-      }
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 9088d43905e28..42e3964525220 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,11 +203,19 @@ class UnivocityParser(
     }
   }
 
+  private val doParse = if (requiredSchema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` is enabled and only partition attributes are
+    // scanned, `requiredSchema` is empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -290,8 +298,7 @@ private[csv] object UnivocityParser {
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
       safeParser.parse(tokens)
     }.flatten
@@ -339,8 +346,7 @@ private[csv] object UnivocityParser {
       input => Seq(parser.parse(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     filteredLines.flatMap(safeParser.parse)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 76f58371ae264..d6c588894d7f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -139,8 +139,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       input => parser.parse(input, textParser, textToUTF8String),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     linesReader.flatMap(safeParser.parse)
   }
 
@@ -224,8 +223,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
       CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
index a2b747eaab411..5592aa6e3346b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
@@ -194,9 +194,6 @@ object JSONBenchmarks {
     benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
       ds.select($"col1").filter((_: Row) => true).count()
     }
-    benchmark.addCase(s"count()", 3) { _ =>
-      ds.count()
-    }
 
     /*
     Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
@@ -205,7 +202,6 @@ JSON count() benchmark:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ---------------------------------------------------------------------------------------------
     Select 10 columns + count()                   9961 / 10006          1.0         996.1       1.0X
     Select 1 column + count()                      8355 / 8470          1.2         835.5       1.2X
-    count()                                        2104 / 2156          4.8         210.4       4.7X
     */
     benchmark.run()
   }
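
Why the revert: the removed `skipParsing` fast path emitted one empty InternalRow per input record without running the parser, but JSON parsing can legitimately produce zero rows for an input (for example, a whitespace-only line), so count() could disagree with the rows an actual scan returns. CSV keeps an equivalent shortcut inside UnivocityParser itself (the `doParse` addition above), where exactly one row per line is guaranteed. Below is a minimal repro sketch of the JSON behavior; it is not part of the patch, and the object name and local-master setup are illustrative only.

import org.apache.spark.sql.SparkSession

object JsonCountRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // One well-formed record plus a whitespace-only line; the JSON parser
    // produces no row for the blank input, so the expected count is 1.
    val ds = Seq("""{"a": 1}""", " ").toDS()

    // With the reverted fast path, count() emitted one row per input record
    // without parsing and returned 2 here; after the revert, every record is
    // parsed again and count() matches the rows a collect() would surface.
    println(spark.read.json(ds).count())

    spark.stop()
  }
}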