From 8d151b56936882edd5dfa69d4bf3d1340f0336ca Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Thu, 12 Nov 2015 16:18:46 +0800
Subject: [PATCH 1/8] [SPARK-11691][SQL] Allow to specify compression codec in HadoopFsRelation when saving

---
 .../apache/spark/sql/DataFrameWriter.scala         | 14 +++++++++++
 .../datasources/DataSourceStrategy.scala           |  2 +-
 .../InsertIntoHadoopFsRelation.scala               | 10 ++++----
 .../datasources/ResolvedDataSource.scala           | 10 +++++++-
 .../datasources/WriterContainer.scala              | 23 +++++++++++++++----
 .../datasources/text/TextSuite.scala               | 13 ++++++++++-
 6 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12eb2393634a..7f5e3c28cd6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.io.compress.CompressionCodec
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -29,6 +31,8 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.sources.HadoopFsRelation
 
+
+
 /**
  * :: Experimental ::
  * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
@@ -129,6 +133,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+<<<<<<< b72611f20a03c790b6fd341b6ffdb3b5437609ee
    * Buckets the output by the given columns. If specified, the output is laid out on the file
    * system similar to Hive's bucketing scheme.
    *
@@ -155,6 +160,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     this.sortColumnNames = Option(colName +: colNames)
     this
   }
+  /*
+   * Specify the compression codec when saving it on hdfs
+   *
+   * @since 1.7.0
+   */
+  def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = {
+    this.extraOptions += ("compression.codec" -> codec.getCanonicalName)
+    this
+  }
 
   /**
   * Saves the content of the [[DataFrame]] at the specified path.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index da9320ffb61c..85d14f8d395e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -141,7 +141,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
+      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode, None)) :: Nil
 
     case _ => Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 2d3e1714d2b7..a7aadbd24bf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
@@ -32,7 +33,6 @@ import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
 import org.apache.spark.sql.sources._
 import org.apache.spark.util.Utils
 
-
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
  * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
@@ -58,7 +58,8 @@ import org.apache.spark.util.Utils
 private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    codec: Option[Class[_ <: CompressionCodec]])
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
@@ -126,7 +127,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         """.stripMargin)
 
       val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) {
-        new DefaultWriterContainer(relation, job, isAppend)
+        new DefaultWriterContainer(relation, job, isAppend, codec)
       } else {
         val output = df.queryExecution.executedPlan.output
         val (partitionOutput, dataOutput) =
@@ -140,7 +141,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
           output,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
           sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
-          isAppend)
+          isAppend,
+          codec)
       }
 
       // This call shouldn't be put into the `try` block below because it only initializes and
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index cc8dcf59307f..776d52c0ab9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.Logging
@@ -253,11 +254,18 @@ object ResolvedDataSource extends Logging {
         // For partitioned relation r, r.schema's column ordering can be different from the column
         // ordering of data.logicalPlan (partition columns are all moved after data column). This
         // will be adjusted within InsertIntoHadoopFsRelation.
+
+        val codec = options.get("compression.codec").flatMap(e =>
+          Some(new CompressionCodecFactory(sqlContext.sparkContext.hadoopConfiguration)
+            .getCodecClassByName(e).asInstanceOf[Class[CompressionCodec]])
+        )
+
         sqlContext.executePlan(
           InsertIntoHadoopFsRelation(
             r,
             data.logicalPlan,
-            mode)).toRdd
+            mode,
+            codec)).toRdd
         r
       case _ =>
         sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 6340229dbb78..06321d9a1e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -19,7 +19,11 @@ package org.apache.spark.sql.execution.datasources
 
 import java.util.{Date, UUID}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.compress.{CompressionCodec, SnappyCodec}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -39,7 +43,8 @@ import org.apache.spark.util.SerializableConfiguration
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
     @transient private val job: Job,
-    isAppend: Boolean)
+    isAppend: Boolean,
+    codec: Option[Class[_ <: CompressionCodec]])
   extends Logging with Serializable {
 
   protected val dataSchema = relation.dataSchema
@@ -207,6 +212,11 @@ private[sql] abstract class BaseWriterContainer(
     serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
     serializableConf.value.setBoolean("mapred.task.is.map", true)
     serializableConf.value.setInt("mapred.task.partition", 0)
+    for (c <- codec) {
+      serializableConf.value.set("mapred.output.compress", "true")
+      serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName)
+      serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
   }
 
   def commitTask(): Unit = {
@@ -239,8 +249,10 @@ private[sql] abstract class BaseWriterContainer(
 private[sql] class DefaultWriterContainer(
     relation: HadoopFsRelation,
     job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
+    isAppend: Boolean,
+    codec: Option[Class[_ <: CompressionCodec]])
+  extends BaseWriterContainer(relation, job, isAppend,
+    codec: Option[Class[_ <: CompressionCodec]]) {
 
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
     executorSideSetup(taskContext)
@@ -308,8 +320,9 @@ private[sql] class DynamicPartitionWriterContainer(
     inputSchema: Seq[Attribute],
     defaultPartitionName: String,
     maxOpenFiles: Int,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
+    isAppend: Boolean,
+    codec: Option[Class[_ <: CompressionCodec]])
+  extends BaseWriterContainer(relation, job, isAppend, codec) {
 
   private val bucketSpec = relation.maybeBucketSpec
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index f95272530d58..d42ae2e12770 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import com.google.common.io.Files
+import org.apache.hadoop.io.compress.GzipCodec
+
+import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
@@ -57,6 +60,14 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("compression") {
+    val tempDirPath = Files.createTempDir().getAbsolutePath;
+    val df = sqlContext.read.text(testFile)
+    df.show()
+    df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath)
+    verifyFrame(sqlContext.read.text(tempDirPath))
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
   }

From 320247455fdc7fea60a2cd59893c51987b716677 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Thu, 12 Nov 2015 17:50:28 +0800
Subject: [PATCH 2/8] fix compilation issue

---
 .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c997453803b0..929b154dd541 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -306,7 +306,7 @@
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.sparkPlan match {
-      case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[ParquetRelation].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
@@ -336,7 +336,7 @@
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
     df.queryExecution.sparkPlan match {
-      case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[ParquetRelation].getCanonicalName} and " +
        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +

From f6c0074f36a9d9ca055e337e30cadab781262be6 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Wed, 2 Dec 2015 11:24:32 +0800
Subject: [PATCH 3/8] address comments

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  | 3 +--
 .../spark/sql/execution/datasources/ResolvedDataSource.scala   | 1 +
 .../spark/sql/execution/datasources/text/TextSuite.scala       | 1 -
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7f5e3c28cd6d..b19e55cd5642 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -32,7 +31,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.sources.HadoopFsRelation
 
 
-
 /**
  * :: Experimental ::
  * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
@@ -163,7 +162,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   /*
    * Specify the compression codec when saving it on hdfs
    *
-   * @since 1.7.0
+   * @since 1.6.0
    */
   def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = {
     this.extraOptions += ("compression.codec" -> codec.getCanonicalName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 776d52c0ab9a..3a12e0ceeb03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 import org.apache.hadoop.util.StringUtils
+import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index d42ae2e12770..77b787e0789d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -63,7 +63,6 @@ class TextSuite extends QueryTest with SharedSQLContext {
   test("compression") {
     val tempDirPath = Files.createTempDir().getAbsolutePath;
     val df = sqlContext.read.text(testFile)
-    df.show()
     df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath)
     verifyFrame(sqlContext.read.text(tempDirPath))
   }

From d470aeab8c2ffaff305c691d859ae7c878387c4d Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Wed, 2 Dec 2015 13:03:08 +0800
Subject: [PATCH 4/8] minor change

---
 .../spark/sql/execution/datasources/WriterContainer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 06321d9a1e61..a22d1eab40f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -212,7 +212,7 @@ private[sql] abstract class BaseWriterContainer(
     serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
     serializableConf.value.setBoolean("mapred.task.is.map", true)
     serializableConf.value.setInt("mapred.task.partition", 0)
-    for (c <- codec) {
+    codec.foreach { c =>
       serializableConf.value.set("mapred.output.compress", "true")
       serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName)
       serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

From f47d8cc9adb72b43ab8795246aca8ed5df55aa67 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Thu, 3 Dec 2015 13:20:07 +0800
Subject: [PATCH 5/8] change back to 1.7.0

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b19e55cd5642..de195790f9a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -162,7 +162,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   /*
    * Specify the compression codec when saving it on hdfs
    *
-   * @since 1.6.0
+   * @since 1.7.0
   */
   def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = {
     this.extraOptions += ("compression.codec" -> codec.getCanonicalName)

From ea70b40c66eb63a8f626051a4a14ff12ae4bca4c Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Fri, 29 Jan 2016 12:57:24 +0800
Subject: [PATCH 6/8] fix review comments

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala    | 1 -
 .../spark/sql/execution/datasources/DataSourceStrategy.scala     | 2 +-
 .../sql/execution/datasources/InsertIntoHadoopFsRelation.scala   | 2 +-
 .../spark/sql/execution/datasources/ResolvedDataSource.scala     | 1 -
 4 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index de195790f9a8..cd0b90f2132e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -132,7 +132,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
-<<<<<<< b72611f20a03c790b6fd341b6ffdb3b5437609ee
    * Buckets the output by the given columns. If specified, the output is laid out on the file
   * system similar to Hive's bucketing scheme.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 85d14f8d395e..da9320ffb61c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -141,7 +141,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode, None)) :: Nil
+      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
 
     case _ => Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index a7aadbd24bf9..53d5e441d748 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -59,7 +59,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
     mode: SaveMode,
-    codec: Option[Class[_ <: CompressionCodec]])
+    codec: Option[Class[_ <: CompressionCodec]] = None)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 3a12e0ceeb03..776d52c0ab9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -26,7 +26,6 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 import org.apache.hadoop.util.StringUtils
-import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil

From a8ed421b53ff3046f8ac8e4cbe367574d692b339 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Fri, 29 Jan 2016 13:36:14 +0800
Subject: [PATCH 7/8] fix code style

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cd0b90f2132e..aebe47ed5b68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.sources.HadoopFsRelation
 
-
 /**
  * :: Experimental ::
  * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,

From 3645dbcf6f752b6ddd2b3a9613a0d409b8a7e76c Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Fri, 29 Jan 2016 16:35:01 +0800
Subject: [PATCH 8/8] minor fix

---
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala    | 4 ++--
 .../execution/datasources/InsertIntoHadoopFsRelation.scala   | 1 +
 .../spark/sql/execution/datasources/WriterContainer.scala    | 5 ++---
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index aebe47ed5b68..f5d5f1defeb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -158,9 +158,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     this
   }
   /*
-   * Specify the compression codec when saving it on hdfs
+   * Specify the compression codec when saving it on hdfs.
    *
-   * @since 1.7.0
+   * @since 2.0.0
   */
   def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = {
     this.extraOptions += ("compression.codec" -> codec.getCanonicalName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 53d5e441d748..95aaa3541dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
 import org.apache.spark.sql.sources._
 import org.apache.spark.util.Utils
 
+
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
  * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index a22d1eab40f7..8665f1d3a0aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -44,7 +44,7 @@ private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
     @transient private val job: Job,
     isAppend: Boolean,
-    codec: Option[Class[_ <: CompressionCodec]])
+    codec: Option[Class[_ <: CompressionCodec]] = None)
   extends Logging with Serializable {
 
   protected val dataSchema = relation.dataSchema
@@ -251,8 +251,7 @@ private[sql] class DefaultWriterContainer(
     relation: HadoopFsRelation,
     job: Job,
     isAppend: Boolean,
     codec: Option[Class[_ <: CompressionCodec]])
-  extends BaseWriterContainer(relation, job, isAppend,
-    codec: Option[Class[_ <: CompressionCodec]]) {
+  extends BaseWriterContainer(relation, job, isAppend, codec) {
 
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
     executorSideSetup(taskContext)
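
For reference, a minimal usage sketch of the compress API introduced by this series, modeled on the TextSuite test in PATCH 1/8; the input path, output path, and the sqlContext value are assumed placeholders, not part of the patches:

    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.spark.sql.SaveMode

    // Assumed setup: an existing SQLContext named sqlContext and a plain-text input file.
    val df = sqlContext.read.text("/path/to/input.txt")

    // compress(...) records the codec class name under the "compression.codec" option;
    // ResolvedDataSource resolves it via CompressionCodecFactory and the writer containers
    // then set mapred.output.compress / mapred.output.compression.codec on the Hadoop conf.
    df.write
      .compress(classOf[GzipCodec])
      .mode(SaveMode.Overwrite)
      .text("/path/to/output")

Note that the codec travels through the writer's extraOptions as a class-name string and is turned back into a Class[_ <: CompressionCodec] inside ResolvedDataSource before being handed to InsertIntoHadoopFsRelation.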