diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12eb2393634a..f5d5f1defeb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.io.compress.CompressionCodec
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -155,6 +157,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     this.sortColumnNames = Option(colName +: colNames)
     this
   }
+  /**
+   * Specifies the compression codec to use when writing out to a Hadoop-supported file system.
+   *
+   * @since 2.0.0
+   */
+  def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = {
+    this.extraOptions += ("compression.codec" -> codec.getCanonicalName)
+    this
+  }
 
   /**
    * Saves the content of the [[DataFrame]] at the specified path.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 2d3e1714d2b7..95aaa3541dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -58,7 +59,8 @@ import org.apache.spark.util.Utils
 private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    codec: Option[Class[_ <: CompressionCodec]] = None)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
@@ -126,7 +128,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         """.stripMargin)
 
     val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) {
-      new DefaultWriterContainer(relation, job, isAppend)
+      new DefaultWriterContainer(relation, job, isAppend, codec)
     } else {
       val output = df.queryExecution.executedPlan.output
       val (partitionOutput, dataOutput) =
@@ -140,7 +142,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
         output,
         PartitioningUtils.DEFAULT_PARTITION_NAME,
         sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
-        isAppend)
+        isAppend,
+        codec)
     }
 
     // This call shouldn't be put into the `try` block below because it only initializes and
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index cc8dcf59307f..776d52c0ab9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.Logging
@@ -253,11 +254,18 @@ object ResolvedDataSource extends Logging {
         // For partitioned relation r, r.schema's column ordering can be different from the column
         // ordering of data.logicalPlan (partition columns are all moved after data column). This
         // will be adjusted within InsertIntoHadoopFsRelation.
+
+        val codec = options.get("compression.codec").map(e =>
+          new CompressionCodecFactory(sqlContext.sparkContext.hadoopConfiguration)
+            .getCodecClassByName(e).asInstanceOf[Class[CompressionCodec]]
+        )
+
         sqlContext.executePlan(
           InsertIntoHadoopFsRelation(
             r,
             data.logicalPlan,
-            mode)).toRdd
+            mode,
+            codec)).toRdd
         r
       case _ =>
         sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 6340229dbb78..8665f1d3a0aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -19,7 +19,11 @@ package org.apache.spark.sql.execution.datasources
 
 import java.util.{Date, UUID}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.compress.{CompressionCodec, SnappyCodec}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -39,7 +43,8 @@ import org.apache.spark.util.SerializableConfiguration
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
     @transient private val job: Job,
-    isAppend: Boolean)
+    isAppend: Boolean,
+    codec: Option[Class[_ <: CompressionCodec]] = None)
   extends Logging with Serializable {
 
   protected val dataSchema = relation.dataSchema
@@ -207,6 +212,11 @@ private[sql] abstract class BaseWriterContainer(
     serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
     serializableConf.value.setBoolean("mapred.task.is.map", true)
     serializableConf.value.setInt("mapred.task.partition", 0)
+    codec.foreach { c =>
+      serializableConf.value.set("mapred.output.compress", "true")
+      serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName)
+      serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
   }
 
   def commitTask(): Unit = {
@@ -239,8 +249,9 @@ private[sql] abstract class BaseWriterContainer(
 private[sql] class DefaultWriterContainer(
     relation: HadoopFsRelation,
     job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
+    isAppend: Boolean,
+    codec: Option[Class[_ <: CompressionCodec]])
+  extends BaseWriterContainer(relation, job, isAppend, codec) {
 
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
     executorSideSetup(taskContext)
@@ -308,8 +319,9 @@ private[sql] class DynamicPartitionWriterContainer(
     inputSchema: Seq[Attribute],
     defaultPartitionName: String,
     maxOpenFiles: Int,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
+    isAppend: Boolean,
+    codec: Option[Class[_ <: CompressionCodec]])
+  extends BaseWriterContainer(relation, job, isAppend, codec) {
 
   private val bucketSpec = relation.maybeBucketSpec
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index f95272530d58..77b787e0789d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import com.google.common.io.Files
+import org.apache.hadoop.io.compress.GzipCodec
+
+import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
@@ -57,6 +60,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("compression") {
+    val tempDirPath = Files.createTempDir().getAbsolutePath
+    val df = sqlContext.read.text(testFile)
+    df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath)
+    verifyFrame(sqlContext.read.text(tempDirPath))
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c997453803b0..929b154dd541 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -306,7 +306,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
+        case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _, _)) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
          s"${classOf[ParquetRelation].getCanonicalName} and " +
          s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
@@ -336,7 +336,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
      val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
      df.queryExecution.sparkPlan match {
-        case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
+        case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _, _)) => // OK
        case o => fail("test_insert_parquet should be converted to a " +
          s"${classOf[ParquetRelation].getCanonicalName} and " +
          s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
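
Usage sketch (not part of the patch): a minimal example of how the new compress() API is expected to be driven once this change is applied, mirroring the TextSuite test above. The `sqlContext` value and the input/output paths are placeholders for illustration only.

  import org.apache.hadoop.io.compress.GzipCodec

  import org.apache.spark.sql.SaveMode

  // Read some text, then write it back gzip-compressed. compress() records the
  // codec class under the "compression.codec" option; ResolvedDataSource turns
  // that option into the codec passed to InsertIntoHadoopFsRelation, and
  // BaseWriterContainer finally sets mapred.output.compress /
  // mapred.output.compression.codec on the Hadoop configuration.
  val df = sqlContext.read.text("/tmp/input.txt")
  df.write
    .compress(classOf[GzipCodec])
    .mode(SaveMode.Overwrite)
    .text("/tmp/output")

Because compress() only stores the codec's class name in the writer's extra options, the same effect should also be reachable with .option("compression.codec", classOf[GzipCodec].getCanonicalName).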