diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c9384e44255b8..dc626e2663c9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -356,29 +356,8 @@ case class DataSource(
           }
           globPath
         }.toArray
 
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
-
-        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
-          val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
-          new CatalogFileIndex(
-            sparkSession,
-            catalogTable.get,
-            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
-        } else {
-          new InMemoryFileIndex(
-            sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
-        }
-
-        HadoopFsRelation(
-          fileCatalog,
-          partitionSchema = partitionSchema,
-          dataSchema = dataSchema.asNullable,
-          bucketSpec = bucketSpec,
-          format,
-          caseInsensitiveOptions)(sparkSession)
+        createHadoopRelation(format, globbedPaths)
 
       case _ =>
         throw new AnalysisException(
@@ -387,6 +366,36 @@ case class DataSource(
 
     relation
   }
+
+  /**
+   * Creates a Hadoop relation for the given file format and globbed file paths.
+   * @param format the file format of the data source
+   * @param globPaths file paths already resolved (globbed) by the Hadoop library
+   * @return the Hadoop relation built over the given paths
+   */
+  def createHadoopRelation(format: FileFormat,
+      globPaths: Array[Path]): BaseRelation = {
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
+
+    val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+        catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+      val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
+      new CatalogFileIndex(
+        sparkSession,
+        catalogTable.get,
+        catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+    } else {
+      new InMemoryFileIndex(sparkSession, globPaths, options, Some(partitionSchema))
+    }
+
+    HadoopFsRelation(
+      fileCatalog,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema.asNullable,
+      bucketSpec = bucketSpec,
+      format,
+      caseInsensitiveOptions)(sparkSession)
+  }
 
   /**
    * Writes the given [[DataFrame]] out in this [[FileFormat]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 35ff924f27ce5..5a7b3276a5582 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -168,12 +168,14 @@ object TextInputCSVDataSource extends CSVDataSource {
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
+      // Fix for SPARK-19340: resolveRelation is replaced with createHadoopRelation
+      // to avoid glob pattern resolution on file paths that are already resolved.
       sparkSession.baseRelationToDataFrame(
         DataSource.apply(
           sparkSession,
           paths = paths,
           className = classOf[TextFileFormat].getName
-        ).resolveRelation(checkFilesExist = false))
+        ).createHadoopRelation(new TextFileFormat, inputPaths.map(_.getPath).toArray))
         .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
diff --git a/sql/core/src/test/resources/test-data/filename19340{00-1}.csv b/sql/core/src/test/resources/test-data/filename19340{00-1}.csv
new file mode 100644
index 0000000000000..b5ccbe180822b
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/filename19340{00-1}.csv
@@ -0,0 +1,3 @@
+1, one
+2, two
+3, three
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 4435e4df38ef6..c244294666320 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -55,6 +55,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   private val datesFile = "test-data/dates.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
+  private val filenameSpecialChr = "filename19340*.csv"
 
   private def testFile(fileName: String): String = {
     Thread.currentThread().getContextClassLoader.getResource(fileName).toString
@@ -1115,4 +1116,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     assert(df2.schema === schema)
   }
 
+  test("SPARK-19340 special characters in csv file name") {
+    // testFile doesn't work with file names that contain special characters,
+    // so resolve the test-data directory and append the glob file name to it.
+    val csvDF = spark.read
+      .option("header", "false")
+      .csv(testFile("test-data") + "/" + filenameSpecialChr)
+    assert(csvDF.count() == 3)
+  }
 }