From df82d510f140651e5b1c6f8101a66f84c2575b56 Mon Sep 17 00:00:00 2001
From: Mikko Kupsu
Date: Tue, 11 Apr 2017 22:02:35 +0300
Subject: [PATCH] Add option to specify custom file extension with csv

---
 .../datasources/csv/CSVFileFormat.scala       |  2 +-
 .../datasources/csv/CSVOptions.scala          |  2 ++
 .../execution/datasources/csv/CSVSuite.scala  | 25 +++++++++++++++++++
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index a99bdfee5d6e6..ef3d5eb15aef0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -78,7 +78,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
-        ".csv" + CodecStreams.getCompressionExtension(context)
+        csvOptions.fileExtension + CodecStreams.getCompressionExtension(context)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 62e4c6e4b4ea0..55c205bb6aa00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -140,6 +140,8 @@ class CSVOptions(
 
   val inputBufferSize = 128
 
+  val fileExtension = parameters.getOrElse("fileExtension", ".csv")
+
   val isCommentSet = this.comment != '\u0000'
 
   def asWriterSettings: CsvWriterSettings = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 352dba79a4c08..028f5a3a1ded5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -622,6 +622,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("save tsv with tsv suffix") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      val cars = spark.read
+        .format("csv")
+        .option("header", "true")
+        .load(testFile(carsFile))
+
+      cars.coalesce(1).write
+        .option("header", "true")
+        .option("fileExtension", ".tsv")
+        .option("delimiter", "\t")
+        .csv(csvDir)
+
+      val tsvFiles = new File(csvDir).listFiles()
+      assert(tsvFiles.exists(_.getName.endsWith(".tsv")))
+
+      val carsCopy = spark.read
+        .option("header", "true")
+        .option("delimiter", "\t")
+        .csv(csvDir)
+
+      verifyCars(carsCopy, withHeader = true)
+    }
+  }
   test("SPARK-13543 Write the output as uncompressed via option()") {
     val extraOptions = Map(
       "mapreduce.output.fileoutputformat.compress" -> "true",