From 06774ad552f8cc39a437a1caa1ef2aceb174b0de Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 18 Jan 2016 18:53:35 +0900 Subject: [PATCH 01/13] Support for compress option --- .../datasources/csv/CSVParameters.scala | 8 +++++-- .../datasources/csv/CSVRelation.scala | 23 ++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index ec16bdbd8bfb3..2c154f4eea52e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -44,9 +44,11 @@ private[sql] case class CSVParameters(parameters: Map[String, String]) extends L } } - val delimiter = CSVTypeCast.toChar(parameters.getOrElse("delimiter", ",")) + val delimiter = CSVTypeCast.toChar( + parameters.getOrElse("delimiter", parameters.getOrElse("sep", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") - val charset = parameters.getOrElse("charset", Charset.forName("UTF-8").name()) + val charset = parameters.getOrElse("charset", + parameters.getOrElse("encoding", Charset.forName("UTF-8").name())) val quote = getChar("quote", '\"') val escape = getChar("escape", '\\') @@ -71,6 +73,8 @@ private[sql] case class CSVParameters(parameters: Map[String, String]) extends L val nullValue = parameters.getOrElse("nullValue", "") + val codec = parameters.getOrElse("codec", null) + val maxColumns = 20480 val maxCharsPerColumn = 100000 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 9267479755e82..f9132ecbac9fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -19,6 +19,10 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.Charset +import org.apache.hadoop.io.SequenceFile.CompressionType +import org.apache.hadoop.io.compress.CompressionCodec +import org.apache.spark.util.Utils + import scala.util.control.NonFatal import com.google.common.base.Objects @@ -58,9 +62,10 @@ private[csv] class CSVRelation( if (Charset.forName(params.charset) == Charset.forName("UTF-8")) { sqlContext.sparkContext.textFile(location) } else { + val charset = params.charset sqlContext.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](location) .mapPartitions { _.map { pair => - new String(pair._2.getBytes, 0, pair._2.getLength, params.charset) + new String(pair._2.getBytes, 0, pair._2.getLength, charset) } } } @@ -98,6 +103,22 @@ private[csv] class CSVRelation( } override def prepareJobForWrite(job: Job): OutputWriterFactory = { + val conf = job.getConfiguration + Option(params.codec).foreach { codec => + // Hadoop 1.x + conf.set("mapred.output.compress", "true") + conf.set("mapred.output.compression.codec", codec) + conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) + conf.set("mapred.compress.map.output", "true") + conf.set("mapred.map.output.compression.codec", codec) + // Hadoop 2.x + conf.set("mapreduce.output.fileoutputformat.compress", "true") + conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) + 
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) + conf.set("mapreduce.map.output.compress", "true") + conf.set("mapreduce.map.output.compress.codec", codec) + } + new CSVOutputWriterFactory(params) } From 5e1611d74f3ccc33705c3aa134f41382e2150508 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 18 Jan 2016 19:26:01 +0900 Subject: [PATCH 02/13] Correct Scala style and add an alias for codec as compression. --- .../datasources/csv/CSVParameters.scala | 2 +- .../datasources/csv/CSVRelation.scala | 5 +--- .../execution/datasources/csv/CSVSuite.scala | 27 +++++++++++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index 2c154f4eea52e..3eabacdce9e7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -73,7 +73,7 @@ private[sql] case class CSVParameters(parameters: Map[String, String]) extends L val nullValue = parameters.getOrElse("nullValue", "") - val codec = parameters.getOrElse("codec", null) + val codec = parameters.getOrElse("codec", parameters.getOrElse("compression", null)) val maxColumns = 20480 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index f9132ecbac9fa..060ab16f81217 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -19,15 +19,12 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.Charset -import org.apache.hadoop.io.SequenceFile.CompressionType -import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.util.Utils - import scala.util.control.NonFatal import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, NullWritable, Text} +import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.hadoop.mapreduce.RecordWriter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 8fdd31aa4334f..39e924f0ff932 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -21,6 +21,8 @@ import java.io.File import java.nio.charset.UnsupportedCharsetException import java.sql.Timestamp +import org.apache.hadoop.io.compress.GzipCodec + import org.apache.spark.SparkException import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} @@ -338,4 +340,29 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null)) } + test("save csv with compression codec option") { + withTempDir { dir => + val csvDir = new File(dir, "csv").getCanonicalPath + val cars = sqlContext.read + 
.format("csv") + .option("header", "true") + .load(testFile(carsFile)) + + cars.coalesce(1).write + .format("csv") + .option("header", "true") + .option("compression", classOf[GzipCodec].getCanonicalName) + .save(csvDir) + + val compressedFiles = new File(csvDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + + val carsCopy = sqlContext.read + .format("csv") + .option("header", "true") + .load(csvDir) + + verifyCars(carsCopy, withHeader = true) + } + } } From d154f025ccfb46c809598e1d688754e1e9603f0e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 18 Jan 2016 19:29:31 +0900 Subject: [PATCH 03/13] Move back some codes changed unintentionally. --- .../spark/sql/execution/datasources/csv/CSVParameters.scala | 6 ++---- .../spark/sql/execution/datasources/csv/CSVRelation.scala | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index 3eabacdce9e7c..4654a912b24bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -44,11 +44,9 @@ private[sql] case class CSVParameters(parameters: Map[String, String]) extends L } } - val delimiter = CSVTypeCast.toChar( - parameters.getOrElse("delimiter", parameters.getOrElse("sep", ","))) + val delimiter = CSVTypeCast.toChar(parameters.getOrElse("delimiter", ",")) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") - val charset = parameters.getOrElse("charset", - parameters.getOrElse("encoding", Charset.forName("UTF-8").name())) + val charset = parameters.getOrElse("charset", Charset.forName("UTF-8").name()) val quote = getChar("quote", '\"') val escape = getChar("escape", '\\') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 060ab16f81217..23171c0a0997b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -59,10 +59,9 @@ private[csv] class CSVRelation( if (Charset.forName(params.charset) == Charset.forName("UTF-8")) { sqlContext.sparkContext.textFile(location) } else { - val charset = params.charset sqlContext.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](location) .mapPartitions { _.map { pair => - new String(pair._2.getBytes, 0, pair._2.getLength, charset) + new String(pair._2.getBytes, 0, pair._2.getLength, params.charset) } } } From 5b57fc246b11f90f082104dc04e53f194123ab35 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 18 Jan 2016 19:33:46 +0900 Subject: [PATCH 04/13] Remove Hadoop 1.x configurations --- .../spark/sql/execution/datasources/csv/CSVRelation.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 23171c0a0997b..42cb9214a2a53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -101,13 +101,6 @@ private[csv] class 
CSVRelation(
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = job.getConfiguration
     Option(params.codec).foreach { codec =>
-      // Hadoop 1.x
-      conf.set("mapred.output.compress", "true")
-      conf.set("mapred.output.compression.codec", codec)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
-      conf.set("mapred.compress.map.output", "true")
-      conf.set("mapred.map.output.compression.codec", codec)
-      // Hadoop 2.x
       conf.set("mapreduce.output.fileoutputformat.compress", "true")
       conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
       conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)

From e7ebddd68c3b772b7476432b4e1ba30d6ba2eb22 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 19 Jan 2016 09:21:39 +0900
Subject: [PATCH 05/13] Look up compression first.

---
 .../spark/sql/execution/datasources/csv/CSVParameters.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
index 4654a912b24bc..edfe8dd402a32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
@@ -71,7 +71,7 @@ private[sql] case class CSVParameters(parameters: Map[String, String]) extends L
 
   val nullValue = parameters.getOrElse("nullValue", "")
 
-  val codec = parameters.getOrElse("codec", parameters.getOrElse("compression", null))
+  val codec = parameters.getOrElse("compression", parameters.getOrElse("codec", null))
 
   val maxColumns = 20480

From adb9eb22a256895ad4bad11893222c485c7afa37 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 19 Jan 2016 16:42:00 +0900
Subject: [PATCH 06/13] Support shortened names and rename the variable from codec to compressionCodec

---
 .../datasources/csv/CSVParameters.scala         | 17 +++++++++++++++--
 .../execution/datasources/csv/CSVRelation.scala |  2 +-
 .../execution/datasources/csv/CSVSuite.scala    |  4 +---
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
index 47e98e15bcf29..ef354c743e7f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.Charset
 
+import org.apache.hadoop.io.compress._
+
 import org.apache.spark.Logging
 
 private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
@@ -35,7 +37,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
   private def getBool(paramName: String, default: Boolean = false): Boolean = {
     val param = parameters.getOrElse(paramName, default.toString)
-    if (param.toLowerCase() == "true") {
+    if (param.toLowerCase == "true") {
       true
     } else if (param.toLowerCase == "false") {
       false
@@ -44,6 +46,13 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
     }
   }
 
+  // Available compression codec list
+  val shortCompressionCodecNames = Map(
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "gzip" -> 
classOf[GzipCodec].getName, + "lz4" -> classOf[Lz4Codec].getName, + "snappy" -> classOf[SnappyCodec].getName) + val delimiter = CSVTypeCast.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") @@ -73,7 +82,11 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] val nullValue = parameters.getOrElse("nullValue", "") - val codec = parameters.getOrElse("compression", parameters.getOrElse("codec", null)) + val compressionCodec: String = { + val maybeCodecName = + Option(parameters.getOrElse("compression", parameters.getOrElse("codec", null))) + maybeCodecName.map(_.toLowerCase).map(shortCompressionCodecNames).orNull + } val maxColumns = 20480 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index b5d4993b0ee89..2d2c842882532 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -101,7 +101,7 @@ private[csv] class CSVRelation( override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = job.getConfiguration - Option(params.codec).foreach { codec => + Option(params.compressionCodec).foreach { codec => conf.set("mapreduce.output.fileoutputformat.compress", "true") conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 332da61f40c25..a79566b1f3658 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -21,8 +21,6 @@ import java.io.File import java.nio.charset.UnsupportedCharsetException import java.sql.Timestamp -import org.apache.hadoop.io.compress.GzipCodec - import org.apache.spark.SparkException import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} @@ -363,7 +361,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { cars.coalesce(1).write .format("csv") .option("header", "true") - .option("compression", classOf[GzipCodec].getCanonicalName) + .option("compression", "gZiP") .save(csvDir) val compressedFiles = new File(csvDir).listFiles() From 432da5d0cf37b1d2a686e8d57da0e3e14cf4903a Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jan 2016 17:39:08 +0900 Subject: [PATCH 07/13] Correct several nits and validate class names. 
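
A rough sketch of the intended resolution behavior (hypothetical inputs,
assuming the Hadoop codec classes are on the classpath):

    CSVCompressionCodecs.getCodecClassName("gzip")
    // resolved through the short-name map:
    // "org.apache.hadoop.io.compress.GzipCodec"

    CSVCompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.BZip2Codec")
    // already a class name: validated with Utils.classForName and returned as-is

    CSVCompressionCodecs.getCodecClassName("not-a-codec")
    // fails to load, so an IllegalArgumentException listing the known short names is thrown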
--- .../datasources/csv/CSVParameters.scala | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index ef354c743e7f4..ef761f8dbb6af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -22,6 +22,7 @@ import java.nio.charset.Charset import org.apache.hadoop.io.compress._ import org.apache.spark.Logging +import org.apache.spark.util.Utils private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging { @@ -45,14 +46,6 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] throw new Exception(s"$paramName flag can be true or false") } } - - // Available compression codec list - val shortCompressionCodecNames = Map( - "bzip2" -> classOf[BZip2Codec].getName, - "gzip" -> classOf[GzipCodec].getName, - "lz4" -> classOf[Lz4Codec].getName, - "snappy" -> classOf[SnappyCodec].getName) - val delimiter = CSVTypeCast.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") @@ -82,10 +75,12 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] val nullValue = parameters.getOrElse("nullValue", "") - val compressionCodec: String = { - val maybeCodecName = - Option(parameters.getOrElse("compression", parameters.getOrElse("codec", null))) - maybeCodecName.map(_.toLowerCase).map(shortCompressionCodecNames).orNull + val compressionCodecName = + parameters.getOrElse("compression", parameters.getOrElse("codec", null)) + val compressionCodec = if (compressionCodecName != null) { + CSVCompressionCodecs.getCodecClassName(compressionCodecName) + } else { + null } val maxColumns = 20480 @@ -100,7 +95,6 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] } private[csv] object ParseModes { - val PERMISSIVE_MODE = "PERMISSIVE" val DROP_MALFORMED_MODE = "DROPMALFORMED" val FAIL_FAST_MODE = "FAILFAST" @@ -122,3 +116,28 @@ private[csv] object ParseModes { true // We default to permissive is the mode string is not valid } } + +private[csv] object CSVCompressionCodecs { + val shortCompressionCodecNames = Map( + "bzip2" -> classOf[BZip2Codec].getName, + "gzip" -> classOf[GzipCodec].getName, + "lz4" -> classOf[Lz4Codec].getName, + "snappy" -> classOf[SnappyCodec].getName) + + /** + * Return the full version of the given codec class. + * If it is already a class name, just return it. + */ + def getCodecClassName(name: String): String = { + val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name) + val codecClassName = try { + // Validate the codec name + Utils.classForName(codecName) + Some(codecName) + } catch { + case e: ClassNotFoundException => None + } + codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " + + s"is not available. 
Available codecs are ${shortCompressionCodecNames.keys.mkString(",")}.")) + } +} From 4388fe59dd847009a11aa632c954b861bb992a42 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jan 2016 17:42:33 +0900 Subject: [PATCH 08/13] Add a newline between function and variables --- .../spark/sql/execution/datasources/csv/CSVParameters.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index ef761f8dbb6af..dc49b08ea7c7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -46,6 +46,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] throw new Exception(s"$paramName flag can be true or false") } } + val delimiter = CSVTypeCast.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") From 6400b767bd3ad6235b3b9f2291e4135a09f5d7ae Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jan 2016 17:43:14 +0900 Subject: [PATCH 09/13] Remove whitespace --- .../spark/sql/execution/datasources/csv/CSVParameters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index dc49b08ea7c7d..f080fead302f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -46,7 +46,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] throw new Exception(s"$paramName flag can be true or false") } } - + val delimiter = CSVTypeCast.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") From 56316a8a8dd3623fee0dcfd81cdbab7e7d644bb7 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jan 2016 18:04:57 +0900 Subject: [PATCH 10/13] Several nits and reduce complexity --- .../execution/datasources/csv/CSVParameters.scala | 15 ++++++--------- .../execution/datasources/csv/CSVRelation.scala | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index f080fead302f8..575d8c89dd629 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -76,12 +76,9 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String] val nullValue = parameters.getOrElse("nullValue", "") - val compressionCodecName = - parameters.getOrElse("compression", parameters.getOrElse("codec", null)) - val compressionCodec = if (compressionCodecName != null) { - CSVCompressionCodecs.getCodecClassName(compressionCodecName) - } else { - null + val compressionCodec: Option[String] = { + val name = parameters.get("compression").orElse(parameters.get("codec")) + 
name.map(CSVCompressionCodecs.getCodecClassName) } val maxColumns = 20480 @@ -119,7 +116,7 @@ private[csv] object ParseModes { } private[csv] object CSVCompressionCodecs { - val shortCompressionCodecNames = Map( + private val shortCompressionCodecNames = Map( "bzip2" -> classOf[BZip2Codec].getName, "gzip" -> classOf[GzipCodec].getName, "lz4" -> classOf[Lz4Codec].getName, @@ -138,7 +135,7 @@ private[csv] object CSVCompressionCodecs { } catch { case e: ClassNotFoundException => None } - codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(",")}.")) + codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName]" + + s" is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(",")}.")) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 2d2c842882532..1502501c3b89e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -101,7 +101,7 @@ private[csv] class CSVRelation( override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = job.getConfiguration - Option(params.compressionCodec).foreach { codec => + params.compressionCodec.foreach { codec => conf.set("mapreduce.output.fileoutputformat.compress", "true") conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) From c9217beedaaf83b5390365b65ac61b4bcec78c19 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jan 2016 18:07:07 +0900 Subject: [PATCH 11/13] Update exception message --- .../spark/sql/execution/datasources/csv/CSVParameters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index 575d8c89dd629..664eb4516ea8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -135,7 +135,7 @@ private[csv] object CSVCompressionCodecs { } catch { case e: ClassNotFoundException => None } - codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName]" + - s" is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(",")}.")) + codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " + + s"is not available. Available codecs: ${shortCompressionCodecNames.keys.mkString(", ")}")) } } From 0245eea2508f9cde30b91e2619d79dbafa18f845 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jan 2016 18:08:14 +0900 Subject: [PATCH 12/13] Simply add a space after comma. 
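
With this change, a hypothetical unknown name such as "foo" would produce a
message along the lines of:

    java.lang.IllegalArgumentException: Codec [foo] is not available.
    Available codecs are bzip2, gzip, lz4, snappy.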
--- .../spark/sql/execution/datasources/csv/CSVParameters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index 664eb4516ea8a..3c283cee9397f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -136,6 +136,6 @@ private[csv] object CSVCompressionCodecs { case e: ClassNotFoundException => None } codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. Available codecs: ${shortCompressionCodecNames.keys.mkString(", ")}")) + s"is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")) } } From cd9f7429cfb2de4f0e82cc5134fe221fb135c01f Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 20 Jan 2016 11:35:24 +0900 Subject: [PATCH 13/13] Update exception message and get rid of Option. --- .../sql/execution/datasources/csv/CSVParameters.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala index 3c283cee9397f..676a3d3bca9f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala @@ -128,14 +128,14 @@ private[csv] object CSVCompressionCodecs { */ def getCodecClassName(name: String): String = { val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name) - val codecClassName = try { + try { // Validate the codec name Utils.classForName(codecName) - Some(codecName) + codecName } catch { - case e: ClassNotFoundException => None + case e: ClassNotFoundException => + throw new IllegalArgumentException(s"Codec [$codecName] " + + s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.") } - codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")) } }
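
Taken together, a minimal usage sketch of the resulting option (the paths and
data here are placeholders, not part of the patches):

    val df = sqlContext.read
      .format("csv")
      .option("header", "true")
      .load("/tmp/cars.csv")

    df.write
      .format("csv")
      .option("header", "true")
      .option("compression", "gzip") // short name; "codec" works as an alias, and a
      .save("/tmp/cars-gzip")        // fully qualified codec class name is also accepted

Because the writer sets the mapreduce.output.fileoutputformat.compress* keys on
the Hadoop job configuration, the emitted part files carry the codec's file
extension (.gz here), which is what the test added in PATCH 02 asserts.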