sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -356,29 +356,8 @@ case class DataSource(
}
globPath
}.toArray

val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
} else {
new InMemoryFileIndex(
sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
}

HadoopFsRelation(
fileCatalog,
partitionSchema = partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
caseInsensitiveOptions)(sparkSession)

createHadoopRelation(format, globbedPaths)

case _ =>
throw new AnalysisException(
@@ -387,6 +366,36 @@ case class DataSource(

relation
}

/**
* Creates a Hadoop relation based on the format and the globbed file paths.
* @param format format of the data source files
* @param globPaths file paths already resolved (globbed) by the Hadoop library
* @return the Hadoop relation
*/
def createHadoopRelation(
format: FileFormat,
globPaths: Array[Path]): BaseRelation = {
Review comment (Member): Let's make this inlined.

val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
} else {
new InMemoryFileIndex(sparkSession, globPaths, options, Some(partitionSchema))
}

HadoopFsRelation(
fileCatalog,
partitionSchema = partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
caseInsensitiveOptions)(sparkSession)
}

/**
* Writes the given [[DataFrame]] out in this [[FileFormat]].
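Note on the change above: a minimal, self-contained sketch of why resolving an already resolved path a second time fails (the file name is hypothetical and mirrors the test data added in this PR; only stock Hadoop FileSystem APIs are used). The file exists on disk, yet globbing its literal name finds nothing because `{00-1}` is parsed as a glob alternation, which is the double resolution that createHadoopRelation sidesteps by accepting already resolved paths.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobMetacharacterSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())

    // Hypothetical file whose name contains glob metacharacters,
    // mirroring test-data/filename19340{00-1}.csv.
    val literal = new Path("/tmp/filename19340{00-1}.csv")
    fs.create(literal).close()
    println(fs.exists(literal))                    // the literal file is there

    // Globbing the same string re-interprets "{00-1}" as a glob
    // alternation, so the literal file is no longer matched.
    val matched = fs.globStatus(literal)
    println(matched == null || matched.isEmpty)    // no match for the existing file

    fs.delete(literal, false)
  }
}
```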
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -168,12 +168,14 @@ object TextInputCSVDataSource extends CSVDataSource {
options: CSVOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
// Fix for SPARK-19340: resolveRelation is replaced with createHadoopRelation
// to avoid glob pattern resolution for an already resolved file path.
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
).resolveRelation(checkFilesExist = false))
).createHadoopRelation(new TextFileFormat, inputPaths.map(_.getPath).toArray))
.select("value").as[String](Encoders.STRING)
} else {
val charset = options.charset
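Note: for a user-facing view of the same problem, a hedged sketch of the read this substitution is meant to fix (the local session, temporary directory, and file name are hypothetical). The first resolution lists the directory and yields the literal file name; before this change, schema inference ran that literal name through glob resolution again and could not find the file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object Spark19340Repro {
  def main(args: Array[String]): Unit = {
    // Hypothetical directory holding a CSV whose name contains glob metacharacters.
    val dir = Files.createTempDirectory("spark19340")
    val file = dir.resolve("filename19340{00-1}.csv")
    Files.write(file, "1, one\n2, two\n3, three\n".getBytes(StandardCharsets.UTF_8))

    val spark = SparkSession.builder().master("local[1]").appName("SPARK-19340").getOrCreate()
    try {
      // The directory listing resolves to the literal file name; before this
      // change, CSV schema inference re-globbed that literal name and failed
      // to find the file. With createHadoopRelation the resolved path is kept.
      val df = spark.read.option("header", "false").csv(dir.toString)
      df.show()
    } finally {
      spark.stop()
      Files.deleteIfExists(file)
      Files.deleteIfExists(dir)
    }
  }
}
```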
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -52,7 +52,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
require(files.nonEmpty, "Cannot infer schema from an empty set of files")

val parsedOptions =
new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)

@@ -110,7 +110,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
"The field for corrupt records must be string type and nullable")
}
}

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions)
sql/core/src/test/resources/test-data/filename19340{00-1}.csv (new file, 3 additions, 0 deletions)
@@ -0,0 +1,3 @@
1, one
2, two
3, three
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._

class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

import testImplicits._

private val carsFile = "test-data/cars.csv"
@@ -55,20 +56,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
private val datesFile = "test-data/dates.csv"
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
private val valueMalformedFile = "test-data/value-malformed.csv"
private val filenameSpecialChr = "filename19340*.csv"


private def testFile(fileName: String): String = {
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
}

/** Verifies data and schema. */
private def verifyCars(
df: DataFrame,
withHeader: Boolean,
numCars: Int = 3,
numFields: Int = 5,
checkHeader: Boolean = true,
checkValues: Boolean = true,
checkTypes: Boolean = false): Unit = {

val numColumns = numFields
val numRows = if (withHeader) numCars else numCars + 1
@@ -209,7 +212,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable USING csv
|OPTIONS (path "${testFile(carsFile8859)}", header "true",
|charset "iso-8859-1", delimiter "þ")
""".stripMargin.replaceAll("\n", " "))
// scalastyle:on
@@ -329,7 +334,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}

test("DDL test with empty file") {
withView("carsTable") {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable
@@ -343,8 +349,10 @@
}

test("DDL test with schema") {
withView("carsTable") {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, comments string, blank string)
@@ -503,8 +511,11 @@
.load(testFile(commentsFile))
.collect()

val expected =
Seq(Seq(1, 2, 3, 4, 5.01D, Timestamp.valueOf("2015-08-20 15:57:00")),
Seq(6, 7, 8, 9, 0, Timestamp.valueOf("2015-08-21 16:58:01")),
Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42")))

@@ -661,7 +672,9 @@
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(testFile(simpleSparseFile))

assert(
df.schema.fields.map(field => field.dataType).deep ==
@@ -975,7 +988,9 @@

test("load null when the schema is larger than parsed tokens ") {
withTempPath { path =>
Seq("1").toDF().write.text(path.getAbsolutePath)
val schema = StructType(
StructField("a", IntegerType, true) ::
StructField("b", IntegerType, true) :: Nil)
Expand All @@ -988,15 +1003,18 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}


test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
Seq(false, true).foreach { wholeFile =>
val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
val df1 = spark
.read
.option("mode", "PERMISSIVE")
.option("wholeFile", wholeFile)
.schema(schema)
.csv(testFile(valueMalformedFile))
checkAnswer(df1,
Row(null, null) ::
Row(1, java.sql.Date.valueOf("1983-08-04")) ::
@@ -1010,10 +1028,13 @@
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.schema(schemaWithCorrField1)
.csv(testFile(valueMalformedFile))
checkAnswer(df2,
Row(null, null, "0,2013-111-11 12:13:14") ::
Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
Nil)

@@ -1028,8 +1049,11 @@
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.schema(schemaWithCorrField2)
.csv(testFile(valueMalformedFile))
checkAnswer(df3,
Row(null, "0,2013-111-11 12:13:14", null) ::
Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
Nil)
@@ -1105,6 +1129,7 @@
}
}


test("Empty string dataset produces empty dataframe and keep user-defined schema") {
val df1 = spark.read.csv(spark.emptyDataset[String])
assert(df1.schema === spark.emptyDataFrame.schema)
@@ -1115,4 +1140,14 @@
assert(df2.schema === schema)
}


test("SPARK-19340 special characters in csv file name") {
val csvDF = spark.read
.option("header", "false")
// testFile doesn't work with filenames that contain special characters
.csv(testFile("test-data") + "/" + filenameSpecialChr).take(1)
}
}
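Follow-up note on the new test: a hedged sketch (the count assertion is illustrative only and not part of this PR) of how the test could also check that the three rows of filename19340{00-1}.csv are read back, rather than only taking the first row:

```scala
test("SPARK-19340 special characters in csv file name") {
  val csvDF = spark.read
    .option("header", "false")
    // testFile doesn't work with filenames that contain special characters
    .csv(testFile("test-data") + "/" + filenameSpecialChr)
  // Illustrative assertion: all three rows of the data file load intact.
  assert(csvDF.count() == 3)
}
```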