89 changes: 80 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,9 +23,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.sources.HadoopFsRelation

@@ -128,6 +128,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
this
}

/**
* Buckets the output by the given columns. If specified, the output is laid out on the file
* system similar to Hive's bucketing scheme.
*
* This is applicable for Parquet, JSON and ORC.
*
* @since 2.0
*/
@scala.annotation.varargs
Contributor: And the @since annotations and comments. These are public APIs.

def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter = {
this.numBuckets = Option(numBuckets)
this.bucketColumnNames = Option(colName +: colNames)
this
}
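
A minimal usage sketch of the new API for context (table and column names are illustrative, not from this patch; since save() and insertInto() below call assertNotBucketed(), bucketBy only takes effect together with saveAsTable):

df.write
  .format("parquet")
  .bucketBy(8, "id")
  .saveAsTable("bucketed_table")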

/**
* Sorts the output in each bucket by the given columns.
*
* This is applicable for Parquet, JSON and ORC.
Contributor: Why the limitation? The comment makes this seem bucketBy must be called first. If that is true, make that more clear.

*
* @since 2.0
*/
@scala.annotation.varargs
Contributor: Did you intentionally not provide a way for sort order?

Contributor: is that needed?

def sortBy(colName: String, colNames: String*): DataFrameWriter = {
this.sortColumnNames = Option(colName +: colNames)
this
}
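
A corresponding sketch with sortBy; per getBucketSpec further down, sortBy only makes sense together with bucketBy (column names are again illustrative, not from this patch):

df.write
  .format("parquet")
  .bucketBy(8, "id")
  .sortBy("value")
  .saveAsTable("bucketed_sorted_table")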

/**
* Saves the content of the [[DataFrame]] at the specified path.
*
@@ -144,10 +172,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 1.4.0
*/
def save(): Unit = {
assertNotBucketed()
ResolvedDataSource(
df.sqlContext,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
getBucketSpec,
mode,
extraOptions.toMap,
df)
@@ -166,6 +196,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}

private def insertInto(tableIdent: TableIdentifier): Unit = {
assertNotBucketed()
val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
val overwrite = mode == SaveMode.Overwrite

@@ -188,13 +219,47 @@ final class DataFrameWriter private[sql](df: DataFrame) {
ifNotExists = false)).toRdd
}

private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
parCols.map { col =>
df.logicalPlan.output
.map(_.name)
.find(df.sqlContext.analyzer.resolver(_, col))
.getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
cols.map(normalize(_, "Partition"))
}

private def normalizedBucketColNames: Option[Seq[String]] = bucketColumnNames.map { cols =>
cols.map(normalize(_, "Bucketing"))
}

private def normalizedSortColNames: Option[Seq[String]] = sortColumnNames.map { cols =>
cols.map(normalize(_, "Sorting"))
}

private def getBucketSpec: Option[BucketSpec] = {
if (sortColumnNames.isDefined) {
require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
}

for {
n <- numBuckets
} yield {
require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
}
Contributor: This method can be simplified to:

if (sortingColumns.isDefined) {
  require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
}

for {
  n <- numBuckets
  cols <- normalizedBucketCols
} yield {
  require(n > 0, "Bucket number must be greater than 0.")
  BucketSpec(n, cols, normalizedSortCols)
}

(require throws IllegalArgumentException when the condition is not met.)

}
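
For reference, the BucketSpec imported above is presumably a plain holder along these lines (a sketch inferred from the call sites in this patch; the exact field names are assumptions):

case class BucketSpec(
    numBuckets: Int,                 // validated above to be in (0, 100000)
    bucketColumnNames: Seq[String],  // normalized bucketing columns
    sortColumnNames: Seq[String])    // normalized sort columns, Nil when sortBy was not used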

/**
* The given column name may not be equal to any of the existing column names if we were in
* case-insensitive context. Normalize the given column name to the real one so that we don't
* need to care about case sensitivity afterwards.
*/
private def normalize(columnName: String, columnType: String): String = {
Contributor: can you add a comment explaining what "normalize" does.

val validColumnNames = df.logicalPlan.output.map(_.name)
validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
.getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})"))
}
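
To illustrate normalize, assuming spark.sql.caseSensitive is false so the analyzer's resolver matches case-insensitively, and a DataFrame with a single column "userId" (hypothetical values, not from this patch):

normalize("USERID", "Bucketing")   // returns "userId", the real column name
normalize("missing", "Bucketing")  // throws AnalysisException: Bucketing column missing not found
                                   // in existing columns (userId)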

private def assertNotBucketed(): Unit = {
if (numBuckets.isDefined || sortColumnNames.isDefined) {
Contributor (Author): previous discussion: #10498 (comment)

throw new IllegalArgumentException(
"Currently we don't support writing bucketed data to this data source.")
}
}
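
So a bucketed write to a plain path fails fast (hypothetical call; save() and insertInto() both invoke assertNotBucketed()):

// throws IllegalArgumentException:
//   Currently we don't support writing bucketed data to this data source.
df.write.bucketBy(8, "id").save("/tmp/some/path")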

@@ -244,6 +309,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
source,
temporary = false,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
getBucketSpec,
mode,
extraOptions.toMap,
df.logicalPlan)
@@ -372,4 +438,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {

private var partitioningColumns: Option[Seq[String]] = None

private var bucketColumnNames: Option[Seq[String]] = None

private var numBuckets: Option[Int] = None

private var sortColumnNames: Option[Seq[String]] = None
}
@@ -382,13 +382,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsing if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case CreateTableUsingAsSelect(tableIdent, provider, true, partitionsCols, mode, opts, query)
if partitionsCols.nonEmpty =>
case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty =>
sys.error("Cannot create temporary partitioned table.")

case CreateTableUsingAsSelect(tableIdent, provider, true, _, mode, opts, query) =>
case c: CreateTableUsingAsSelect if c.temporary =>
val cmd = CreateTempTableUsingAsSelect(
tableIdent, provider, Array.empty[String], mode, opts, query)
c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
@@ -109,6 +109,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
provider,
temp.isDefined,
Array.empty[String],
bucketSpec = None,
mode,
options,
queryPlan)
@@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)

val writerContainer = if (partitionColumns.isEmpty) {
val writerContainer = if (partitionColumns.isEmpty && relation.bucketSpec.isEmpty) {
Contributor (Author): cc @davies, if we bucket without partition, we will go to the DynamicPartitionWriterContainer branch

Contributor: I see, thanks

new DefaultWriterContainer(relation, job, isAppend)
} else {
val output = df.queryExecution.executedPlan.output
@@ -210,6 +210,7 @@ object ResolvedDataSource extends Logging {
sqlContext: SQLContext,
provider: String,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
mode: SaveMode,
options: Map[String, String],
data: DataFrame): ResolvedDataSource = {
@@ -244,6 +245,7 @@ object ResolvedDataSource extends Logging {
Array(outputPath.toString),
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)),
bucketSpec,
caseInsensitiveOptions)

// For partitioned relation r, r.schema's column ordering can be different from the column