
Commit dff28ea

wangyum and cloud-fan authored and committed
[HADP-56882][SPARK-51281][SQL] DataFrameWriterV2 should respect the path option (apache#705)
* [SPARK-51281][SQL] DataFrameWriterV2 should respect the path option

Unlike `DataFrameWriter.saveAsTable`, where we explicitly get the "path" option and treat it as the table location, `DataFrameWriterV2` does not: it treats the "path" option as a normal option that has no real effect. This PR fixes that and adds a legacy config to restore the old behavior.

Why are the changes needed: bug fix.
Does this PR introduce any user-facing change: yes, `DataFrameWriterV2` can now correctly write data to the specified path for file source tables.
How was this patch tested: new test.
Was this patch authored or co-authored using generative AI tooling: no.

Closes apache#50040 from cloud-fan/prop.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a3671e5)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 25b586e commit dff28ea
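
With the fix, a `writeTo(...).create()` call that carries a "path" option places the table data at that path rather than at the default table location. A minimal sketch of the user-facing behavior (the table name and path are illustrative, and an existing `spark` session is assumed):

  // Creates table `demo` using the JSON file source, with its data stored
  // under /tmp/demo_path instead of the default warehouse location.
  spark.range(10)
    .writeTo("demo")
    .using("json")
    .option("path", "/tmp/demo_path")
    .create()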

3 files changed: +53, −18 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -5097,6 +5097,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
+    buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
+      .internal()
+      .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always write data " +
+        "to the default table location.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val MAX_REPAIR_PARTITION_NUM =
     buildConf("spark.carmel.sql.repairPartition.maxNum")
       .internal()
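
If the old behavior is required, the new legacy flag can be enabled before writing; a minimal sketch (an existing `spark` session is assumed):

  // With the flag on, DataFrameWriterV2 ignores the 'path' option and writes
  // to the default table location, as it did before this change.
  spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")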

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala

Lines changed: 18 additions & 18 deletions
@@ -24,9 +24,11 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -108,24 +110,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   override def create(): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      serde = None,
-      external = false)
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty),
         logicalPlan,
-        tableSpec,
+        buildTableSpec(),
        options.toMap,
         false))
   }
 
+  private def buildTableSpec(): UnresolvedTableSpec = {
+    val ignorePathOption = sparkSession.sessionState.conf.getConf(
+      SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
+    UnresolvedTableSpec(
+      properties = properties.toMap,
+      provider = provider,
+      optionExpression = OptionList(Seq.empty),
+      location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
+      comment = None,
+      serde = None,
+      external = false)
+  }
+
+  /** @inheritdoc */
   override def replace(): Unit = {
     internalReplace(orCreate = false)
   }
@@ -197,19 +205,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      serde = None,
-      external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty),
       logicalPlan,
-      tableSpec,
+      buildTableSpec(),
       writeOptions = options.toMap,
       orCreate = orCreate))
   }
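
The new `buildTableSpec()` resolves the table location through `CaseInsensitiveMap`, so the "path" option is picked up regardless of how the caller capitalized the key. A small sketch of that lookup (the map contents are illustrative; `CaseInsensitiveMap` is an internal catalyst utility, not a public API):

  import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

  // "PATH", "Path" and "path" all resolve to the same entry.
  val opts = CaseInsensitiveMap(Map("PATH" -> "/tmp/demo_path"))
  assert(opts.get("path") == Some("/tmp/demo_path"))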

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala

Lines changed: 26 additions & 0 deletions
@@ -789,4 +789,30 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-51281: DataFrameWriterV2 should respect the path option") {
+    def checkResults(df: DataFrame): Unit = {
+      checkAnswer(df, spark.range(10).toDF())
+    }
+
+    Seq(true, false).foreach { ignorePath =>
+      withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
+        withTable("t1", "t2") {
+          spark.range(10).writeTo("t1").using("json").create()
+          checkResults(spark.table("t1"))
+
+          withTempPath { p =>
+            val path = p.getCanonicalPath
+            spark.range(10).writeTo("t2").using("json").option("path", path).create()
+            checkResults(spark.table("t2"))
+            if (ignorePath) {
+              assert(!p.exists())
+            } else {
+              checkResults(spark.read.json(path))
+            }
+          }
+        }
+      }
+    }
+  }
 }
