From 61a4c961b16990a235c97fc7d94699ea64b35055 Mon Sep 17 00:00:00 2001
From: liwensun
Date: Fri, 12 Apr 2019 14:49:31 -0700
Subject: [PATCH 1/2] pass partitionBy as options

---
 .../apache/spark/sql/internal/SQLConf.scala   |  9 +++++++++
 .../apache/spark/sql/DataFrameWriter.scala    | 10 +++++++++-
 .../datasources/DataSourceUtils.scala         | 20 +++++++++++++++++++
 .../sql/test/DataFrameReaderWriterSuite.scala | 19 ++++++++++++++++++
 4 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f33cc86a18a1b..036f4b513bf20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1687,6 +1687,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_PASS_PARTITION_BY_AS_OPTIONS =
+    buildConf("spark.sql.legacy.sources.write.passPartitionByAsOptions")
+      .internal()
+      .doc("Whether to pass the partitionBy columns as options in DataFrameWriter." +
+        " Data source V1 now silently drops partitionBy columns for non-file-format sources;" +
+        " turning the flag on provides a way for these sources to see these partitionBy columns.")
+      .booleanConf
+      .createWithDefault(true)
+
   val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
     buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9371936047984..e83aa4b0fb5b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
@@ -313,6 +314,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveToV1Source(): Unit = {
+    if (df.sparkSession.sessionState.conf.getConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
+      partitioningColumns.foreach { columns =>
+        extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
+          DataSourceUtils.encodePartitioningColumns(columns))
+      }
+    }
+
     // Code path for data source v1.
     runCommand(df.sparkSession, "save") {
       DataSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 74eae94e65b00..0ad914e406107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -18,12 +18,32 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.fs.Path
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.types._
 
 object DataSourceUtils {
 
+  /**
+   * The key to use for storing partitionBy columns as options.
+   */
+  val PARTITIONING_COLUMNS_KEY = "__partition_columns"
+
+  /**
+   * Utility methods for converting partitionBy columns to options and back.
+   */
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  def encodePartitioningColumns(columns: Seq[String]): String = {
+    Serialization.write(columns)
+  }
+
+  def decodePartitioningColumns(str: String): Seq[String] = {
+    Serialization.read[Seq[String]](str)
+  }
+
   /**
    * Verify if the schema is supported in datasource. This verification should be done
    * in a driver side.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 2569085bec086..deab313091591 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -219,6 +220,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("opt3") == "3")
   }
 
+  test("pass partitionBy as options") {
+    Seq(true, false).foreach { flag =>
+      withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") {
+        Seq(1).toDF.write
+          .format("org.apache.spark.sql.test")
+          .partitionBy("col1", "col2")
+          .save()
+
+        if (flag) {
+          val partColumns = LastOptions.parameters(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
+          assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
+        } else {
+          assert(!LastOptions.parameters.contains(DataSourceUtils.PARTITIONING_COLUMNS_KEY))
+        }
+      }
+    }
+  }
+
   test("save mode") {
     val df = spark.read
       .format("org.apache.spark.sql.test")

From c8cdd014eabd657589ed3feefdf1693e9bbb1a7a Mon Sep 17 00:00:00 2001
From: liwensun
Date: Mon, 15 Apr 2019 15:24:01 -0700
Subject: [PATCH 2/2] address comments

---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala     | 8 ++++----
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 ++-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 036f4b513bf20..7b9280031edbb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1690,11 +1690,11 @@ object SQLConf {
   val LEGACY_PASS_PARTITION_BY_AS_OPTIONS =
     buildConf("spark.sql.legacy.sources.write.passPartitionByAsOptions")
       .internal()
-      .doc("Whether to pass the partitionBy columns as options in DataFrameWriter." +
-        " Data source V1 now silently drops partitionBy columns for non-file-format sources;" +
-        " turning the flag on provides a way for these sources to see these partitionBy columns.")
+      .doc("Whether to pass the partitionBy columns as options in DataFrameWriter. " +
+        "Data source V1 now silently drops partitionBy columns for non-file-format sources; " +
+        "turning the flag on provides a way for these sources to see these partitionBy columns.")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
     buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e83aa4b0fb5b5..3b8415121c275 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -314,7 +314,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveToV1Source(): Unit = {
-    if (df.sparkSession.sessionState.conf.getConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
+    if (SparkSession.active.sessionState.conf.getConf(
+        SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
       partitioningColumns.foreach { columns =>
         extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
           DataSourceUtils.encodePartitioningColumns(columns))
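
Editor's note (not part of the patches above): the sketch below illustrates how a non-file-format V1 source could observe the partitionBy columns once spark.sql.legacy.sources.write.passPartitionByAsOptions is enabled. The provider class, package, and usage shown are hypothetical; only CreatableRelationProvider, DataSourceUtils.PARTITIONING_COLUMNS_KEY, and DataSourceUtils.decodePartitioningColumns come from Spark and the patch itself.

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical non-file-format V1 source. With the legacy flag on, DataFrameWriter
// adds the JSON-encoded partitionBy columns to the options map passed here.
class PartitionAwareProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // Decode "__partition_columns" back into the column names given to partitionBy().
    // The key is absent when the flag is off or no partitioning was requested.
    val partitionColumns: Seq[String] = parameters
      .get(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
      .map(DataSourceUtils.decodePartitioningColumns)
      .getOrElse(Seq.empty)

    // ... write `data` partitioned by `partitionColumns` into the external system ...

    val outer = sqlContext
    new BaseRelation {
      override def sqlContext: SQLContext = outer
      override def schema: StructType = data.schema
    }
  }
}

// Hypothetical usage, assuming the provider class is on the classpath:
//   spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")
//   df.write.format("com.example.PartitionAwareProvider").partitionBy("col1", "col2").save()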