sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (11 additions, 0 deletions)
@@ -21,6 +21,8 @@ import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -155,6 +157,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
this.sortColumnNames = Option(colName +: colNames)
this
}

/**
 * Specifies the compression codec to use when saving the output to HDFS.
 *
 * @since 2.0.0
 */
def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = {
Contributor: Can this just be a normal option?

Contributor: Also, we shouldn't depend on Hadoop APIs in options, which is a user-facing API. Nobody outside the Hadoop world knows how to use the CompressionCodec API.

Member: Agreed.

this.extraOptions += ("compression.codec" -> codec.getCanonicalName)
this
}

/**
* Saves the content of the [[DataFrame]] at the specified path.
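For reference, a minimal sketch of the string-option approach the reviewers suggest, which keeps Hadoop types out of the public API. The "compression" option key and the short-name mapping below are illustrative assumptions, not part of this PR:

// Sketch only: map user-facing short names to Hadoop codec classes inside the data source.
private val shortCompressionCodecNames = Map(
  "gzip" -> "org.apache.hadoop.io.compress.GzipCodec",
  "bzip2" -> "org.apache.hadoop.io.compress.BZip2Codec",
  "snappy" -> "org.apache.hadoop.io.compress.SnappyCodec")

// A caller would then write, for example:
//   df.write.option("compression", "gzip").mode(SaveMode.Overwrite).text(path)
// and never touch org.apache.hadoop.io.compress.CompressionCodec directly.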
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

@@ -58,7 +59,8 @@ import org.apache.spark.util.Utils
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
mode: SaveMode)
mode: SaveMode,
codec: Option[Class[_ <: CompressionCodec]] = None)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
@@ -126,7 +128,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
""".stripMargin)

val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) {
new DefaultWriterContainer(relation, job, isAppend)
new DefaultWriterContainer(relation, job, isAppend, codec)
} else {
val output = df.queryExecution.executedPlan.output
val (partitionOutput, dataOutput) =
@@ -140,7 +142,8 @@
output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
isAppend)
isAppend,
codec)
}

// This call shouldn't be put into the `try` block below because it only initializes and
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
import org.apache.hadoop.util.StringUtils

import org.apache.spark.Logging
@@ -253,11 +254,18 @@ object ResolvedDataSource extends Logging {
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.

val codec = options.get("compression.codec").flatMap(e =>
Some(new CompressionCodecFactory(sqlContext.sparkContext.hadoopConfiguration)
.getCodecClassByName(e).asInstanceOf[Class[CompressionCodec]])
Member: We need error handling if an unknown codec name is given.

Contributor Author: Since users can only set compression through DataFrameWriter.compress(codec), it is very unlikely that an unknown codec name would be given. Even if a user sets an unknown codec, I think it is fine to just throw an exception on the driver side.

Member: Users can easily set unknown codecs via DataFrameWriter#options. Also, I think it'd be better to throw an exception with a meaningful message.

Contributor Author: They could, but we didn't expose the compression property to users, so users don't know which property to set for compression.

)

sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
data.logicalPlan,
mode)).toRdd
mode,
codec)).toRdd
r
case _ =>
sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
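Following the discussion above, a possible shape for codec resolution with a meaningful error message. This is a sketch, not part of the diff, and it assumes CompressionCodecFactory.getCodecClassByName returns null for an unrecognized name; the message wording is illustrative:

// Sketch only: resolve the codec name and fail fast on the driver if it is unknown.
val codec: Option[Class[_ <: CompressionCodec]] =
  options.get("compression.codec").map { name =>
    val factory = new CompressionCodecFactory(sqlContext.sparkContext.hadoopConfiguration)
    val clazz = factory.getCodecClassByName(name)
    if (clazz == null) {
      throw new IllegalArgumentException(
        s"Unknown compression codec '$name' specified via option 'compression.codec'.")
    }
    clazz
  }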
@@ -19,7 +19,11 @@ package org.apache.spark.sql.execution.datasources

import java.util.{Date, UUID}

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, SnappyCodec}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -39,7 +43,8 @@ import org.apache.spark.util.SerializableConfiguration
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
@transient private val job: Job,
isAppend: Boolean)
isAppend: Boolean,
codec: Option[Class[_ <: CompressionCodec]] = None)
extends Logging with Serializable {

protected val dataSchema = relation.dataSchema
@@ -207,6 +212,11 @@ private[sql] abstract class BaseWriterContainer(
serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
serializableConf.value.setBoolean("mapred.task.is.map", true)
serializableConf.value.setInt("mapred.task.partition", 0)
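// If a compression codec was supplied, enable block-compressed Hadoop output for this task's configuration.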
codec.foreach { c =>
serializableConf.value.set("mapred.output.compress", "true")
serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName)
serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
}

def commitTask(): Unit = {
@@ -239,8 +249,9 @@ private[sql] abstract class BaseWriterContainer(
private[sql] class DefaultWriterContainer(
relation: HadoopFsRelation,
job: Job,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
isAppend: Boolean,
codec: Option[Class[_ <: CompressionCodec]])
extends BaseWriterContainer(relation, job, isAppend, codec) {

def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
@@ -308,8 +319,9 @@ private[sql] class DynamicPartitionWriterContainer(
inputSchema: Seq[Attribute],
defaultPartitionName: String,
maxOpenFiles: Int,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
isAppend: Boolean,
codec: Option[Class[_ <: CompressionCodec]])
extends BaseWriterContainer(relation, job, isAppend, codec) {

private val bucketSpec = relation.maybeBucketSpec

@@ -17,7 +17,10 @@

package org.apache.spark.sql.execution.datasources.text

import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import com.google.common.io.Files
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Utils
@@ -57,6 +60,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}

test("compression") {
val tempDirPath = Files.createTempDir().getAbsolutePath
val df = sqlContext.read.text(testFile)
df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath)
verifyFrame(sqlContext.read.text(tempDirPath))
}
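A possible follow-up assertion, not part of this PR, to confirm the output was actually compressed, assuming the gzip-compressed part files carry the standard ".gz" extension:

// Sketch only: at least one written part file should end with ".gz" when GzipCodec is used.
assert(new java.io.File(tempDirPath).listFiles().exists(_.getName.endsWith(".gz")))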

private def testFile: String = {
Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
}
@@ -306,7 +306,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.sparkPlan match {
case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
@@ -336,7 +336,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.sparkPlan match {
case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +