From c8d8ff5523c93dec6888c72492da7e9e4fd4c6aa Mon Sep 17 00:00:00 2001
From: sandeep katta
Date: Tue, 17 Sep 2019 23:22:29 +0530
Subject: [PATCH] fix malformed record issue

---
 .../datasources/csv/UnivocityParser.scala         |  7 ++++---
 .../src/test/resources/test-data/malformedRow.csv |  5 +++++
 .../sql/execution/datasources/csv/CSVSuite.scala  | 14 ++++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)
 create mode 100644 sql/core/src/test/resources/test-data/malformedRow.csv

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 42e396452522..69bd11f0ae3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,12 +203,13 @@ class UnivocityParser(
     }
   }
 
-  private val doParse = if (requiredSchema.nonEmpty) {
-    (input: String) => convert(tokenizer.parseLine(input))
-  } else {
+  private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
     // If `columnPruning` enabled and partition attributes scanned only,
     // `schema` gets empty.
     (_: String) => InternalRow.empty
+  } else {
+    // parse if the columnPruning is disabled or requiredSchema is nonEmpty
+    (input: String) => convert(tokenizer.parseLine(input))
   }
 
   /**
diff --git a/sql/core/src/test/resources/test-data/malformedRow.csv b/sql/core/src/test/resources/test-data/malformedRow.csv
new file mode 100644
index 000000000000..8cfb3eefb982
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/malformedRow.csv
@@ -0,0 +1,5 @@
+fruit,color,price,quantity
+apple,red,1,3
+banana,yellow,2,4
+orange,orange,3,5
+malformedrow
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9d1548f1ea..d714cb2433ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -62,6 +62,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val datesFile = "test-data/dates.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
+  private val malformedRowFile = "test-data/malformedRow.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -1861,4 +1862,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       }
     }
   }
+
+  test("SPARK-29101 test count with DROPMALFORMED mode") {
+    Seq((true, 4), (false, 3)).foreach { case (csvColumnPruning, expectedCount) =>
+      withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> csvColumnPruning.toString) {
+        val count = spark.read
+          .option("header", "true")
+          .option("mode", "DROPMALFORMED")
+          .csv(testFile(malformedRowFile))
+          .count()
+        assert(expectedCount == count)
+      }
+    }
+  }
 }