diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index cfd66af18892..aa6c9dca7a04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv

 import java.math.BigDecimal
-import java.text.{NumberFormat, SimpleDateFormat}
+import java.text.NumberFormat
 import java.util.Locale

 import scala.util.control.Exception._
@@ -192,59 +192,59 @@ private[csv] object CSVTypeCast {
       nullable: Boolean = true,
       options: CSVOptions = CSVOptions()): Any = {

-    castType match {
-      case _: ByteType => if (datum == options.nullValue && nullable) null else datum.toByte
-      case _: ShortType => if (datum == options.nullValue && nullable) null else datum.toShort
-      case _: IntegerType => if (datum == options.nullValue && nullable) null else datum.toInt
-      case _: LongType => if (datum == options.nullValue && nullable) null else datum.toLong
-      case _: FloatType =>
-        if (datum == options.nullValue && nullable) {
-          null
-        } else if (datum == options.nanValue) {
-          Float.NaN
-        } else if (datum == options.negativeInf) {
-          Float.NegativeInfinity
-        } else if (datum == options.positiveInf) {
-          Float.PositiveInfinity
-        } else {
-          Try(datum.toFloat)
-            .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
-        }
-      case _: DoubleType =>
-        if (datum == options.nullValue && nullable) {
-          null
-        } else if (datum == options.nanValue) {
-          Double.NaN
-        } else if (datum == options.negativeInf) {
-          Double.NegativeInfinity
-        } else if (datum == options.positiveInf) {
-          Double.PositiveInfinity
-        } else {
-          Try(datum.toDouble)
-            .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
-        }
-      case _: BooleanType => datum.toBoolean
-      case dt: DecimalType =>
-        if (datum == options.nullValue && nullable) {
-          null
-        } else {
-          val value = new BigDecimal(datum.replaceAll(",", ""))
-          Decimal(value, dt.precision, dt.scale)
-        }
-      case _: TimestampType if options.dateFormat != null =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        options.dateFormat.parse(datum).getTime * 1000L
-      case _: TimestampType =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        DateTimeUtils.stringToTime(datum).getTime * 1000L
-      case _: DateType if options.dateFormat != null =>
-        DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
-      case _: DateType =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-      case _: StringType => UTF8String.fromString(datum)
-      case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
+    if (datum == null || (datum == options.nullValue && nullable)) {
+      null
+    } else {
+      castType match {
+        case _: ByteType => datum.toByte
+        case _: ShortType => datum.toShort
+        case _: IntegerType => datum.toInt
+        case _: LongType => datum.toLong
+        case _: FloatType =>
+          if (datum == options.nanValue) {
+            Float.NaN
+          } else if (datum == options.negativeInf) {
+            Float.NegativeInfinity
+          } else if (datum == options.positiveInf) {
+            Float.PositiveInfinity
+          } else {
+            Try(datum.toFloat)
+              .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
+          }
+        case _: DoubleType =>
+          if (datum == options.nanValue) {
+            Double.NaN
+          } else if (datum == options.negativeInf) {
+            Double.NegativeInfinity
+          } else if (datum == options.positiveInf) {
+            Double.PositiveInfinity
+          } else {
+            Try(datum.toDouble)
+              .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
+          }
+        case _: BooleanType => datum.toBoolean
+        case dt: DecimalType =>
+          if (datum == options.nullValue && nullable) {
+            null
+          } else {
+            val value = new BigDecimal(datum.replaceAll(",", ""))
+            Decimal(value, dt.precision, dt.scale)
+          }
+        case _: TimestampType if options.dateFormat != null =>
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          options.dateFormat.parse(datum).getTime * 1000L
+        case _: TimestampType =>
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          DateTimeUtils.stringToTime(datum).getTime * 1000L
+        case _: DateType if options.dateFormat != null =>
+          DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
+        case _: DateType =>
+          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+        case _: StringType => UTF8String.fromString(datum)
+        case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
+      }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b6cdc8cfabe3..8b3026121998 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -431,7 +431,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }

   test("nullable fields with user defined null value of \"null\"") {
-    // year,make,model,comment,blank
     val dataSchema = StructType(List(
       StructField("year", IntegerType, nullable = true),
@@ -447,7 +446,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     verifyCars(cars, withHeader = true, checkValues = false)
     val results = cars.collect()
-    assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
+    assert(results(0).toSeq === Array(2012, "Tesla", "S", null, null))
     assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b24efc3..af95411817a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -73,10 +73,10 @@ class CSVTypeCastSuite extends SparkFunSuite {
   test("String type should always return the same as the input") {
     assert(
-      CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) ==
+      CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions("nullValue", null)) ==
        UTF8String.fromString(""))
     assert(
-      CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
+      CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", null)) ==
        UTF8String.fromString(""))
   }
@@ -180,5 +180,13 @@ class CSVTypeCastSuite extends SparkFunSuite {
      CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
    assertNull(
      CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", BooleanType, nullable = true, CSVOptions("nullValue", "-")))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CSVHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CSVHadoopFsRelationSuite.scala
new file mode 100644
index 000000000000..f82f9671ffe0
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CSVHadoopFsRelationSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.types._
+
+class CSVHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = "csv"
+
+  override val extraReadOptions: Map[String, String] =
+    Map("header" -> "true", "inferSchema" -> "true")
+
+  override val extraWriteOptions: Map[String, String] = Map("header" -> "true")
+
+  override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: NullType => false
+    // The `StringType` test is too flaky. It seems randomly generated data can affect the
+    // delimiter used for writing, which the CSV parser then fails to recognize.
+    case _: StringType => false
+    case _: BinaryType => false
+    case _: CalendarIntervalType => false
+    case _: ArrayType => false
+    case _: MapType => false
+    case _: StructType => false
+    // Currently, this writes `DateType` and `TimestampType` as long values.
+    // Since `dateFormat` is not yet supported for writing, these are disabled for now.
+    case _: DateType => false
+    case _: TimestampType => false
+    case _: UserDefinedType[_] => false
+    case _ => true
+  }
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val header = Seq("a,b")
+        val data = (1 to 3).map(i => s"""$i,val_$i""")
+        sparkContext
+          .parallelize(header ++ data)
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        hiveContext.read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .option("inferSchema", "true")
+          .option("header", "true")
+          .load(file.getCanonicalPath))
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index f4d63334b657..485166fe9402 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -41,6 +41,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
   val dataSourceName: String

+  // The options below will be applied to the tests for reading in `HadoopFsRelationTest`.
+  val extraReadOptions = Map.empty[String, String]
+
+  // The options below will be applied to the tests for writing in `HadoopFsRelationTest`.
+  val extraWriteOptions = Map.empty[String, String]
+
   protected def supportsDataType(dataType: DataType): Boolean = true

   val dataSchema =
@@ -170,13 +176,22 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
   test("save()/load() - non-partitioned table - Overwrite") {
     withTempPath { file =>
-      testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
-      testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
+      testDF.write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .options(extraWriteOptions)
+        .save(file.getCanonicalPath)
+      testDF.write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .options(extraWriteOptions)
+        .save(file.getCanonicalPath)

       checkAnswer(
         spark.read.format(dataSourceName)
           .option("path", file.getCanonicalPath)
           .option("dataSchema", dataSchema.json)
+          .options(extraReadOptions)
           .load(),
         testDF.collect())
     }
   }
@@ -184,12 +199,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
   test("save()/load() - non-partitioned table - Append") {
     withTempPath { file =>
-      testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
-      testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath)
+      testDF.write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .options(extraWriteOptions)
+        .save(file.getCanonicalPath)
+      testDF.write
+        .mode(SaveMode.Append)
+        .format(dataSourceName)
+        .options(extraWriteOptions)
+        .save(file.getCanonicalPath)

       checkAnswer(
         spark.read.format(dataSourceName)
           .option("dataSchema", dataSchema.json)
+          .options(extraReadOptions)
           .load(file.getCanonicalPath).orderBy("a"),
         testDF.union(testDF).orderBy("a").collect())
     }
@@ -217,6 +241,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     withTempPath { file =>
       partitionedTestDF.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.ErrorIfExists)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)
@@ -224,6 +249,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       checkQueries(
         spark.read.format(dataSourceName)
           .option("dataSchema", dataSchema.json)
+          .options(extraReadOptions)
           .load(file.getCanonicalPath))
     }
   }
@@ -232,12 +258,14 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     withTempPath { file =>
       partitionedTestDF.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)

       partitionedTestDF.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)
@@ -245,6 +273,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       checkAnswer(
         spark.read.format(dataSourceName)
           .option("dataSchema", dataSchema.json)
+          .options(extraReadOptions)
           .load(file.getCanonicalPath),
         partitionedTestDF.collect())
     }
@@ -254,12 +283,14 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     withTempPath { file =>
       partitionedTestDF.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)

       partitionedTestDF.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Append)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)
@@ -267,6 +298,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       checkAnswer(
         spark.read.format(dataSourceName)
           .option("dataSchema", dataSchema.json)
+          .options(extraReadOptions)
           .load(file.getCanonicalPath),
         partitionedTestDF.union(partitionedTestDF).collect())
     }
@@ -276,12 +308,14 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     withTempPath { file =>
       partitionedTestDF1.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)

       partitionedTestDF2.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Append)
         .partitionBy("p1", "p2")
         .save(file.getCanonicalPath)
@@ -289,6 +323,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       checkAnswer(
         spark.read.format(dataSourceName)
           .option("dataSchema", dataSchema.json)
+          .options(extraReadOptions)
           .load(file.getCanonicalPath),
         partitionedTestDF.collect())
     }
@@ -512,17 +547,25 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       dataInSubdir.write
         .format(dataSourceName)
         .mode(SaveMode.Overwrite)
+        .options(extraWriteOptions)
         .save(subdir.getCanonicalPath)

       // Inferring schema should throw error as it should not find any file to infer
       val e = intercept[Exception] {
-        spark.read.format(dataSourceName).load(dir.getCanonicalPath)
+        spark.read
+          .format(dataSourceName)
+          .options(extraReadOptions)
+          .load(dir.getCanonicalPath)
       }

       e match {
         case _: AnalysisException =>
           assert(e.getMessage.contains("infer"))

+        case _: IllegalArgumentException =>
+          // CSV data source throws this exception when it cannot find any file to infer.
+          assert(e.getMessage.contains("Can not create a Path from an empty string"))
+
         case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
           // Ignore error, the source format requires schema to be provided by user
           // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema
@@ -536,6 +579,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       val df = spark.read
         .format(dataSourceName)
         .schema(dataInDir.schema) // avoid schema inference for any format
+        .options(extraReadOptions)
         .load(path.getCanonicalPath)
       checkAnswer(df, expectedAnswer)
     }
@@ -549,6 +593,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir
       dataInDir.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Append) // append to prevent subdir from being deleted
         .save(dir.getCanonicalPath)
       require(dir.listFiles().exists(!_.isDirectory))
@@ -573,16 +618,19 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       dataInSubdir.write
         .format (dataSourceName)
+        .options(extraWriteOptions)
         .mode (SaveMode.Overwrite)
         .save (subdir.getCanonicalPath)

       dataInSubsubdir.write
         .format (dataSourceName)
+        .options(extraWriteOptions)
         .mode (SaveMode.Overwrite)
         .save (subsubdir.getCanonicalPath)

       dataInAnotherSubsubdir.write
         .format (dataSourceName)
+        .options(extraWriteOptions)
         .mode (SaveMode.Overwrite)
         .save (anotherSubsubdir.getCanonicalPath)

@@ -620,6 +668,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     def check(path: String, expectedDf: DataFrame): Unit = {
       val df = spark.read
         .format(dataSourceName)
+        .options(extraReadOptions)
         .schema(schema) // avoid schema inference for any format, expected to be same format
         .load(path)
       checkAnswer(df, expectedDf)
@@ -644,6 +693,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       val dir = path.getCanonicalPath
       partitionedTestDF.write
         .format(dataSourceName)
+        .options(extraWriteOptions)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
         .save(dir)
@@ -658,6 +708,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       basePath.foreach(reader.option("basePath", _))
       val testDf = reader
         .format(dataSourceName)
+        .options(extraReadOptions)
         .load(path)
       assert(expectedResult.isLeft, s"Error was expected with $path but result found")
       checkAnswer(testDf, expectedResult.left.get)
@@ -796,23 +847,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     val df = spark.range(1, 10).toDF("i")
     withTempPath { dir =>
-      df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+      df.write
+        .mode("append")
+        .format(dataSourceName)
+        .options(extraWriteOptions)
+        .save(dir.getCanonicalPath)
       // Because there data already exists,
       // this append should succeed because we will use the output committer associated
       // with file format and AlwaysFailOutputCommitter will not be used.
-      df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+      df.write
+        .mode("append")
+        .format(dataSourceName)
+        .options(extraWriteOptions)
+        .save(dir.getCanonicalPath)

       checkAnswer(
         spark.read
           .format(dataSourceName)
           .option("dataSchema", df.schema.json)
-          .options(extraOptions)
+          .options(extraOptions ++ extraReadOptions)
           .load(dir.getCanonicalPath),
         df.union(df))

       // This will fail because AlwaysFailOutputCommitter is used when we do append.
       intercept[Exception] {
         df.write.mode("overwrite")
-          .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+          .options(extraOptions ++ extraWriteOptions)
+          .format(dataSourceName)
+          .save(dir.getCanonicalPath)
       }
     }
     withTempPath { dir =>
@@ -821,7 +882,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       // and there is no existing data.
       intercept[Exception] {
         df.write.mode("append")
-          .options(extraOptions)
+          .options(extraOptions ++ extraWriteOptions)
           .format(dataSourceName)
           .save(dir.getCanonicalPath)
       }
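
Reviewer note (not part of the patch): a minimal sketch of the end-to-end behaviour this change is meant to produce, assuming a SparkSession named `spark` and a hypothetical file `cars.csv` that uses the literal string "null" as its null marker, mirroring the fixture exercised in CSVSuite above.

    // Illustrative only; the file path and column layout are hypothetical.
    val cars = spark.read
      .format("csv")
      .option("header", "true")
      .option("nullValue", "null") // with this patch, applied to string/boolean/date/timestamp columns too
      .load("cars.csv")

    // A row stored as `2012,Tesla,S,null,null` should now come back as
    // [2012, Tesla, S, null, null] rather than [2012, Tesla, S, "null", "null"],
    // matching the updated assertion in CSVSuite.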