From ca08d3be00475d678e3e7b4fcd4fd478eb0f4659 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 11 Sep 2019 17:29:11 -0700
Subject: [PATCH 1/6] V1_BATCH_WRITE should also pass BATCH_WRITE checks

---
 .../datasources/v2/TableCapabilityCheck.scala | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index 660b6e763e056..72050e39d91ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.BooleanType

@@ -32,6 +33,11 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {

   private def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)

+  private def supportsBatchWrite(table: Table): Boolean = table match {
+    case supportsWrite: SupportsWrite => supportsWrite.supportsAny(BATCH_WRITE, V1_BATCH_WRITE)
+    case _ => false
+  }
+
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
       case r: DataSourceV2Relation if !r.table.supports(BATCH_READ) =>
@@ -44,7 +50,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {

       // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
       // a logical plan for streaming write.
-      case AppendData(r: DataSourceV2Relation, _, _) if !r.table.supports(BATCH_WRITE) =>
+      case AppendData(r: DataSourceV2Relation, _, _) if !supportsBatchWrite(r.table) =>
         failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.")

       case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _)
@@ -54,13 +60,13 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
       case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _) =>
         expr match {
           case Literal(true, BooleanType) =>
-            if (!r.table.supports(BATCH_WRITE) ||
-              !r.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
+            if (!supportsBatchWrite(r.table) ||
+              !r.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
               failAnalysis(
                 s"Table ${r.table.name()} does not support truncate in batch mode.")
             }
           case _ =>
-            if (!r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_BY_FILTER)) {
+            if (!supportsBatchWrite(r.table) || !r.table.supports(OVERWRITE_BY_FILTER)) {
               failAnalysis(s"Table ${r.table.name()} does not support " +
                 "overwrite by filter in batch mode.")
             }
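Note (patch 1): TableCapabilityCheck now treats a table as batch-writable when it declares either BATCH_WRITE or V1_BATCH_WRITE, so connectors that only implement the V1 write fallback pass the same analysis checks. A minimal sketch of such a connector, with imports omitted and the InsertableRelation body purely illustrative (API names follow the connector packages used later in this series):

    class MyV1FallbackTable extends Table with SupportsWrite {
      override def name(): String = "my_table"
      override def schema(): StructType = new StructType().add("a", IntegerType)
      override def capabilities(): util.Set[TableCapability] =
        Set(TableCapability.V1_BATCH_WRITE, TableCapability.TRUNCATE).asJava

      // V1WriteBuilder bridges the DSv2 write path back to the V1 InsertableRelation API.
      override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
        new WriteBuilder with V1WriteBuilder {
          override def buildForV1Write(): InsertableRelation = new InsertableRelation {
            override def insert(data: DataFrame, overwrite: Boolean): Unit = {
              // hand the rows to the connector's existing V1 write path
            }
          }
        }
    }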
From a3f87bf69968a7f6ebe4fd75e0f904c9db024db2 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 11 Sep 2019 17:37:59 -0700
Subject: [PATCH 2/6] move partitioning check

---
 .../scala/org/apache/spark/sql/DataFrameWriter.scala | 9 ++++-----
 .../spark/sql/sources/v2/V1WriteFallbackSuite.scala  | 4 +---
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 04a611024eb24..5f04955aa0b3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,11 +254,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

     val maybeV2Provider = lookupV2Provider()
     if (maybeV2Provider.isDefined) {
-      if (partitioningColumns.nonEmpty) {
-        throw new AnalysisException(
-          "Cannot write data to TableProvider implementation if partition columns are specified.")
-      }
-
       val provider = maybeV2Provider.get
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, df.sparkSession.sessionState.conf)
@@ -268,6 +263,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       provider.getTable(dsOptions) match {
         case table: SupportsWrite if table.supports(BATCH_WRITE) =>
+          if (partitioningColumns.nonEmpty) {
+            throw new AnalysisException("Cannot write data to TableProvider implementation " +
+              "if partition columns are specified.")
+          }
           lazy val relation = DataSourceV2Relation.create(table, dsOptions)
           modeForDSV2 match {
             case SaveMode.Append =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
index 9002775bce211..a9f46deff74d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
@@ -24,10 +24,9 @@ import scala.collection.mutable

 import org.scalatest.BeforeAndAfter

-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.InMemoryTable
-import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter, InsertableRelation}
 import org.apache.spark.sql.sources.v2.utils.TestV2SessionCatalogBase
 import org.apache.spark.sql.sources.v2.writer.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
@@ -144,7 +143,6 @@ class InMemoryTableWithV1Fallback(
   }

   override def capabilities: util.Set[TableCapability] = Set(
-    TableCapability.BATCH_WRITE,
     TableCapability.V1_BATCH_WRITE,
     TableCapability.OVERWRITE_BY_FILTER,
     TableCapability.TRUNCATE).asJava
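Note (patch 2): the "no partitionBy with a TableProvider" guard now only fires on the pure DSv2 path (tables that report BATCH_WRITE); a V1-fallback table keeps receiving the partitioning through the save options instead. The user-facing call this is meant to allow, mirroring the tests added in patch 5 (the format class is the suite's own in-memory provider):

    // The partition columns travel to the V1 createRelation() as an option
    // (DataSourceUtils.PARTITIONING_COLUMNS_KEY), which patches 4-5 decode.
    df.write
      .format(classOf[InMemoryV1Provider].getName)
      .option("name", "t1")
      .partitionBy("a")
      .mode("append")
      .save()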
From 0f783474fdc176c48e75d5982a82f4cb57db824b Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 12 Sep 2019 12:32:14 -0700
Subject: [PATCH 3/6] save

---
 .../sql/sources/v2/V1WriteFallbackSuite.scala | 23 ++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
index a9f46deff74d0..5962756d33d43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
@@ -24,10 +24,10 @@ import scala.collection.mutable

 import org.scalatest.BeforeAndAfter

-import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.InMemoryTable
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter, InsertableRelation}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2.utils.TestV2SessionCatalogBase
 import org.apache.spark.sql.sources.v2.writer.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -117,11 +117,11 @@ private object InMemoryV1Provider {
 }
 class InMemoryV1Provider extends TableProvider with DataSourceRegister {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
-    InMemoryV1Provider.tables.getOrElseUpdate(options.get("name"), {
+    InMemoryV1Provider.tables.getOrElse(options.get("name"), {
       new InMemoryTableWithV1Fallback(
         "InMemoryTableWithV1Fallback",
-        new StructType().add("a", IntegerType).add("b", StringType),
-        Array(IdentityTransform(FieldReference(Seq("a")))),
+        new StructType(),
+        Array.empty,
         options.asCaseSensitiveMap()
       )
     })
@@ -134,7 +134,10 @@ class InMemoryTableWithV1Fallback(
     override val name: String,
     override val schema: StructType,
     override val partitioning: Array[Transform],
-    override val properties: util.Map[String, String]) extends Table with SupportsWrite {
+    override val properties: util.Map[String, String])
+  extends Table
+  with SupportsWrite
+  with CreatableRelationProvider {

   partitioning.foreach { t =>
     if (!t.isInstanceOf[IdentityTransform]) {
@@ -157,6 +160,14 @@ class InMemoryTableWithV1Fallback(
     new FallbackWriteBuilder(options)
   }

+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+
+  }
+
   private class FallbackWriteBuilder(options: CaseInsensitiveStringMap)
     extends WriteBuilder
     with V1WriteBuilder
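Note (patch 3): this is a work-in-progress commit; it mixes CreatableRelationProvider into the test table and leaves createRelation unimplemented, which patches 4 and 5 rework. CreatableRelationProvider is the V1 write entry point that DataFrameWriter falls back to, so its contract is the anchor for the rest of the series (signature as in org.apache.spark.sql.sources; shown here only for reference):

    trait CreatableRelationProvider {
      // Save `data` according to `mode`, then return a BaseRelation describing
      // what was written. `parameters` carries the write options.
      def createRelation(
          sqlContext: SQLContext,
          mode: SaveMode,
          parameters: Map[String, String],
          data: DataFrame): BaseRelation
    }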
From cef1705a6f9356f9a0edbf575780fc16363ef5e3 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 17 Sep 2019 12:26:42 -0700
Subject: [PATCH 4/6] save so far

---
 .../datasources/v2/TableCapabilityCheck.scala | 11 +--
 .../sql/connector/V1WriteFallbackSuite.scala  | 67 +++++++++++++------
 2 files changed, 49 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index ef17496276f80..6b61c3f3da507 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -20,13 +20,9 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
-<<<<<<< HEAD
-import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
-import org.apache.spark.sql.sources.v2.TableCapability._
-=======
->>>>>>> c56a012bc839cd2f92c2be41faea91d1acfba4eb
 import org.apache.spark.sql.types.BooleanType

 /**
@@ -37,9 +33,8 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {

   private def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)

-  private def supportsBatchWrite(table: Table): Boolean = table match {
-    case supportsWrite: SupportsWrite => supportsWrite.supportsAny(BATCH_WRITE, V1_BATCH_WRITE)
-    case _ => false
+  private def supportsBatchWrite(table: Table): Boolean = {
+    table.supportsAny(BATCH_WRITE, V1_BATCH_WRITE)
   }

   override def apply(plan: LogicalPlan): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 683bc23ebb49e..c04fce34c6ee9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SQLCo
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -114,8 +115,12 @@ private object InMemoryV1Provider {
   }
 }

-class InMemoryV1Provider extends TableProvider with DataSourceRegister {
+class InMemoryV1Provider
+  extends TableProvider
+  with DataSourceRegister
+  with CreatableRelationProvider {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
+
     InMemoryV1Provider.tables.getOrElse(options.get("name"), {
       new InMemoryTableWithV1Fallback(
         "InMemoryTableWithV1Fallback",
@@ -127,6 +132,45 @@ class InMemoryV1Provider extends TableProvider with DataSourceRegister {
   }

   override def shortName(): String = "in-memory"
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    val _sqlContext = sqlContext
+
+    val partitioning = parameters.get(DataSourceUtils.PARTITIONING_COLUMNS_KEY).map { value =>
+      DataSourceUtils.decodePartitioningColumns(value).map { partitioningColumn =>
+
+      }
+    }
+
+    val table = new InMemoryTableWithV1Fallback(
+      "InMemoryTableWithV1Fallback",
+      data.schema.asNullable,
+      Array.empty,
+      Map.empty[String, String].asJava
+    )
+
+    def getRelation: BaseRelation = new BaseRelation {
+      override def sqlContext: SQLContext = _sqlContext
+      override def schema: StructType = table.schema
+    }
+
+    if (mode == SaveMode.ErrorIfExists && dataMap.nonEmpty) {
+      throw new AnalysisException("Table already exists")
+    } else if (mode == SaveMode.Ignore && dataMap.nonEmpty) {
+      // do nothing
+      return getRelation
+    }
+    val writer = new FallbackWriteBuilder(new CaseInsensitiveStringMap(parameters.asJava))
+    if (mode == SaveMode.Overwrite) {
+      writer.truncate()
+    }
+    writer.buildForV1Write().insert(data, overwrite = false)
+    getRelation
+  }
 }

 class InMemoryTableWithV1Fallback(
@@ -135,8 +179,7 @@ class InMemoryTableWithV1Fallback(
     override val partitioning: Array[Transform],
     override val properties: util.Map[String, String])
   extends Table
-  with SupportsWrite
-  with CreatableRelationProvider {
+  with SupportsWrite {

   partitioning.foreach { t =>
     if (!t.isInstanceOf[IdentityTransform]) {
@@ -159,24 +202,6 @@ class InMemoryTableWithV1Fallback(
     new FallbackWriteBuilder(options)
   }

-  override def createRelation(
-      sqlContext: SQLContext,
-      mode: SaveMode,
-      parameters: Map[String, String],
-      data: DataFrame): BaseRelation = {
-    if (mode == SaveMode.ErrorIfExists && dataMap.nonEmpty) {
-      throw new AnalysisException("Table already exists")
-    } else if (mode == SaveMode.Ignore && dataMap.nonEmpty) {
-      // do nothing
-    } else if (mode == SaveMode.Overwrite) {
-      val writer = new FallbackWriteBuilder(new CaseInsensitiveStringMap(parameters.asJava))
-      writer.truncate()
-      writer.buildForV1Write().insert(data, overwrite = false)
-    } else {
-
-    }
-  }
-
   private class FallbackWriteBuilder(options: CaseInsensitiveStringMap)
     extends WriteBuilder
     with V1WriteBuilder
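Note (patch 4): after the rebase onto the connector.catalog packages, Table exposes supportsAny directly, so the capability helper drops the SupportsWrite pattern match, and CreatableRelationProvider moves from the table onto the provider, since DataFrameWriter's V1 fallback is resolved from the provider class. The SaveMode handling this createRelation is building toward can be summarized as follows (sketch only; tableExists and the insert helpers are illustrative names, and the real logic lands in patch 5):

    // Sketch of the SaveMode semantics a provider-level createRelation implements.
    def writeWithMode(mode: SaveMode, data: DataFrame, tableExists: Boolean): Unit = mode match {
      case SaveMode.ErrorIfExists if tableExists =>
        throw new AnalysisException("Table already exists")
      case SaveMode.Ignore if tableExists =>
        () // keep the existing data untouched
      case SaveMode.Overwrite =>
        truncateExistingData()           // illustrative helper
        insertViaV1WriteBuilder(data)    // illustrative helper
      case _ =>
        insertViaV1WriteBuilder(data)    // Append, and the first write in any mode
    }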
From 39cb1a27c494bcbe1c7c54ad775381175f19bc5e Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 19 Sep 2019 08:46:28 -0700
Subject: [PATCH 5/6] fall back to saveV1Source

---
 .../sql/connector/V1WriteFallbackSuite.scala | 91 ++++++++++++++++---
 1 file changed, 80 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index c04fce34c6ee9..de843ba4375d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable

 import org.scalatest.BeforeAndAfter

-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
@@ -53,7 +53,11 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
   test("append fallback") {
     val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
     df.write.mode("append").option("name", "t1").format(v2Format).save()
+
     checkAnswer(InMemoryV1Provider.getTableData(spark, "t1"), df)
+    assert(InMemoryV1Provider.tables("t1").schema === df.schema.asNullable)
+    assert(InMemoryV1Provider.tables("t1").partitioning.isEmpty)
+
     df.write.mode("append").option("name", "t1").format(v2Format).save()
     checkAnswer(InMemoryV1Provider.getTableData(spark, "t1"), df.union(df))
   }
@@ -66,6 +70,59 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
     df2.write.mode("overwrite").option("name", "t1").format(v2Format).save()
     checkAnswer(InMemoryV1Provider.getTableData(spark, "t1"), df2)
   }
+
+  SaveMode.values().foreach { mode =>
+    test(s"save: new table creations with partitioning for table - mode: $mode") {
+      val format = classOf[InMemoryV1Provider].getName
+      val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+      df.write.mode(mode).option("name", "t1").format(format).partitionBy("a").save()
+
+      checkAnswer(InMemoryV1Provider.getTableData(spark, "t1"), df)
+      assert(InMemoryV1Provider.tables("t1").schema === df.schema.asNullable)
+      assert(InMemoryV1Provider.tables("t1").partitioning.sameElements(
+        Array(IdentityTransform(FieldReference(Seq("a"))))))
+    }
+  }
+
+  test("save: default mode is ErrorIfExists") {
+    val format = classOf[InMemoryV1Provider].getName
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+
+    df.write.option("name", "t1").format(format).partitionBy("a").save()
+    // default is ErrorIfExists, and since a table already exists we throw an exception
+    val e = intercept[AnalysisException] {
+      df.write.option("name", "t1").format(format).partitionBy("a").save()
+    }
+    assert(e.getMessage.contains("already exists"))
+  }
+
+  test("save: Ignore mode") {
+    val format = classOf[InMemoryV1Provider].getName
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+
+    df.write.option("name", "t1").format(format).partitionBy("a").save()
+    // no-op
+    df.write.option("name", "t1").format(format).mode("ignore").partitionBy("a").save()
+
+    checkAnswer(InMemoryV1Provider.getTableData(spark, "t1"), df)
+  }
+
+  test("save: tables can perform schema and partitioning checks if they already exist") {
+    val format = classOf[InMemoryV1Provider].getName
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+
+    df.write.option("name", "t1").format(format).partitionBy("a").save()
+    val e2 = intercept[IllegalArgumentException] {
+      df.write.mode("append").option("name", "t1").format(format).partitionBy("b").save()
+    }
+    assert(e2.getMessage.contains("partitioning"))
+
+    val e3 = intercept[IllegalArgumentException] {
+      Seq((1, "x")).toDF("c", "d").write.mode("append").option("name", "t1").format(format)
+        .save()
+    }
+    assert(e3.getMessage.contains("schema"))
+  }
 }

 class V1WriteFallbackSessionCatalogSuite
@@ -142,33 +199,45 @@ class InMemoryV1Provider

     val partitioning = parameters.get(DataSourceUtils.PARTITIONING_COLUMNS_KEY).map { value =>
       DataSourceUtils.decodePartitioningColumns(value).map { partitioningColumn =>
-
+        IdentityTransform(FieldReference(partitioningColumn))
       }
-    }
+    }.getOrElse(Nil)

-    val table = new InMemoryTableWithV1Fallback(
+    val tableName = parameters("name")
+    val tableOpt = InMemoryV1Provider.tables.get(tableName)
+    val table = tableOpt.getOrElse(new InMemoryTableWithV1Fallback(
       "InMemoryTableWithV1Fallback",
       data.schema.asNullable,
-      Array.empty,
+      partitioning.toArray,
       Map.empty[String, String].asJava
-    )
+    ))
+    if (tableOpt.isEmpty) {
+      InMemoryV1Provider.tables.put(tableName, table)
+    } else {
+      if (data.schema.asNullable != table.schema) {
+        throw new IllegalArgumentException("Wrong schema provided")
+      }
+      if (!partitioning.sameElements(table.partitioning)) {
+        throw new IllegalArgumentException("Wrong partitioning provided")
+      }
+    }

     def getRelation: BaseRelation = new BaseRelation {
       override def sqlContext: SQLContext = _sqlContext
       override def schema: StructType = table.schema
     }

-    if (mode == SaveMode.ErrorIfExists && dataMap.nonEmpty) {
+    if (mode == SaveMode.ErrorIfExists && tableOpt.isDefined) {
       throw new AnalysisException("Table already exists")
-    } else if (mode == SaveMode.Ignore && dataMap.nonEmpty) {
+    } else if (mode == SaveMode.Ignore && tableOpt.isDefined) {
       // do nothing
       return getRelation
     }
-    val writer = new FallbackWriteBuilder(new CaseInsensitiveStringMap(parameters.asJava))
+    val writer = table.newWriteBuilder(new CaseInsensitiveStringMap(parameters.asJava))
     if (mode == SaveMode.Overwrite) {
-      writer.truncate()
+      writer.asInstanceOf[SupportsTruncate].truncate()
     }
-    writer.buildForV1Write().insert(data, overwrite = false)
+    writer.asInstanceOf[V1WriteBuilder].buildForV1Write().insert(data, overwrite = false)
     getRelation
   }
 }
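Note (patch 5): createRelation now registers the table on its first write, enforces schema and partitioning on later writes, and performs the actual write through the table's own newWriteBuilder, which is why the asInstanceOf casts appear. The casts are safe in this suite because FallbackWriteBuilder mixes in SupportsTruncate and V1WriteBuilder; the same flow with the intermediate types spelled out (a sketch, names as in the suite):

    val writer: WriteBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(parameters.asJava))
    if (mode == SaveMode.Overwrite) {
      writer.asInstanceOf[SupportsTruncate].truncate()    // request truncate-before-write
    }
    val relation: InsertableRelation = writer.asInstanceOf[V1WriteBuilder].buildForV1Write()
    relation.insert(data, overwrite = false)               // the V1 write path does the work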
From e8b5942830302511f89fa870f97b5ba574ad3a85 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 19 Sep 2019 09:05:08 -0700
Subject: [PATCH 6/6] add more tests

---
 .../connector/TableCapabilityCheckSuite.scala | 28 ++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
index 39f4085a9baf9..ce6d56cf84df1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
@@ -98,16 +98,19 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession {
   }

   test("AppendData: check correct capabilities") {
-    val plan = AppendData.byName(
-      DataSourceV2Relation.create(CapabilityTable(BATCH_WRITE), CaseInsensitiveStringMap.empty),
-      TestRelation)
+    Seq(BATCH_WRITE, V1_BATCH_WRITE).foreach { write =>
+      val plan = AppendData.byName(
+        DataSourceV2Relation.create(CapabilityTable(write), CaseInsensitiveStringMap.empty),
+        TestRelation)

-    TableCapabilityCheck.apply(plan)
+      TableCapabilityCheck.apply(plan)
+    }
   }

   test("Truncate: check missing capabilities") {
     Seq(CapabilityTable(),
       CapabilityTable(BATCH_WRITE),
+      CapabilityTable(V1_BATCH_WRITE),
       CapabilityTable(TRUNCATE),
       CapabilityTable(OVERWRITE_BY_FILTER)).foreach { table =>

@@ -125,7 +128,9 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession {

   test("Truncate: check correct capabilities") {
     Seq(CapabilityTable(BATCH_WRITE, TRUNCATE),
-      CapabilityTable(BATCH_WRITE, OVERWRITE_BY_FILTER)).foreach { table =>
+      CapabilityTable(V1_BATCH_WRITE, TRUNCATE),
+      CapabilityTable(BATCH_WRITE, OVERWRITE_BY_FILTER),
+      CapabilityTable(V1_BATCH_WRITE, OVERWRITE_BY_FILTER)).foreach { table =>
       val plan = OverwriteByExpression.byName(
         DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty),
         TestRelation,
@@ -137,6 +142,7 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession {

   test("OverwriteByExpression: check missing capabilities") {
     Seq(CapabilityTable(),
+      CapabilityTable(V1_BATCH_WRITE),
       CapabilityTable(BATCH_WRITE),
       CapabilityTable(OVERWRITE_BY_FILTER)).foreach { table =>

@@ -153,12 +159,14 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession {
   }

   test("OverwriteByExpression: check correct capabilities") {
-    val table = CapabilityTable(BATCH_WRITE, OVERWRITE_BY_FILTER)
-    val plan = OverwriteByExpression.byName(
-      DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation,
-      EqualTo(AttributeReference("x", LongType)(), Literal(5)))
+    Seq(BATCH_WRITE, V1_BATCH_WRITE).foreach { write =>
+      val table = CapabilityTable(write, OVERWRITE_BY_FILTER)
+      val plan = OverwriteByExpression.byName(
+        DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation,
+        EqualTo(AttributeReference("x", LongType)(), Literal(5)))

-    TableCapabilityCheck.apply(plan)
+      TableCapabilityCheck.apply(plan)
+    }
   }

   test("OverwritePartitionsDynamic: check missing capabilities") {