81 changes: 75 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,22 +22,22 @@ import java.util.{Locale, Properties, UUID}
import scala.collection.JavaConverters._

import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
@@ -360,6 +360,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
def insertInto(tableName: String): Unit = {
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

assertNotBucketed("insertInto")

@@ -374,8 +375,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
insertInto(catalog, ident)
// TODO(SPARK-28667): Support the V2SessionCatalog
Contributor:

Not related to this PR, but just a note. I've been working on V2SessionCatalog improvements for a while, and one issue I found is that it's not easy to implement "if the table provider is v1, go to the v1 session catalog; if the table provider is v2, go to the v2 session catalog".

It's easy to do for CREATE TABLE, because the table provider is known at the beginning. But it's hard for SELECT and INSERT, as we need to look up the table from the Hive catalog to get the table provider.

I'd expect to have 2 analyzer rules to do it:

  1. one rule in sql/catalyst, which resolves UnresolvedRelation to UnresolvedCatalogRelation by looking up the table from the Hive catalog
  2. one rule in sql/core, which resolves UnresolvedCatalogRelation to DataSourceV2Relation if the table provider is v2. This has to be in sql/core because DataSource.lookupDataSource is in sql/core. We already have a rule, FindDataSourceTable, that resolves UnresolvedCatalogRelation to a v1 relation if the table provider is v1.

I think DataFrameWriter should produce ...Statement plans, to reuse the analyzer rules for all of these operations.

Contributor Author:

Well, the V2SessionCatalog already produces a CatalogTableAsV2, which gets unwrapped to an UnresolvedCatalogRelation in DataSourceResolution and can then be resolved through the V1 code paths. I've also hit a similar problem when trying to implement V2SessionCatalog support for ALTER TABLE. In that case I decided that it should reject and fall back to V1 plans if the loaded table is a CatalogTableAsV2.

Contributor:

> V2SessionCatalog already produces a CatalogTableAsV2

Yes, for both v1 and v2 provider. So we still need a rule to check the table provider, and either unwrap it to a UnresolvedCatalogRelation, or create the actual v2 table.

Contributor:

> Yes, for both v1 and v2 provider. So we still need a rule to check the table provider, and either unwrap it to a UnresolvedCatalogRelation, or create the actual v2 table.

This should be done in loadTable. The original implementation called TableProvider to create the table, but this path is currently broken. I think that TableProvider needs to be improved so that it works instead of adding more rules to convert from CatalogTableAsV2.

The catalog should return the correct Table instance for all v2 tables. Spark shouldn't convert between v2 table instances.
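
For concreteness, a rough sketch of the second analyzer rule proposed earlier in this thread. This is purely illustrative: the rule name ResolveV2ProviderRelations is hypothetical, not actual Spark code, and it assumes the Spark-internal APIs mentioned in the discussion (UnresolvedCatalogRelation, DataSource.lookupDataSource, TableProvider, DataSourceV2Relation) as they existed at the time of this PR.

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative only: resolve UnresolvedCatalogRelation to a v2 relation when the table's
// provider implements TableProvider; otherwise leave the plan unchanged so that the
// existing FindDataSourceTable rule produces the v1 relation.
object ResolveV2ProviderRelations extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case rel @ UnresolvedCatalogRelation(tableMeta) if tableMeta.provider.isDefined =>
      val cls = DataSource.lookupDataSource(tableMeta.provider.get, SQLConf.get)
      if (classOf[TableProvider].isAssignableFrom(cls)) {
        val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
        val options = new CaseInsensitiveStringMap(tableMeta.properties.asJava)
        DataSourceV2Relation.create(provider.getTable(options))
      } else {
        rel // v1 provider: fall through to FindDataSourceTable
      }
  }
}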

case AsTableIdentifier(tableIdentifier) =>
insertInto(tableIdentifier)
case other =>
throw new AnalysisException(
s"Couldn't find a catalog to handle the identifier ${other.quoted}.")
}
}
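
As a usage-level illustration of the routing above (the catalog name "testcat" is an assumption; it stands for any v2 catalog registered under spark.sql.catalog.testcat):

// Multi-part identifier with an explicit catalog: routed to insertInto(catalog, ident).
df.write.insertInto("testcat.ns1.ns2.tbl")

// Plain table identifier: matches AsTableIdentifier and takes the existing
// session-catalog path via insertInto(tableIdentifier).
df.write.insertInto("db.tbl")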

@@ -485,7 +490,71 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def saveAsTable(tableName: String): Unit = {
saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

val session = df.sparkSession
Member:

Nit: the local variable is used only once.

Contributor Author:

I'll use it in the follow up :)


session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
// TODO(SPARK-28666): This should go through V2SessionCatalog

case AsTableIdentifier(tableIdentifier) =>
saveAsTable(tableIdentifier)
Contributor:

What about cases where AsTableIdentifier won't match? For example, saveAsTable("ns1.ns2.table"). For those cases, we need a case _ => that throws an exception because the table name is not supported by the session catalog.


case other =>
throw new AnalysisException(
s"Couldn't find a catalog to handle the identifier ${other.quoted}.")
Contributor:

I think this is an analysis error. The catalog was None, so it belongs to the session catalog. But the session catalog doesn't support namespaces with more than one part, so it is an invalid identifier for that catalog.

Contributor Author:

ah gotcha, that's the 4th case... I'd like to make that change along with the V2SessionCatalog support. Right now we just couldn't find a catalog for it, so... (Maybe they forgot to specify a default catalog).

Contributor:

+1. saveAsTable("ns1.ns2.table") means we want to save the table to the builtin catalog, but ns1.ns2 is not a valid namespace for the builtin catalog. (Or should we report that the namespace was not found?)

Contributor Author:

That's true, but that could also mean that they forgot to configure their default catalog too, right?
I can throw a better error when the SessionCatalog code path is implemented, since we need that before we can even say that ns1.ns2.table cannot be handled by the session catalog.

}
}
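
To make the case debated above concrete: a multi-part name with no explicit catalog (and no default catalog configured) matches neither CatalogObjectIdentifier with a catalog nor AsTableIdentifier, so it falls through to the last case. A hypothetical example:

// With no default v2 catalog configured, this throws
// AnalysisException: Couldn't find a catalog to handle the identifier ns1.ns2.table.
df.write.saveAsTable("ns1.ns2.table")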


private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = {
val partitioning = partitioningColumns.map { colNames =>
colNames.map(name => IdentityTransform(FieldReference(name)))
}.getOrElse(Seq.empty[Transform])
val bucketing = bucketColumnNames.map { cols =>
Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_))))
}.getOrElse(Seq.empty[Transform])
val partitionTransforms = partitioning ++ bucketing

val tableOpt = try Option(catalog.loadTable(ident)) catch {
case _: NoSuchTableException => None
}

val command = (mode, tableOpt) match {
case (SaveMode.Append, Some(table)) =>
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan)

case (SaveMode.Overwrite, _) =>
Contributor:

I think that the behavior here is to truncate and write, not replace.

Contributor Author:

Wouldn't we want to change the table properties though, such as the partitioning and schema?

Contributor:

We need to match the behavior of v1 file sources. Do we know what that behavior is?

Contributor Author:

As discussed in the DSV2 sync, the old behavior was to drop the old table and create a new one, so the Replace behavior here works.

ReplaceTableAsSelect(
catalog,
ident,
partitionTransforms,
df.queryExecution.analyzed,
Map.empty, // properties can't be specified through this API
extraOptions.toMap,
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
// We have a potential race condition here in AppendMode, if the table suddenly gets
// created between our existence check and physical execution, but this can't be helped
// in any case.
Contributor (@cloud-fan, Aug 7, 2019):

If we treat saveAsTable as one atomic operation, I think AppendMode means "create or append table". Since "create or append table" is not a standard SQL command, maybe it's fine to ignore this race condition.

CreateTableAsSelect(
catalog,
ident,
partitionTransforms,
df.queryExecution.analyzed,
Map.empty,
extraOptions.toMap,
ignoreIfExists = other == SaveMode.Ignore)
}

runCommand(df.sparkSession, "saveAsTable") {
command
}
}
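
A usage-level sketch of how the SaveMode handling above plays out against a v2 catalog (the catalog name "testcat" is assumed, as in the tests below):

val data = Seq((1L, "a"), (2L, "b")).toDF("id", "data")

// Append (the default on this path): AppendData if the table exists,
// otherwise CreateTableAsSelect.
data.write.saveAsTable("testcat.ns.tbl")

// Overwrite: ReplaceTableAsSelect with orCreate = true, i.e. replace-or-create.
data.write.mode("overwrite").saveAsTable("testcat.ns.tbl")

// Ignore: CreateTableAsSelect with ignoreIfExists = true; a no-op if the table exists.
data.write.mode("ignore").saveAsTable("testcat.ns.tbl")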

private def saveAsTable(tableIdent: TableIdentifier): Unit = {
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala

@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources.v2

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.test.SharedSQLContext

@@ -141,4 +141,66 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter
}
}
}
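
The new tests below rely on a v2 catalog registered under the name testcat. The suite's setup is not part of this diff; a minimal sketch of what that registration might look like (the TestInMemoryTableCatalog class name is an assumption):

before {
  spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
}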

testQuietly("saveAsTable: table doesn't exist => create table") {
Contributor:

Just out of curiosity, why do we need testQuietly here?

Contributor Author:

It was added before Gengliang's "Catalogs.load should work for inbuilt catalogs" PR was merged. It may not be needed now, but I don't think it's a big deal.

val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: table exists => append by name") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
// Default saveMode is append, therefore this doesn't throw a table already exists exception
df.write.saveAsTable(t1)
checkAnswer(spark.table(t1), df)

// also appends are by name not by position
df.select('data, 'id).write.saveAsTable(t1)
checkAnswer(spark.table(t1), df.union(df))
}
}

testQuietly("saveAsTable: table overwrite and table doesn't exist => create table") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("overwrite").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: table overwrite and table exists => replace table") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("overwrite").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: ignore mode and table doesn't exist => create table") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("ignore").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: ignore mode and table exists => do nothing") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'")
df.write.mode("ignore").saveAsTable(t1)
checkAnswer(spark.table(t1), Seq(Row("c", "d")))
}
}
}