From 507a929694653d49d1eb42398131743e0d004f65 Mon Sep 17 00:00:00 2001
From: lxsmnv
Date: Sun, 19 Feb 2017 20:52:40 -0500
Subject: [PATCH 1/5] SPARK-19340 Fix file path resolution for CSV files

---
 .../execution/datasources/DataSource.scala   | 49 ++++++++++++-------
 .../datasources/csv/CSVFileFormat.scala      | 11 +++--
 2 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d510581f90e69..cac1f9bc1a098 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -376,25 +376,7 @@ case class DataSource(
         val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
 
-        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
-          val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
-          new CatalogFileIndex(
-            sparkSession,
-            catalogTable.get,
-            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
-        } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
-        }
-
-        HadoopFsRelation(
-          fileCatalog,
-          partitionSchema = partitionSchema,
-          dataSchema = dataSchema.asNullable,
-          bucketSpec = bucketSpec,
-          format,
-          caseInsensitiveOptions)(sparkSession)
-
+        createHadoopRelation(format, globbedPaths)
       case _ =>
         throw new AnalysisException(
           s"$className is not a valid Spark SQL Data Source.")
@@ -403,6 +385,35 @@ case class DataSource(
     relation
   }
 
+  /**
+   * Creates a Hadoop relation based on the data source format and the globbed file paths.
+   * @param format the format of the data source files
+   * @param globPaths file paths already resolved (globbed) by the Hadoop library
+   * @return the resulting Hadoop relation
+   */
+  def createHadoopRelation(format: FileFormat,
+      globPaths: Array[Path]): BaseRelation = {
+    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+    val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+      catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+      val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
+      new CatalogFileIndex(
+        sparkSession,
+        catalogTable.get,
+        catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+    } else {
+      new InMemoryFileIndex(sparkSession, globPaths, options, Some(partitionSchema))
+    }
+
+    HadoopFsRelation(
+      fileCatalog,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema.asNullable,
+      bucketSpec = bucketSpec,
+      format,
+      caseInsensitiveOptions)(sparkSession)
+  }
+
   /**
    * Writes the given [[DataFrame]] out in this [[FileFormat]].
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 566f40f454393..dfe5e3009ec81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -56,8 +56,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
 
     val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
-    val paths = files.map(_.getPath.toString)
-    val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths)
+    val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, files)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     Some(CSVInferSchema.infer(lines, caseSensitive, csvOptions))
   }
@@ -128,14 +127,16 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
   private def createBaseDataset(
       sparkSession: SparkSession,
       options: CSVOptions,
-      inputPaths: Seq[String]): Dataset[String] = {
+      inputPaths: Seq[FileStatus]): Dataset[String] = {
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
+      // Fix for SPARK-19340: resolveRelation is replaced with createHadoopRelation
+      // to avoid glob pattern resolution on file paths that are already resolved.
       sparkSession.baseRelationToDataFrame(
         DataSource.apply(
           sparkSession,
-          paths = inputPaths,
+          paths = inputPaths.map(_.getPath().toString),
           className = classOf[TextFileFormat].getName
-        ).resolveRelation(checkFilesExist = false))
+        ).createHadoopRelation(new TextFileFormat, inputPaths.map(_.getPath).toArray))
         .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
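Note (editorial, not part of the patch): the underlying problem is that Spark resolves input paths through Hadoop glob expansion, so a file whose literal name contains glob metacharacters such as '{', '}' or '*' is misread when an already-resolved path is globbed a second time during schema inference. A minimal standalone sketch of the difference, assuming a local file named filename19340{00-1}.csv exists under /tmp (the path and object name are illustrative only):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobVsLiteralPath {
  def main(args: Array[String]): Unit = {
    // A path whose file name literally contains glob metacharacters.
    val literal = new Path("file:/tmp/filename19340{00-1}.csv")
    val fs = FileSystem.get(literal.toUri, new Configuration())

    // globStatus treats '{...}' as glob syntax, so it does not match the file
    // whose name literally contains the braces.
    val globMatches = Option(fs.globStatus(literal)).map(_.length).getOrElse(0)

    // exists/getFileStatus treat the name literally and do find the file.
    val existsLiterally = fs.exists(literal)

    println(s"glob matches: $globMatches, exists as a literal path: $existsLiterally")
  }
}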
From db0b82238893c5da818e68b19d0bb29dcc945753 Mon Sep 17 00:00:00 2001
From: lxsmnv
Date: Mon, 20 Feb 2017 14:45:06 -0500
Subject: [PATCH 2/5] Removed redundant getOrInferFileFormatSchema call

---
 .../sql/execution/datasources/DataSource.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index cac1f9bc1a098..f3536548fc882 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -374,8 +374,6 @@ case class DataSource(
           globPath
         }.toArray
 
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
-
         createHadoopRelation(format, globbedPaths)
       case _ =>
         throw new AnalysisException(
           s"$className is not a valid Spark SQL Data Source.")
@@ -384,13 +382,12 @@ case class DataSource(
     relation
   }
 
-  /**
-   * Creates a Hadoop relation based on the data source format and the globbed file paths.
-   * @param format the format of the data source files
-   * @param globPaths file paths already resolved (globbed) by the Hadoop library
-   * @return the resulting Hadoop relation
-   */
+  /** Creates a Hadoop relation based on the data source format and the globbed file paths.
+    * @param format the format of the data source files
+    * @param globPaths file paths already resolved (globbed) by the Hadoop library
+    * @return the resulting Hadoop relation
+    */
   def createHadoopRelation(format: FileFormat,
       globPaths: Array[Path]): BaseRelation = {
     val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
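For context (an editorial sketch, not part of the patch): with createHadoopRelation in place, a caller that already holds resolved FileStatus entries can build a text relation directly, without resolveRelation re-globbing the paths. The helper name readResolvedFilesAsText is hypothetical; the DataSource and TextFileFormat calls mirror the code added in patch 1:

import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.text.TextFileFormat

def readResolvedFilesAsText(spark: SparkSession, files: Seq[FileStatus]): Dataset[String] = {
  // The paths are already resolved, so they are handed to createHadoopRelation
  // directly instead of going through resolveRelation (which would glob them again).
  val relation = DataSource(
    spark,
    paths = files.map(_.getPath.toString),
    className = classOf[TextFileFormat].getName
  ).createHadoopRelation(new TextFileFormat, files.map(_.getPath).toArray)

  spark.baseRelationToDataFrame(relation)
    .select("value")
    .as[String](Encoders.STRING)
}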
From 9bcb78f9a884265b6d1d36868a6c6472785dbab5 Mon Sep 17 00:00:00 2001
From: lxsmnv
Date: Mon, 20 Feb 2017 17:15:46 -0500
Subject: [PATCH 3/5] Fixed schema inference for non-UTF-8 files

---
 .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index dfe5e3009ec81..f0075cf4e7e44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -128,20 +128,21 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: CSVOptions,
       inputPaths: Seq[FileStatus]): Dataset[String] = {
+    val pathValues = inputPaths.map(_.getPath().toString)
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
       // Fix for SPARK-19340: resolveRelation is replaced with createHadoopRelation
       // to avoid glob pattern resolution on file paths that are already resolved.
       sparkSession.baseRelationToDataFrame(
         DataSource.apply(
           sparkSession,
-          paths = inputPaths.map(_.getPath().toString),
+          paths = pathValues,
           className = classOf[TextFileFormat].getName
         ).createHadoopRelation(new TextFileFormat, inputPaths.map(_.getPath).toArray))
         .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
       val rdd = sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](inputPaths.mkString(","))
+        .hadoopFile[LongWritable, Text, TextInputFormat](pathValues.mkString(","))
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
       sparkSession.createDataset(rdd)(Encoders.STRING)
     }

From 02c9f3af6708b249e2bf3a60a104d7d59a4cd43c Mon Sep 17 00:00:00 2001
From: lxsmnv
Date: Mon, 20 Feb 2017 19:12:42 -0500
Subject: [PATCH 4/5] Test added for SPARK-19340

---
 .../src/test/resources/test-data/filename19340{00-1}.csv | 3 +++
 .../spark/sql/execution/datasources/csv/CSVSuite.scala   | 8 ++++++++
 2 files changed, 11 insertions(+)
 create mode 100644 sql/core/src/test/resources/test-data/filename19340{00-1}.csv

diff --git a/sql/core/src/test/resources/test-data/filename19340{00-1}.csv b/sql/core/src/test/resources/test-data/filename19340{00-1}.csv
new file mode 100644
index 0000000000000..b5ccbe180822b
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/filename19340{00-1}.csv
@@ -0,0 +1,3 @@
+1, one
+2, two
+3, three
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 0c9a7298c3fa0..b0cc5933601cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
+  private val filenameSpecialChr = "filename19340*.csv"
 
   private def testFile(fileName: String): String = {
     Thread.currentThread().getContextClassLoader.getResource(fileName).toString
@@ -958,4 +959,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       checkAnswer(df, Row(1, null))
     }
   }
+
+  test("SPARK-19340 special characters in csv file name") {
+    val csvDF = spark.read
+      .option("header", "false")
+      // testFile doesn't work with filenames that contain special characters
+      .csv(testFile("test-data") + "/" + filenameSpecialChr).take(1)
+  }
 }
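Editorial note: the new test addresses the file through a '*' wildcard because, as its comment says, testFile does not work with file names that contain special characters. A self-contained sketch of the same read outside the suite, assuming the fixture has been copied to /tmp/csv-fixtures (the paths and object name are illustrative only):

import org.apache.spark.sql.SparkSession

object Spark19340Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("spark-19340").getOrCreate()

    // The user-facing path may itself be a glob ('*') that matches the file; the bug
    // was in the second, internal round of resolution during schema inference, which
    // re-interpreted the already-resolved name containing '{' and '}' as another glob.
    val df = spark.read
      .option("header", "false")
      .csv("/tmp/csv-fixtures/filename19340*.csv")

    df.show() // the fixture contains three rows: "1, one", "2, two", "3, three"
    spark.stop()
  }
}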
From fb3fb3166972513c792291db7a47d939905d136c Mon Sep 17 00:00:00 2001
From: lxsmnv
Date: Sat, 4 Mar 2017 12:07:12 -0500
Subject: [PATCH 5/5] Merged with recent changes

---
 .../datasources/csv/CSVDataSource.scala      |  4 +-
 .../execution/datasources/csv/CSVSuite.scala | 71 +++++++++++++------
 2 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 73e6abc6dad37..84a946b7d635f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -155,12 +155,14 @@ object TextInputCSVDataSource extends CSVDataSource {
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
+      // Fix for SPARK-19340: resolveRelation is replaced with createHadoopRelation
+      // to avoid glob pattern resolution on file paths that are already resolved.
       sparkSession.baseRelationToDataFrame(
         DataSource.apply(
           sparkSession,
           paths = paths,
           className = classOf[TextFileFormat].getName
-        ).resolveRelation(checkFilesExist = false))
+        ).createHadoopRelation(new TextFileFormat, inputPaths.map(_.getPath).toArray))
         .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
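Side note on the surrounding code (a sketch, not part of the patch): the non-UTF-8 branch of createBaseDataset reads raw bytes through hadoopFile and decodes them with the requested charset. A standalone version of that pattern, with a hypothetical helper name, looks like this:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

def readLinesWithCharset(
    spark: SparkSession,
    paths: Seq[String],
    charset: String): Dataset[String] = {
  // hadoopFile accepts a comma-separated list of input paths, which is why the
  // patch joins the path values with "," in CSVFileFormat/CSVDataSource.
  val rdd = spark.sparkContext
    .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
    .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
  spark.createDataset(rdd)(Encoders.STRING)
}

// Example (illustrative path): readLinesWithCharset(spark, Seq("/tmp/cars_iso-8859-1.csv"), "ISO-8859-1")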
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 90a12933e5f80..a25ac0a705e71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
 class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+  import testImplicits._
 
   private val carsFile = "test-data/cars.csv"
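The CSVSuite hunk above adds an import of testImplicits. In Spark's test framework, suites that mix in SharedSQLContext/SQLTestUtils expose a testImplicits object whose members enable Seq(...).toDF()/toDS() inside tests, for example (a hypothetical mini-suite for illustration only):

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

class ToDFExampleSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
  import testImplicits._

  test("toDF via testImplicits") {
    // Without the import, Seq((1, "one")).toDF(...) would not compile in the suite.
    val df = Seq((1, "one"), (2, "two")).toDF("id", "name")
    checkAnswer(df.filter("id = 1"), Row(1, "one"))
  }
}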
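As written, the SPARK-19340 test only materializes one row with take(1) and asserts nothing about the result. A possible hardening (a sketch, not part of the patch) would assert on the parsed content, reusing the suite's testFile helper and filenameSpecialChr constant:

  test("SPARK-19340 special characters in csv file name") {
    val csvDF = spark.read
      .option("header", "false")
      // testFile doesn't work with filenames that contain special characters
      .csv(testFile("test-data") + "/" + filenameSpecialChr)
    // The fixture has three rows ("1, one", "2, two", "3, three").
    assert(csvDF.count() === 3)
  }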