@@ -1687,6 +1687,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_PASS_PARTITION_BY_AS_OPTIONS =
buildConf("spark.sql.legacy.sources.write.passPartitionByAsOptions")
.internal()
Contributor:
Are you sure this must be internal?

I see some of the previous LEGACY_-prefixed configs are internal, but just before them there are a few external ones: LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED and LEGACY_SIZE_OF_NULL. So what is the reason for defining this one as internal?

I assume that if we expect users to set it, then it must be external.
What is your (and the others') opinion?

Member:

We don't want to document this flag for users. It exists just in case we break some existing production workloads; other users don't need to know anything about it.

Contributor:

Ok, I see. Thanks for the explanation.

.doc("Whether to pass the partitionBy columns as options in DataFrameWriter. " +
"Data source V1 now silently drops partitionBy columns for non-file-format sources; " +
"turning the flag on provides a way for these sources to see these partitionBy columns.")
.booleanConf
.createWithDefault(false)

val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
.internal()
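
For context on the thread above: a workload that relied on a non-file-format V1 source seeing the partitionBy columns would opt back in by setting the new flag before writing. A minimal sketch, assuming an active SparkSession named spark (e.g. in spark-shell); the config key comes from this diff, while the source name and columns are made up for illustration:

// Opt in to the legacy behavior so the V1 source sees the partitionBy columns as an option.
spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")

// "com.example.KeyValueSource" stands in for a hypothetical non-file-format V1 source.
import spark.implicits._
Seq(("a", 1), ("b", 2)).toDF("col1", "col2")
  .write
  .format("com.example.KeyValueSource")
  .partitionBy("col1")
  .save()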
@@ -29,8 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
@@ -313,6 +314,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

private def saveToV1Source(): Unit = {
if (SparkSession.active.sessionState.conf.getConf(
SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
partitioningColumns.foreach { columns =>
extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
DataSourceUtils.encodePartitioningColumns(columns))
}
}

// Code path for data source v1.
runCommand(df.sparkSession, "save") {
DataSource(
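
On the consuming side, the change above means that (with the flag enabled) a V1 CreatableRelationProvider receives the JSON-encoded column list in its parameters map under DataSourceUtils.PARTITIONING_COLUMNS_KEY. The following is a rough sketch, not part of this PR: the provider is hypothetical, and since DataSourceUtils lives in an internal package, a third-party source would more likely parse the JSON value itself rather than call the helper.

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical non-file-format V1 source that wants to honor partitionBy.
class KeyValueSourceProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // Present only when spark.sql.legacy.sources.write.passPartitionByAsOptions is true
    // and the user called partitionBy(...).
    val partitionColumns: Seq[String] = parameters
      .get(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
      .map(DataSourceUtils.decodePartitioningColumns)
      .getOrElse(Seq.empty)

    // ... write `data` partitioned by `partitionColumns` ...

    val ctx = sqlContext
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }
}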
@@ -18,12 +18,32 @@
package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._


object DataSourceUtils {
/**
* The key to use for storing partitionBy columns as options.
*/
val PARTITIONING_COLUMNS_KEY = "__partition_columns"

/**
* Utility methods for converting partitionBy columns to options and back.
*/
private implicit val formats = Serialization.formats(NoTypeHints)

def encodePartitioningColumns(columns: Seq[String]): String = {
Serialization.write(columns)
}

def decodePartitioningColumns(str: String): Seq[String] = {
Serialization.read[Seq[String]](str)
}

/**
* Verify if the schema is supported in datasource. This verification should be done
* in a driver side.
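
A quick round-trip illustration of the helpers above (not part of the PR); the encoded form is a JSON array of strings under json4s defaults, e.g. ["col1","col2"]:

import org.apache.spark.sql.execution.datasources.DataSourceUtils

val encoded = DataSourceUtils.encodePartitioningColumns(Seq("col1", "col2"))
// encoded is a JSON array of the column names, e.g. ["col1","col2"]

val decoded = DataSourceUtils.decodePartitioningColumns(encoded)
assert(decoded == Seq("col1", "col2"))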
@@ -38,6 +38,7 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -219,6 +220,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
assert(LastOptions.parameters("opt3") == "3")
}

test("pass partitionBy as options") {
Seq(true, false).foreach { flag =>
withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") {
Seq(1).toDF.write
.format("org.apache.spark.sql.test")
.partitionBy("col1", "col2")
.save()

if (flag) {
val partColumns = LastOptions.parameters(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
} else {
assert(!LastOptions.parameters.contains(DataSourceUtils.PARTITIONING_COLUMNS_KEY))
}
}
}
}

test("save mode") {
val df = spark.read
.format("org.apache.spark.sql.test")