From 7a9b72dcd22af9a45111ce5b460fbd3620943318 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 24 Nov 2020 11:54:50 -0800 Subject: [PATCH 1/3] SPARK-33492 followup --- .../datasources/v2/DataSourceV2Strategy.scala | 18 ++++++++++------ .../datasources/v2/V1FallbackWriters.scala | 14 ++++++------- .../v2/WriteToDataSourceV2Exec.scala | 21 ++++++++----------- 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 30d976524bfa8..15c4c2b539e25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -170,11 +170,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } case AppendData(r: DataSourceV2Relation, query, writeOptions, _) => + val refreshCache = () => session.sharedState.cacheManager.recacheByPlan(session, r) r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - AppendDataExecV1(v1, writeOptions.asOptions, query, r) :: Nil + AppendDataExecV1(v1, writeOptions.asOptions, query, afterWrite = refreshCache) :: Nil case v2 => - AppendDataExec(session, v2, r, writeOptions.asOptions, planLater(query)) :: Nil + AppendDataExec(v2, writeOptions.asOptions, planLater(query), + afterWrite = refreshCache) :: Nil } case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) => @@ -184,17 +186,21 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat supportNestedPredicatePushdown = true).getOrElse( throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) }.toArray + val refreshCache = () => session.sharedState.cacheManager.recacheByPlan(session, r) r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query, r) :: Nil + OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, + query, afterWrite = refreshCache) :: Nil case v2 => - OverwriteByExpressionExec(session, v2, r, filters, - writeOptions.asOptions, planLater(query)) :: Nil + OverwriteByExpressionExec(v2, filters, + writeOptions.asOptions, planLater(query), afterWrite = refreshCache) :: Nil } case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) => + val refreshCache = () => session.sharedState.cacheManager.recacheByPlan(session, r) OverwritePartitionsDynamicExec( - session, r.table.asWritable, r, writeOptions.asOptions, planLater(query)) :: Nil + r.table.asWritable, writeOptions.asOptions, planLater(query), + afterWrite = refreshCache) :: Nil case DeleteFromTable(relation, condition) => relation match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index af7721588edeb..41b0a7fc69ba1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -38,10 +38,10 @@ case class AppendDataExecV1( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - v2Relation: 
DataSourceV2Relation) extends V1FallbackWriters { + afterWrite: () => Unit = () => ()) extends V1FallbackWriters { override protected def run(): Seq[InternalRow] = { - writeWithV1(newWriteBuilder().buildForV1Write(), Some(v2Relation)) + writeWithV1(newWriteBuilder().buildForV1Write(), afterWrite = afterWrite) } } @@ -61,7 +61,7 @@ case class OverwriteByExpressionExecV1( deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - v2Relation: DataSourceV2Relation) extends V1FallbackWriters { + afterWrite: () => Unit = () => ()) extends V1FallbackWriters { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] @@ -70,10 +70,10 @@ case class OverwriteByExpressionExecV1( override protected def run(): Seq[InternalRow] = { newWriteBuilder() match { case builder: SupportsTruncate if isTruncate(deleteWhere) => - writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), Some(v2Relation)) + writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), afterWrite) case builder: SupportsOverwrite => - writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), Some(v2Relation)) + writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), afterWrite) case _ => throw new SparkException(s"Table does not support overwrite by expression: $table") @@ -116,11 +116,11 @@ trait SupportsV1Write extends SparkPlan { protected def writeWithV1( relation: InsertableRelation, - v2Relation: Option[DataSourceV2Relation] = None): Seq[InternalRow] = { + afterWrite: () => Unit = () => ()): Seq[InternalRow] = { val session = sqlContext.sparkSession // The `plan` is already optimized, we should not analyze and optimize it again. relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false) - v2Relation.foreach(r => session.sharedState.cacheManager.recacheByPlan(session, r)) + afterWrite() Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 1648134d0a1b2..ee53a7288d569 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -213,15 +213,14 @@ case class AtomicReplaceTableAsSelectExec( * Rows in the output data set are appended. */ case class AppendDataExec( - session: SparkSession, table: SupportsWrite, - relation: DataSourceV2Relation, writeOptions: CaseInsensitiveStringMap, - query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + query: SparkPlan, + afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper { override protected def run(): Seq[InternalRow] = { val writtenRows = writeWithV2(newWriteBuilder().buildForBatch()) - session.sharedState.cacheManager.recacheByPlan(session, relation) + afterWrite() writtenRows } } @@ -237,12 +236,11 @@ case class AppendDataExec( * AlwaysTrue to delete all rows. 
*/ case class OverwriteByExpressionExec( - session: SparkSession, table: SupportsWrite, - relation: DataSourceV2Relation, deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, - query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + query: SparkPlan, + afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] @@ -259,7 +257,7 @@ case class OverwriteByExpressionExec( case _ => throw new SparkException(s"Table does not support overwrite by expression: $table") } - session.sharedState.cacheManager.recacheByPlan(session, relation) + afterWrite() writtenRows } } @@ -275,11 +273,10 @@ case class OverwriteByExpressionExec( * are not modified. */ case class OverwritePartitionsDynamicExec( - session: SparkSession, table: SupportsWrite, - relation: DataSourceV2Relation, writeOptions: CaseInsensitiveStringMap, - query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + query: SparkPlan, + afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper { override protected def run(): Seq[InternalRow] = { val writtenRows = newWriteBuilder() match { @@ -289,7 +286,7 @@ case class OverwritePartitionsDynamicExec( case _ => throw new SparkException(s"Table does not support dynamic partition overwrite: $table") } - session.sharedState.cacheManager.recacheByPlan(session, relation) + afterWrite() writtenRows } } From c754b85a4de50f04f5cf2116e5021855724adac8 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 25 Nov 2020 14:55:35 -0800 Subject: [PATCH 2/3] handle DropTableExec & RefreshTableExec --- .../datasources/v2/DataSourceV2Strategy.scala | 28 +++++++++++-------- .../datasources/v2/DropTableExec.scala | 11 +++----- .../datasources/v2/RefreshTableExec.scala | 11 +++----- .../datasources/v2/V1FallbackWriters.scala | 9 +++--- .../v2/WriteToDataSourceV2Exec.scala | 6 ++-- 5 files changed, 32 insertions(+), 33 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 15c4c2b539e25..f37fffca999ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -52,6 +52,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } } + private def refreshCache(r: DataSourceV2Relation)(): Unit = { + session.sharedState.cacheManager.recacheByPlan(session, r) + } + + private def invalidateCache(r: ResolvedTable)(): Unit = { + val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier)) + session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true) + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(project, filters, relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) => @@ -128,7 +137,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } case RefreshTable(r: ResolvedTable) => - RefreshTableExec(session, r.catalog, r.table, r.identifier) :: Nil + RefreshTableExec(r.catalog, r.identifier, invalidateCache(r)) :: Nil case ReplaceTable(catalog, ident, schema, parts, props, orCreate) => val propsWithOwner = 
CatalogV2Util.withDefaultOwnership(props) @@ -170,13 +179,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } case AppendData(r: DataSourceV2Relation, query, writeOptions, _) => - val refreshCache = () => session.sharedState.cacheManager.recacheByPlan(session, r) r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - AppendDataExecV1(v1, writeOptions.asOptions, query, afterWrite = refreshCache) :: Nil + AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil case v2 => - AppendDataExec(v2, writeOptions.asOptions, planLater(query), - afterWrite = refreshCache) :: Nil + AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil } case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) => @@ -186,21 +193,18 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat supportNestedPredicatePushdown = true).getOrElse( throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) }.toArray - val refreshCache = () => session.sharedState.cacheManager.recacheByPlan(session, r) r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, - query, afterWrite = refreshCache) :: Nil + query, refreshCache(r)) :: Nil case v2 => OverwriteByExpressionExec(v2, filters, - writeOptions.asOptions, planLater(query), afterWrite = refreshCache) :: Nil + writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil } case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) => - val refreshCache = () => session.sharedState.cacheManager.recacheByPlan(session, r) OverwritePartitionsDynamicExec( - r.table.asWritable, writeOptions.asOptions, planLater(query), - afterWrite = refreshCache) :: Nil + r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil case DeleteFromTable(relation, condition) => relation match { @@ -238,7 +242,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat throw new AnalysisException("Describing columns is not supported for v2 tables.") case DropTable(r: ResolvedTable, ifExists, purge) => - DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil + DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateCache(r)) :: Nil case _: NoopDropTable => LocalTableScanExec(Nil, Nil) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala index 068475fc56f47..f89b89096772a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala @@ -17,27 +17,24 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} /** * Physical plan node for dropping a table. 
*/ case class DropTableExec( - session: SparkSession, catalog: TableCatalog, - table: Table, ident: Identifier, ifExists: Boolean, - purge: Boolean) extends V2CommandExec { + purge: Boolean, + invalidateCache: () => Unit) extends V2CommandExec { override def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { - val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) - session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true) + invalidateCache() catalog.dropTable(ident, purge) } else if (!ifExists) { throw new NoSuchTableException(ident) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala index 52836de5a926b..994583c1e338f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala @@ -17,23 +17,20 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} case class RefreshTableExec( - session: SparkSession, catalog: TableCatalog, - table: Table, - ident: Identifier) extends V2CommandExec { + ident: Identifier, + invalidateCache: () => Unit) extends V2CommandExec { override protected def run(): Seq[InternalRow] = { catalog.invalidateTable(ident) // invalidate all caches referencing the given table // TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table - val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) - session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true) + invalidateCache() Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 41b0a7fc69ba1..e5847181a0e06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -38,7 +38,7 @@ case class AppendDataExecV1( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - afterWrite: () => Unit = () => ()) extends V1FallbackWriters { + afterWrite: () => Unit) extends V1FallbackWriters { override protected def run(): Seq[InternalRow] = { writeWithV1(newWriteBuilder().buildForV1Write(), afterWrite = afterWrite) @@ -61,7 +61,7 @@ case class OverwriteByExpressionExecV1( deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - afterWrite: () => Unit = () => ()) extends V1FallbackWriters { + afterWrite: () => Unit) extends V1FallbackWriters { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] @@ -70,10 +70,11 @@ case class OverwriteByExpressionExecV1( override protected def run(): Seq[InternalRow] = { newWriteBuilder() match { case builder: SupportsTruncate if isTruncate(deleteWhere) => - writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), afterWrite) + 
writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), afterWrite = afterWrite) case builder: SupportsOverwrite => - writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), afterWrite) + writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), + afterWrite = afterWrite) case _ => throw new SparkException(s"Table does not support overwrite by expression: $table") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index ee53a7288d569..939611bfd40a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -216,7 +216,7 @@ case class AppendDataExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper { + afterWrite: () => Unit) extends V2TableWriteExec with BatchWriteHelper { override protected def run(): Seq[InternalRow] = { val writtenRows = writeWithV2(newWriteBuilder().buildForBatch()) @@ -240,7 +240,7 @@ case class OverwriteByExpressionExec( deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper { + afterWrite: () => Unit) extends V2TableWriteExec with BatchWriteHelper { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] @@ -276,7 +276,7 @@ case class OverwritePartitionsDynamicExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper { + afterWrite: () => Unit) extends V2TableWriteExec with BatchWriteHelper { override protected def run(): Seq[InternalRow] = { val writtenRows = newWriteBuilder() match { From 98e9edeabc7df0f5b01c8f8cd9c886134bd0d85a Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sat, 28 Nov 2020 10:36:06 -0800 Subject: [PATCH 3/3] comments --- .../datasources/v2/V1FallbackWriters.scala | 14 +++++++------- .../datasources/v2/WriteToDataSourceV2Exec.scala | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index e5847181a0e06..9d2cea9fbaff3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -38,10 +38,10 @@ case class AppendDataExecV1( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - afterWrite: () => Unit) extends V1FallbackWriters { + refreshCache: () => Unit) extends V1FallbackWriters { override protected def run(): Seq[InternalRow] = { - writeWithV1(newWriteBuilder().buildForV1Write(), afterWrite = afterWrite) + writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache) } } @@ -61,7 +61,7 @@ case class OverwriteByExpressionExecV1( deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - afterWrite: () => Unit) extends V1FallbackWriters { + refreshCache: 
() => Unit) extends V1FallbackWriters { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] @@ -70,11 +70,11 @@ case class OverwriteByExpressionExecV1( override protected def run(): Seq[InternalRow] = { newWriteBuilder() match { case builder: SupportsTruncate if isTruncate(deleteWhere) => - writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), afterWrite = afterWrite) + writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), refreshCache = refreshCache) case builder: SupportsOverwrite => writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), - afterWrite = afterWrite) + refreshCache = refreshCache) case _ => throw new SparkException(s"Table does not support overwrite by expression: $table") @@ -117,11 +117,11 @@ trait SupportsV1Write extends SparkPlan { protected def writeWithV1( relation: InsertableRelation, - afterWrite: () => Unit = () => ()): Seq[InternalRow] = { + refreshCache: () => Unit = () => ()): Seq[InternalRow] = { val session = sqlContext.sparkSession // The `plan` is already optimized, we should not analyze and optimize it again. relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false) - afterWrite() + refreshCache() Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 939611bfd40a6..47aad2bcb2c56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -216,11 +216,11 @@ case class AppendDataExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - afterWrite: () => Unit) extends V2TableWriteExec with BatchWriteHelper { + refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper { override protected def run(): Seq[InternalRow] = { val writtenRows = writeWithV2(newWriteBuilder().buildForBatch()) - afterWrite() + refreshCache() writtenRows } } @@ -240,7 +240,7 @@ case class OverwriteByExpressionExec( deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - afterWrite: () => Unit) extends V2TableWriteExec with BatchWriteHelper { + refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] @@ -257,7 +257,7 @@ case class OverwriteByExpressionExec( case _ => throw new SparkException(s"Table does not support overwrite by expression: $table") } - afterWrite() + refreshCache() writtenRows } } @@ -276,7 +276,7 @@ case class OverwritePartitionsDynamicExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - afterWrite: () => Unit) extends V2TableWriteExec with BatchWriteHelper { + refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper { override protected def run(): Seq[InternalRow] = { val writtenRows = newWriteBuilder() match { @@ -286,7 +286,7 @@ case class OverwritePartitionsDynamicExec( case _ => throw new SparkException(s"Table does not support dynamic partition overwrite: $table") } - afterWrite() + refreshCache() writtenRows } }
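
Note (not part of the patch series): all three commits converge on the same pattern -- the strategy layer closes over the SparkSession and the DataSourceV2Relation and hands each physical exec node an opaque () => Unit callback (refreshCache / invalidateCache) to invoke after the write, drop, or refresh, instead of passing the session and relation into the node itself. The following is a minimal, self-contained Scala sketch of that callback pattern for readers skimming the series; FakeCacheManager and MyWriteExec are hypothetical stand-ins, not Spark APIs.

// Illustrative sketch only. FakeCacheManager and MyWriteExec are
// hypothetical stand-ins for Spark's CacheManager and the V2 write
// exec nodes, showing the callback-based cache refresh pattern.
object CallbackPatternSketch {

  // Stand-in for the cache manager owned by the session/strategy side.
  class FakeCacheManager {
    def recacheByPlan(table: String): Unit =
      println(s"re-caching plans that reference $table")
  }

  // The exec node no longer needs a SparkSession or a relation; it only
  // receives a callback to run once the write has finished.
  case class MyWriteExec(table: String, refreshCache: () => Unit) {
    def run(): Unit = {
      println(s"writing to $table") // perform the actual write
      refreshCache()                // then let the caller refresh caches
    }
  }

  def main(args: Array[String]): Unit = {
    val cacheManager = new FakeCacheManager
    // The "strategy" closes over the relation and cache manager, keeping
    // the exec node decoupled from both.
    val refresh: () => Unit = () => cacheManager.recacheByPlan("t1")
    MyWriteExec("t1", refresh).run()
  }
}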