From 9a43e721b0dd57c3b2919aac6d6eead78c38f769 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 09:10:51 +0900 Subject: [PATCH 01/15] Add the support to specify compression codec for Parquet and Orc --- .../datasources/parquet/ParquetRelation.scala | 16 ++- .../spark/sql/hive/orc/OrcRelation.scala | 36 ++++++- .../hive/orc/OrcHadoopFsRelationSuite.scala | 97 ++++++++++++------- .../ParquetHadoopFsRelationSuite.scala | 19 ++++ 4 files changed, 129 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 184cbb2f296b..85d3d75d050f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -148,6 +148,19 @@ private[sql] class ParquetRelation( .get(ParquetRelation.METASTORE_SCHEMA) .map(DataType.fromJson(_).asInstanceOf[StructType]) + private val compressionCodec: Option[String] = parameters + .get("compression") + .map { codecName => + // Validate if given compression codec is supported or not. + val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames + if (!shortParquetCompressionCodecNames.contains(codecName.toUpperCase)) { + val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase) + throw new IllegalArgumentException(s"Codec [$codecName] " + + s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.") + } + codecName.toUpperCase + } + private lazy val metadataCache: MetadataCache = { val meta = new MetadataCache meta.refresh() @@ -286,7 +299,8 @@ private[sql] class ParquetRelation( ParquetRelation .shortParquetCompressionCodecNames .getOrElse( - sqlContext.conf.parquetCompressionCodec.toUpperCase, + compressionCodec + .getOrElse(sqlContext.conf.parquetCompressionCodec.toUpperCase), CompressionCodecName.UNCOMPRESSED).name()) new BucketedOutputWriterFactory { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 800823febab2..58b9ba0f07b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -23,13 +23,15 @@ import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct} +import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties +import org.apache.hadoop.hive.ql.io.orc._ import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.Logging import org.apache.spark.broadcast.Broadcast @@ -162,6 +164,19 @@ private[sql] class 
OrcRelation( extends HadoopFsRelation(maybePartitionSpec, parameters) with Logging { + private val compressionCodec: Option[String] = parameters + .get("compression") + .map { codecName => + // Validate if given compression codec is supported or not. + val shortOrcCompressionCodecNames = OrcRelation.shortOrcCompressionCodecNames + if (!shortOrcCompressionCodecNames.contains(codecName.toUpperCase)) { + val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase) + throw new IllegalArgumentException(s"Codec [$codecName] " + + s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.") + } + codecName.toUpperCase + } + private[sql] def this( paths: Array[String], maybeDataSchema: Option[StructType], @@ -211,6 +226,15 @@ private[sql] class OrcRelation( } override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = { + // Sets compression scheme + compressionCodec.foreach { codecName => + job.getConfiguration.set( + OrcTableProperties.COMPRESSION.getPropName, + OrcRelation + .shortOrcCompressionCodecNames + .getOrElse(codecName, CompressionKind.NONE).name()) + } + job.getConfiguration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) @@ -337,3 +361,13 @@ private[orc] object OrcTableScan { // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public. private[orc] val SARG_PUSHDOWN = "sarg.pushdown" } + +private[orc] object OrcRelation { + // The ORC compression short names + val shortOrcCompressionCodecNames = Map( + "NONE" -> CompressionKind.NONE, + "SNAPPY" -> CompressionKind.SNAPPY, + "ZLIB" -> CompressionKind.ZLIB, + "LZO" -> CompressionKind.LZO) +} + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 528f40b002be..932d4a424332 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.orc +import java.io.File + import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -37,48 +39,69 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { case _: UserDefinedType[_] => false case _ => true } +// +// test("save()/load() - partitioned table - simple queries - partition columns in data") { +// withTempDir { file => +// val basePath = new Path(file.getCanonicalPath) +// val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) +// val qualifiedBasePath = fs.makeQualified(basePath) +// +// for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { +// val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") +// sparkContext +// .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) +// .toDF("a", "b", "p1") +// .write +// .orc(partitionDir.toString) +// } +// +// val dataSchemaWithPartition = +// StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) +// +// checkQueries( +// hiveContext.read.options(Map( +// "path" -> file.getCanonicalPath, +// "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) +// } +// } +// +// test("SPARK-12218: 'Not' is included in ORC filter pushdown") { +// import testImplicits._ +// +// withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { +// withTempPath { dir => +// val path = s"${dir.getCanonicalPath}/table1" +// (1 to 5).map(i => (i, (i % 
2).toString)).toDF("a", "b").write.orc(path) +// +// checkAnswer( +// sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"), +// (1 to 5).map(i => Row(i, (i % 2).toString))) +// +// checkAnswer( +// sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"), +// (1 to 5).map(i => Row(i, (i % 2).toString))) +// } +// } +// } - test("save()/load() - partitioned table - simple queries - partition columns in data") { - withTempDir { file => - val basePath = new Path(file.getCanonicalPath) - val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) - val qualifiedBasePath = fs.makeQualified(basePath) - - for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { - val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") - sparkContext - .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) - .toDF("a", "b", "p1") - .write - .orc(partitionDir.toString) - } - - val dataSchemaWithPartition = - StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) - - checkQueries( - hiveContext.read.options(Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) - } - } - - test("SPARK-12218: 'Not' is included in ORC filter pushdown") { + test("SPARK-13543: Support for specifying compression codec for ORC via option()") { import testImplicits._ - withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table1" - (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path) + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") + df.write + .option("compression", "ZlIb") + .orc(path) + + val compressedFiles = new File(path).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) - checkAnswer( - sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"), - (1 to 5).map(i => Row(i, (i % 2).toString))) + val copyDf = sqlContext + .read + .orc(path) - checkAnswer( - sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"), - (1 to 5).map(i => Row(i, (i % 2).toString))) - } + checkAnswer(df,copyDf) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index f2501d7ce359..5d309129df98 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -208,4 +208,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { checkAnswer(loadedDF, df) } } + + test("SPARK-13543: Support for specifying compression codec for ORC via option()") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") + df.write + .option("compression", "sNaPpy") + .parquet(path) + + val compressedFiles = new File(path).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".snappy"))) + + val copyDf = sqlContext + .read + .parquet(path) + + checkAnswer(df,copyDf) + } + } } From 9ce0ab707a9e54769b2d55feacd366893a0daf6e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 14:26:58 +0900 Subject: [PATCH 02/15] Add some tests for testing compressions --- .../datasources/parquet/ParquetIOSuite.scala | 20 ++++ .../hive/orc/OrcHadoopFsRelationSuite.scala | 104 +++++++++--------- 
2 files changed, 74 insertions(+), 50 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index c85eeddc2c6d..29117d294c18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.io.File + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag @@ -742,6 +744,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } } + + test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") + df.write + .option("compression", "gZiP") + .parquet(path) + + val compressedFiles = new File(path).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet"))) + + val copyDf = sqlContext + .read + .parquet(path) + checkAnswer(df, copyDf) + } + } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 932d4a424332..823b52af1b87 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.orc import java.io.File -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, OrcFile} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.Row @@ -39,54 +40,52 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { case _: UserDefinedType[_] => false case _ => true } -// -// test("save()/load() - partitioned table - simple queries - partition columns in data") { -// withTempDir { file => -// val basePath = new Path(file.getCanonicalPath) -// val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) -// val qualifiedBasePath = fs.makeQualified(basePath) -// -// for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { -// val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") -// sparkContext -// .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) -// .toDF("a", "b", "p1") -// .write -// .orc(partitionDir.toString) -// } -// -// val dataSchemaWithPartition = -// StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) -// -// checkQueries( -// hiveContext.read.options(Map( -// "path" -> file.getCanonicalPath, -// "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) -// } -// } -// -// test("SPARK-12218: 'Not' is included in ORC filter pushdown") { -// import testImplicits._ -// -// withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { -// withTempPath { dir => -// val path = s"${dir.getCanonicalPath}/table1" -// (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path) -// -// checkAnswer( -// sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"), -// (1 to 
5).map(i => Row(i, (i % 2).toString))) -// -// checkAnswer( -// sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"), -// (1 to 5).map(i => Row(i, (i % 2).toString))) -// } -// } -// } - test("SPARK-13543: Support for specifying compression codec for ORC via option()") { + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) + .toDF("a", "b", "p1") + .write + .orc(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + hiveContext.read.options(Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) + } + } + + test("SPARK-12218: 'Not' is included in ORC filter pushdown") { import testImplicits._ + withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path) + + checkAnswer( + sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"), + (1 to 5).map(i => Row(i, (i % 2).toString))) + + checkAnswer( + sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"), + (1 to 5).map(i => Row(i, (i % 2).toString))) + } + } + } + + test("SPARK-13543: Support for specifying compression codec for ORC via option()") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/table1" val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") @@ -94,14 +93,19 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .option("compression", "ZlIb") .orc(path) - val compressedFiles = new File(path).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + // Check if this is compressed as ZLIB. + val conf = sparkContext.hadoopConfiguration + val fs = FileSystem.getLocal(conf) + val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".orc")) + assert(maybeOrcFile.isDefined) + val orcFilePath = new Path(maybeOrcFile.get.toPath.toString) + val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)) + assert(orcReader.getCompression == CompressionKind.ZLIB) val copyDf = sqlContext .read .orc(path) - - checkAnswer(df,copyDf) + checkAnswer(df, copyDf) } } } From 04e4a5167c49b6359f0cfac29d7f2f0c84bd497d Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 16:27:43 +0900 Subject: [PATCH 03/15] Add tests and some comments --- python/pyspark/sql/readwriter.py | 12 ++++++++ .../apache/spark/sql/DataFrameWriter.scala | 10 +++++++ .../datasources/parquet/ParquetIOSuite.scala | 30 ++++++++++--------- .../spark/sql/hive/orc/OrcRelation.scala | 6 ++-- .../ParquetHadoopFsRelationSuite.scala | 2 +- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 7f5368d8bdbb..90f0dc9f445e 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -487,6 +487,12 @@ def parquet(self, path, mode=None, partitionBy=None): * ``error`` (default case): Throw an exception if data already exists. 
:param partitionBy: names of partitioning columns + You can set the following Parquet-specific option(s) for writing Parquet files: + * ``compression`` (default ``None``): compression codec to use when saving to file. + This can be one of the known case-insensitive shorten names + (``uncompressed``, ``snappy``,``gzip``, and ``lzo``). + This will overwrite ``orc.compress``. + >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) @@ -547,6 +553,12 @@ def orc(self, path, mode=None, partitionBy=None): * ``error`` (default case): Throw an exception if data already exists. :param partitionBy: names of partitioning columns + You can set the following ORC-specific option(s) for writing ORC files: + * ``compression`` (default ``None``): compression codec to use when saving to file. + This can be one of the known case-insensitive shorten names + (``uncompressed``, ``snappy``,``zlib``, and ``lzo``). + This will overwrite ``orc.compress``. + >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data')) """ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 093504c765ee..dc5c5c144300 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -468,6 +468,11 @@ final class DataFrameWriter private[sql](df: DataFrame) { * format("parquet").save(path) * }}} * + * You can set the following Parquet-specific option(s) for writing Parquet files: + *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
+ * one of the known case-insensitive shorten names(`uncompressed`, `snappy`,`gzip`, and
+ * `lzo`). This will overwrite `spark.sql.parquet.compression.codec`.</li>
+ *
 * @since 1.4.0
 */
 def parquet(path: String): Unit = format("parquet").save(path)
@@ -479,6 +484,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 * format("orc").save(path)
 * }}}
 *
+ * You can set the following ORC-specific option(s) for writing ORC files:
+ *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
+ * one of the known case-insensitive shorten names(`uncompressed`, `snappy`, `zlib`, and
+ * `lzo`). This will overwrite `orc.compress`.
  • + * * @since 1.5.0 * @note Currently, this method can only be used together with `HiveContext`. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 29117d294c18..9d8bfa20a321 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -746,20 +746,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table1" - val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") - df.write - .option("compression", "gZiP") - .parquet(path) - - val compressedFiles = new File(path).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet"))) - - val copyDf = sqlContext - .read - .parquet(path) - checkAnswer(df, copyDf) + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") + df.write + .option("compression", "GzIP") + .parquet(path) + + val compressedFiles = new File(path).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet"))) + + val copyDf = sqlContext + .read + .parquet(path) + checkAnswer(df, copyDf) + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 58b9ba0f07b9..5de0a66fe0a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -23,15 +23,14 @@ import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties import org.apache.hadoop.hive.ql.io.orc._ +import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.Logging import org.apache.spark.broadcast.Broadcast @@ -234,7 +233,7 @@ private[sql] class OrcRelation( .shortOrcCompressionCodecNames .getOrElse(codecName, CompressionKind.NONE).name()) } - + job.getConfiguration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) @@ -366,6 +365,7 @@ private[orc] object OrcRelation { // The ORC compression short names val shortOrcCompressionCodecNames = Map( "NONE" -> CompressionKind.NONE, + "UNCOMPRESSED" -> CompressionKind.NONE, "SNAPPY" -> CompressionKind.SNAPPY, "ZLIB" -> CompressionKind.ZLIB, "LZO" -> CompressionKind.LZO) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 5d309129df98..ea2d2c63260f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -224,7 +224,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { .read .parquet(path) - checkAnswer(df,copyDf) + checkAnswer(df, copyDf) } } } From a96e510295af1ed2bae25a05ee36d8f97b3e6b41 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 16:30:58 +0900 Subject: [PATCH 04/15] Correct comments --- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 90f0dc9f445e..cd14da49607f 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -491,7 +491,7 @@ def parquet(self, path, mode=None, partitionBy=None): * ``compression`` (default ``None``): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (``uncompressed``, ``snappy``,``gzip``, and ``lzo``). - This will overwrite ``orc.compress``. + This will overwrite ``spark.sql.parquet.compression.codec``. >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ From d107d50bd2bc659d3f5441f8e1741c8356b6cc6e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 16:33:36 +0900 Subject: [PATCH 05/15] Remove the duplicated test --- .../datasources/parquet/ParquetIOSuite.scala | 20 ----------- .../ParquetHadoopFsRelationSuite.scala | 33 ++++++++++--------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 9d8bfa20a321..149df689cff7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -744,26 +744,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } } - - test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table1" - val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") - df.write - .option("compression", "GzIP") - .parquet(path) - - val compressedFiles = new File(path).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet"))) - - val copyDf = sqlContext - .read - .parquet(path) - checkAnswer(df, copyDf) - } - } - } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index ea2d2c63260f..8856148a95b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -209,22 +209,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest 
{ } } - test("SPARK-13543: Support for specifying compression codec for ORC via option()") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table1" - val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") - df.write - .option("compression", "sNaPpy") - .parquet(path) - - val compressedFiles = new File(path).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".snappy"))) - - val copyDf = sqlContext - .read - .parquet(path) - - checkAnswer(df, copyDf) + test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") + df.write + .option("compression", "GzIP") + .parquet(path) + + val compressedFiles = new File(path).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet"))) + + val copyDf = sqlContext + .read + .parquet(path) + checkAnswer(df, copyDf) + } } } } From 9c53d10f1a5ba136b6599d33c4b9d9070c720287 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 16:39:30 +0900 Subject: [PATCH 06/15] Remove unused imports --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 149df689cff7..c85eeddc2c6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.File - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag From 12e7275047cf13d2e4a529a2e2c34a5867078758 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 18:56:03 +0900 Subject: [PATCH 07/15] Add compression as an argument and make shorten names as lower cases --- python/pyspark/sql/readwriter.py | 64 +++++++++---------- .../spark/sql/hive/orc/OrcRelation.scala | 14 ++-- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index cd14da49607f..877f9ba58b68 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -454,7 +454,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options) self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode=None): + def json(self, path, mode=None, compression=None): """Saves the content of the :class:`DataFrame` in JSON format at the specified path. :param path: the path in any Hadoop supported file system @@ -464,18 +464,18 @@ def json(self, path, mode=None): * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. - - You can set the following JSON-specific option(s) for writing JSON files: - * ``compression`` (default ``None``): compression codec to use when saving to file. - This can be one of the known case-insensitive shorten names - (``bzip2``, ``gzip``, ``lz4``, and ``snappy``). + :param compression: compression codec to use when saving to file. 
This can be one of the + known case-insensitive shorten names (bzip2, gzip, lz4, and snappy). >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ - self.mode(mode)._jwrite.json(path) + self.mode(mode) + if compression is not None: + self.option("compression", compression) + self._jwrite.json(path) @since(1.4) - def parquet(self, path, mode=None, partitionBy=None): + def parquet(self, path, mode=None, partitionBy=None, compression=None): """Saves the content of the :class:`DataFrame` in Parquet format at the specified path. :param path: the path in any Hadoop supported file system @@ -486,38 +486,36 @@ def parquet(self, path, mode=None, partitionBy=None): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. :param partitionBy: names of partitioning columns - - You can set the following Parquet-specific option(s) for writing Parquet files: - * ``compression`` (default ``None``): compression codec to use when saving to file. - This can be one of the known case-insensitive shorten names - (``uncompressed``, ``snappy``,``gzip``, and ``lzo``). - This will overwrite ``spark.sql.parquet.compression.codec``. + :param compression: compression codec to use when saving to file. This can be one of the + known case-insensitive shorten names (uncompressed, snappy, gzip, and + lzo). This will overwrite ``spark.sql.parquet.compression.codec``. >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) if partitionBy is not None: self.partitionBy(partitionBy) + if compression is not None: + self.option("compression", compression) self._jwrite.parquet(path) @since(1.6) - def text(self, path): + def text(self, path, compression=None): """Saves the content of the DataFrame in a text file at the specified path. :param path: the path in any Hadoop supported file system + :param compression: compression codec to use when saving to file. This can be one of the + known case-insensitive shorten names (bzip2, gzip, lz4, and snappy). The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. - - You can set the following option(s) for writing text files: - * ``compression`` (default ``None``): compression codec to use when saving to file. - This can be one of the known case-insensitive shorten names - (``bzip2``, ``gzip``, ``lz4``, and ``snappy``). """ + if compression is not None: + self.option("compression", compression) self._jwrite.text(path) @since(2.0) - def csv(self, path, mode=None): + def csv(self, path, mode=None, compression=None): """Saves the content of the [[DataFrame]] in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -528,17 +526,18 @@ def csv(self, path, mode=None): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. - You can set the following CSV-specific option(s) for writing CSV files: - * ``compression`` (default ``None``): compression codec to use when saving to file. - This can be one of the known case-insensitive shorten names - (``bzip2``, ``gzip``, ``lz4``, and ``snappy``). + :param compression: compression codec to use when saving to file. This can be one of the + known case-insensitive shorten names (bzip2, gzip, lz4, and snappy). 
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ - self.mode(mode)._jwrite.csv(path) + self.mode(mode) + if compression is not None: + self.option("compression", compression) + self._jwrite.csv(path) @since(1.5) - def orc(self, path, mode=None, partitionBy=None): + def orc(self, path, mode=None, partitionBy=None, compression=None): """Saves the content of the :class:`DataFrame` in ORC format at the specified path. ::Note: Currently ORC support is only available together with @@ -552,12 +551,9 @@ def orc(self, path, mode=None, partitionBy=None): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. :param partitionBy: names of partitioning columns - - You can set the following ORC-specific option(s) for writing ORC files: - * ``compression`` (default ``None``): compression codec to use when saving to file. - This can be one of the known case-insensitive shorten names - (``uncompressed``, ``snappy``,``zlib``, and ``lzo``). - This will overwrite ``orc.compress``. + :param compression: compression codec to use when saving to file. This can be one of the + known case-insensitive shorten names (uncompressed, snappy, zlib, and + lzo). This will overwrite ``orc.compress``. >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data')) @@ -565,6 +561,8 @@ def orc(self, path, mode=None, partitionBy=None): self.mode(mode) if partitionBy is not None: self.partitionBy(partitionBy) + if compression is not None: + self.option("compression", compression) self._jwrite.orc(path) @since(1.4) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 5de0a66fe0a3..2b06e1a12c54 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -168,12 +168,12 @@ private[sql] class OrcRelation( .map { codecName => // Validate if given compression codec is supported or not. val shortOrcCompressionCodecNames = OrcRelation.shortOrcCompressionCodecNames - if (!shortOrcCompressionCodecNames.contains(codecName.toUpperCase)) { + if (!shortOrcCompressionCodecNames.contains(codecName.toLowerCase)) { val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase) throw new IllegalArgumentException(s"Codec [$codecName] " + s"is not available. 
Available codecs are ${availableCodecs.mkString(", ")}.") } - codecName.toUpperCase + codecName.toLowerCase } private[sql] def this( @@ -364,10 +364,10 @@ private[orc] object OrcTableScan { private[orc] object OrcRelation { // The ORC compression short names val shortOrcCompressionCodecNames = Map( - "NONE" -> CompressionKind.NONE, - "UNCOMPRESSED" -> CompressionKind.NONE, - "SNAPPY" -> CompressionKind.SNAPPY, - "ZLIB" -> CompressionKind.ZLIB, - "LZO" -> CompressionKind.LZO) + "none" -> CompressionKind.NONE, + "uncompressed" -> CompressionKind.NONE, + "snappy" -> CompressionKind.SNAPPY, + "zlib" -> CompressionKind.ZLIB, + "lzo" -> CompressionKind.LZO) } From baf7a63337c49c4104240ad89a7cd257dca83eca Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 10:56:49 +0900 Subject: [PATCH 08/15] Add some comments for consistent compression names and add none/uncompressed for test-based datasources --- python/pyspark/sql/readwriter.py | 17 ++++++++++------- .../org/apache/spark/sql/DataFrameWriter.scala | 17 ++++++++++------- .../datasources/CompressionCodecs.scala | 6 +++++- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 877f9ba58b68..438662bb157f 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -465,7 +465,8 @@ def json(self, path, mode=None, compression=None): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. :param compression: compression codec to use when saving to file. This can be one of the - known case-insensitive shorten names (bzip2, gzip, lz4, and snappy). + known case-insensitive shorten names (none, bzip2, gzip, lz4, + snappy and deflate). >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ @@ -487,8 +488,8 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): * ``error`` (default case): Throw an exception if data already exists. :param partitionBy: names of partitioning columns :param compression: compression codec to use when saving to file. This can be one of the - known case-insensitive shorten names (uncompressed, snappy, gzip, and - lzo). This will overwrite ``spark.sql.parquet.compression.codec``. + known case-insensitive shorten names (none, snappy, gzip, and lzo). + This will overwrite ``spark.sql.parquet.compression.codec``. >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ @@ -505,7 +506,8 @@ def text(self, path, compression=None): :param path: the path in any Hadoop supported file system :param compression: compression codec to use when saving to file. This can be one of the - known case-insensitive shorten names (bzip2, gzip, lz4, and snappy). + known case-insensitive shorten names (none, bzip2, gzip, lz4, + snappy and deflate). The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. @@ -527,7 +529,8 @@ def csv(self, path, mode=None, compression=None): * ``error`` (default case): Throw an exception if data already exists. :param compression: compression codec to use when saving to file. This can be one of the - known case-insensitive shorten names (bzip2, gzip, lz4, and snappy). + known case-insensitive shorten names (none, bzip2, gzip, lz4, + snappy and deflate). 
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ @@ -552,8 +555,8 @@ def orc(self, path, mode=None, partitionBy=None, compression=None): * ``error`` (default case): Throw an exception if data already exists. :param partitionBy: names of partitioning columns :param compression: compression codec to use when saving to file. This can be one of the - known case-insensitive shorten names (uncompressed, snappy, zlib, and - lzo). This will overwrite ``orc.compress``. + known case-insensitive shorten names (none, snappy, zlib, and lzo). + This will overwrite ``orc.compress``. >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data')) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index dcbce4a7b8c7..c373606a2e07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -455,7 +455,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { * * You can set the following JSON-specific option(s) for writing JSON files: *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+ * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
+ * `snappy` and `deflate`).
 *
 * @since 1.4.0
 */
@@ -470,8 +471,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 *
 * You can set the following Parquet-specific option(s) for writing Parquet files:
 *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names(`uncompressed`, `snappy`,`gzip`, and
- * `lzo`). This will overwrite `spark.sql.parquet.compression.codec`.</li>
+ * one of the known case-insensitive shorten names(`none`, `snappy`, `gzip`, and `lzo`).
+ * This will overwrite `spark.sql.parquet.compression.codec`.
 *
 * @since 1.4.0
 */
@@ -486,8 +487,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 *
 * You can set the following ORC-specific option(s) for writing ORC files:
 *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names(`uncompressed`, `snappy`, `zlib`, and
- * `lzo`). This will overwrite `orc.compress`.</li>
+ * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
+ * This will overwrite `orc.compress`.
 *
 * @since 1.5.0
 * @note Currently, this method can only be used together with `HiveContext`.
@@ -508,7 +509,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 *
 * You can set the following option(s) for writing text files:
 *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+ * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
+ * `snappy` and `deflate`).
 *
 * @since 1.6.0
 */
@@ -523,7 +525,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 *
 * You can set the following CSV-specific option(s) for writing CSV files:
 *
<li>`compression` (default `null`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`).
  • + * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`, + * `snappy` and `deflate`). * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala index 9e913de48f72..44bb895daac0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala @@ -25,6 +25,8 @@ import org.apache.spark.util.Utils private[datasources] object CompressionCodecs { private val shortCompressionCodecNames = Map( + "none" -> null, + "uncompressed" -> null, "bzip2" -> classOf[BZip2Codec].getName, "deflate" -> classOf[DeflateCodec].getName, "gzip" -> classOf[GzipCodec].getName, @@ -39,7 +41,9 @@ private[datasources] object CompressionCodecs { val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name) try { // Validate the codec name - Utils.classForName(codecName) + if (codecName != null) { + Utils.classForName(codecName) + } codecName } catch { case e: ClassNotFoundException => From 99c39c6f8a3b041bb6563042def29ca552b9f9a6 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 12:46:54 +0900 Subject: [PATCH 09/15] Use lower-cases for compression codec names --- .../sql/execution/datasources/parquet/ParquetRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 9b4f2adcdc69..7ea098c72bf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -153,12 +153,12 @@ private[sql] class ParquetRelation( .map { codecName => // Validate if given compression codec is supported or not. val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames - if (!shortParquetCompressionCodecNames.contains(codecName.toUpperCase)) { + if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) { val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase) throw new IllegalArgumentException(s"Codec [$codecName] " + s"is not available. 
Available codecs are ${availableCodecs.mkString(", ")}.") } - codecName.toUpperCase + codecName.toLowerCase } private lazy val metadataCache: MetadataCache = { From 4e212b16eb484563ca5b56dd9ae970b6a8337c21 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 12:49:12 +0900 Subject: [PATCH 10/15] Update error meesages for TextSuite --- .../spark/sql/execution/datasources/text/TextSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 59e0e6a7cfa0..65f6b4fb3fee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -71,8 +71,8 @@ class TextSuite extends QueryTest with SharedSQLContext { val tempDirPath = Utils.createTempDir().getAbsolutePath() testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath) } - assert(errMsg.getMessage === "Codec [illegal] is not available. " + - "Known codecs are bzip2, deflate, lz4, gzip, snappy.") + assert(errMsg.getMessage.contains("Codec [illegal] is not available. " + + "Known codecs are")) } private def testFile: String = { From 4c1ffc52f20d29e6a300d5665d7ae53ce0335456 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 13:32:57 +0900 Subject: [PATCH 11/15] Add some tests and functionality for expliclty setting no compression --- .../datasources/CompressionCodecs.scala | 16 +++-- .../datasources/json/JSONOptions.scala | 3 +- .../execution/datasources/csv/CSVSuite.scala | 46 +++++++++++++ .../datasources/json/JsonSuite.scala | 66 ++++++++++++------- .../datasources/text/TextSuite.scala | 51 ++++++++++++-- 5 files changed, 144 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala index 44bb895daac0..032ba61d9dc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala @@ -57,10 +57,16 @@ private[datasources] object CompressionCodecs { * `codec` should be a full class path */ def setCodecConfiguration(conf: Configuration, codec: String): Unit = { - conf.set("mapreduce.output.fileoutputformat.compress", "true") - conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) - conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) - conf.set("mapreduce.map.output.compress", "true") - conf.set("mapreduce.map.output.compress.codec", codec) + if (codec != null){ + conf.set("mapreduce.output.fileoutputformat.compress", "true") + conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) + conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) + conf.set("mapreduce.map.output.compress", "true") + conf.set("mapreduce.map.output.compress.codec", codec) + } else { + // This infers the option `compression` is set to `uncompressed` or `none`. 
+ conf.set("mapreduce.output.fileoutputformat.compress", "false") + conf.set("mapreduce.map.output.compress", "false") + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index e59dbd6b3d43..1fa4e71bf5af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -48,7 +48,8 @@ private[sql] class JSONOptions( parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true) val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) - val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) + private val compressionName = parameters.get("compression") + val compressionCodec = compressionName.map(CompressionCodecs.getCodecClassName) /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 3ecbb14f2ea6..e17991b04504 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -21,6 +21,12 @@ import java.io.File import java.nio.charset.UnsupportedCharsetException import java.sql.Timestamp +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.SequenceFile.CompressionType +import org.apache.hadoop.io.compress.GzipCodec + import org.apache.spark.SparkException import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} @@ -396,6 +402,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + test("SPARK-13543 Set explicitly the output as uncompressed") { + val clonedConf = new Configuration(hadoopConfiguration) + hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") + hadoopConfiguration + .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) + hadoopConfiguration + .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName) + hadoopConfiguration.set("mapreduce.map.output.compress", "true") + hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName) + withTempDir { dir => + try { + val csvDir = new File(dir, "csv").getCanonicalPath + val cars = sqlContext.read + .format("csv") + .option("header", "true") + .load(testFile(carsFile)) + + cars.coalesce(1).write + .format("csv") + .option("header", "true") + .option("compression", "none") + .save(csvDir) + + val compressedFiles = new File(csvDir).listFiles() + assert(compressedFiles.exists(!_.getName.endsWith(".gz"))) + + val carsCopy = sqlContext.read + .format("csv") + .option("header", "true") + .load(csvDir) + + verifyCars(carsCopy, withHeader = true) + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + } + } + } + test("Schema inference correctly identifies the datatype when 
data is sparse.") { val df = sqlContext.read .format("csv") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index c7f33e17465b..ef9d0e54f5df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -23,9 +23,10 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import com.fasterxml.jackson.core.JsonFactory -import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.io.SequenceFile.CompressionType +import org.apache.hadoop.io.compress.GzipCodec import org.scalactic.Tolerance._ import org.apache.spark.rdd.RDD @@ -1496,31 +1497,46 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("SPARK-12872 Support to specify the option for compression codec") { + test("SPARK-13543 Set explicitly the output as uncompressed") { + val clonedConf = new Configuration(hadoopConfiguration) + hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") + hadoopConfiguration + .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) + hadoopConfiguration + .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName) + hadoopConfiguration.set("mapreduce.map.output.compress", "true") + hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName) withTempDir { dir => - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalPath - primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - - val jsonDF = sqlContext.read.json(path) - val jsonDir = new File(dir, "json").getCanonicalPath - jsonDF.coalesce(1).write - .format("json") - .option("compression", "gZiP") - .save(jsonDir) - - val compressedFiles = new File(jsonDir).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz"))) - - val jsonCopy = sqlContext.read - .format("json") - .load(jsonDir) - - assert(jsonCopy.count == jsonDF.count) - val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") - val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") - checkAnswer(jsonCopySome, jsonDFSome) + try { + val dir = Utils.createTempDir() + dir.delete() + + val path = dir.getCanonicalPath + primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) + + val jsonDF = sqlContext.read.json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write + .format("json") + .option("compression", "none") + .save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(!_.getName.endsWith(".gz"))) + + val jsonCopy = sqlContext.read + .format("json") + .load(jsonDir) + + assert(jsonCopy.count == jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 65f6b4fb3fee..060bc576c14c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -17,6 +17,14 @@ package org.apache.spark.sql.execution.datasources.text +import java.io.File + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.SequenceFile.CompressionType +import org.apache.hadoop.io.compress.GzipCodec + import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructType} @@ -59,22 +67,51 @@ class TextSuite extends QueryTest with SharedSQLContext { test("SPARK-13503 Support to specify the option for compression codec for TEXT") { val testDf = sqlContext.read.text(testFile) - - Seq("bzip2", "deflate", "gzip").foreach { codecName => - val tempDir = Utils.createTempDir() - val tempDirPath = tempDir.getAbsolutePath() - testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath) - verifyFrame(sqlContext.read.text(tempDirPath)) + val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz") + extensionNameMap.foreach { + case (codecName, extension) => + val tempDir = Utils.createTempDir() + val tempDirPath = tempDir.getAbsolutePath + testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath) + val compressedFiles = new File(tempDirPath).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(extension))) + verifyFrame(sqlContext.read.text(tempDirPath)) } val errMsg = intercept[IllegalArgumentException] { - val tempDirPath = Utils.createTempDir().getAbsolutePath() + val tempDirPath = Utils.createTempDir().getAbsolutePath testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath) } assert(errMsg.getMessage.contains("Codec [illegal] is not available. 
" + "Known codecs are")) } + test("SPARK-13543 Set explicitly the output as uncompressed") { + val clonedConf = new Configuration(hadoopConfiguration) + hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") + hadoopConfiguration + .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) + hadoopConfiguration + .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName) + hadoopConfiguration.set("mapreduce.map.output.compress", "true") + hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName) + withTempDir { dir => + try { + val testDf = sqlContext.read.text(testFile) + val tempDir = Utils.createTempDir() + val tempDirPath = tempDir.getAbsolutePath + testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath) + val compressedFiles = new File(tempDirPath).listFiles() + assert(compressedFiles.exists(!_.getName.endsWith(".gz"))) + verifyFrame(sqlContext.read.text(tempDirPath)) + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + } + } + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString } From 29a8d6168f41bb52c8b5206c67174537a81e311c Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 13:34:01 +0900 Subject: [PATCH 12/15] Remove unused variable --- .../spark/sql/execution/datasources/json/JSONOptions.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index 1fa4e71bf5af..e59dbd6b3d43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -48,8 +48,7 @@ private[sql] class JSONOptions( parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true) val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) - private val compressionName = parameters.get("compression") - val compressionCodec = compressionName.map(CompressionCodecs.getCodecClassName) + val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) /** Sets config options on a Jackson [[JsonFactory]]. 
*/ def setJacksonOptions(factory: JsonFactory): Unit = { From 2817c9a774f54c5f79828137bdb5a95c391c90c3 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 13:35:36 +0900 Subject: [PATCH 13/15] Add removed test --- .../datasources/json/JsonSuite.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ef9d0e54f5df..7ed298a8f5ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1497,6 +1497,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("SPARK-12872 Support to specify the option for compression codec") { + withTempDir { dir => + val dir = Utils.createTempDir() + dir.delete() + val path = dir.getCanonicalPath + primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) + + val jsonDF = sqlContext.read.json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write + .format("json") + .option("compression", "gZiP") + .save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + + val jsonCopy = sqlContext.read + .format("json") + .load(jsonDir) + + assert(jsonCopy.count == jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) + } + } + test("SPARK-13543 Set explicitly the output as uncompressed") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") From d2ace37bea1cd8a4c8e1cfb4b1b7fb2ace67c005 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 17:06:43 +0900 Subject: [PATCH 14/15] Update test names --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- .../apache/spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- .../apache/spark/sql/execution/datasources/text/TextSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e17991b04504..c6f861110851 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -402,7 +402,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("SPARK-13543 Set explicitly the output as uncompressed") { + test("SPARK-13543 Write the output as uncompressed via option") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConfiguration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 7ed298a8f5ed..3c2956345845 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1525,7 +1525,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("SPARK-13543 Set explicitly the output as uncompressed") { + test("SPARK-13543 Write the output as uncompressed via option") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConfiguration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 060bc576c14c..ab26507e0e18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -86,7 +86,7 @@ class TextSuite extends QueryTest with SharedSQLContext { "Known codecs are")) } - test("SPARK-13543 Set explicitly the output as uncompressed") { + test("SPARK-13543 Write the output as uncompressed via option") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConfiguration From e4d4cfbc5a65ad95c4eee31ea5a2f09de7e5934a Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 3 Mar 2016 17:07:52 +0900 Subject: [PATCH 15/15] Make the options function --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- .../apache/spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- .../apache/spark/sql/execution/datasources/text/TextSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c6f861110851..9cd3a9ab952b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -402,7 +402,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("SPARK-13543 Write the output as uncompressed via option") { + test("SPARK-13543 Write the output as uncompressed via option()") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConfiguration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 3c2956345845..3a335541431f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1525,7 +1525,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("SPARK-13543 Write the output as uncompressed via option") { + test("SPARK-13543 Write the output as uncompressed via option()") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConfiguration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 
ab26507e0e18..9eb1016b6415 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -86,7 +86,7 @@ class TextSuite extends QueryTest with SharedSQLContext { "Known codecs are")) } - test("SPARK-13543 Write the output as uncompressed via option") { + test("SPARK-13543 Write the output as uncompressed via option()") { val clonedConf = new Configuration(hadoopConfiguration) hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConfiguration
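
As a usage sketch of the `compression` writer option these suites exercise (illustrative only; the DataFrame `df`, the object name, and the output paths are assumed placeholders, not part of the patches): codec names are the case-insensitive short names resolved through CompressionCodecs.getCodecClassName, and an unrecognized name fails with the "Codec [...] is not available. Known codecs are ..." message asserted in TextSuite.

import org.apache.spark.sql.{DataFrame, SaveMode}

object CompressionOptionSketch {
  // Assumes `df` already has a single string column (required for text output).
  def writeExamples(df: DataFrame): Unit = {
    // JSON: short codec names are case-insensitive; compressed part files get the
    // codec's extension (".gz" for gzip), as checked in JsonSuite (SPARK-12872).
    df.write.mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .json("/tmp/sketch/json")

    // Text: "bzip2", "deflate" and "gzip" map to ".bz2", ".deflate" and ".gz"
    // respectively, as checked in TextSuite (SPARK-13503).
    df.write.mode(SaveMode.Overwrite)
      .option("compression", "bzip2")
      .text("/tmp/sketch/text")

    // "none" forces uncompressed output even when the Hadoop configuration enables
    // mapreduce output compression, which is the SPARK-13543 scenario tested above.
    df.write.mode(SaveMode.Overwrite)
      .format("csv")
      .option("compression", "none")
      .save("/tmp/sketch/csv")
  }
}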