From 2753af5c2adbbb0c27decda3afb7a06e8ff1a31f Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 12 Sep 2019 22:21:10 +0800
Subject: [PATCH] ResolveInsertInto should not do table look up

---
Reviewer notes (this section is not part of the commit message):
  * Hunk 1 adds a case that resolves the UnresolvedRelation target of an
    InsertIntoStatement via lookupV2Relation and stores the resulting
    DataSourceV2Relation with i.copy(table = ...); the statement is left
    unchanged when no v2 table is found.
  * Hunk 2 makes ResolveInsertInto match an InsertIntoStatement whose table is
    already a DataSourceV2Relation and inlines the former resolveV2Insert helper.
  * NOTE(review): line breaks, indentation and blank context lines were
    reconstructed from the hunk line counts and the diffstat; confirm with
    `git apply --check` before use.

 .../sql/catalyst/analysis/Analyzer.scala      | 58 +++++++++----------
 1 file changed, 27 insertions(+), 31 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8e6be32bcf70e..413c75d2581d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -671,6 +671,15 @@ class Analyzer(
           case scala.Right(tableOpt) => tableOpt
         }
         v2TableOpt.map(DataSourceV2Relation.create).getOrElse(u)
+
+      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
+        val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
+          case scala.Left((_, _, tableOpt)) => tableOpt
+          case scala.Right(tableOpt) => tableOpt
+        }
+        v2TableOpt.map(DataSourceV2Relation.create).map { v2Relation =>
+          i.copy(table = v2Relation)
+        }.getOrElse(i)
     }
   }
 
@@ -785,41 +794,28 @@ class Analyzer(
 
   object ResolveInsertInto extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-        lookupV2Relation(u.multipartIdentifier) match {
-          case scala.Left((_, _, Some(v2Table: Table))) =>
-            resolveV2Insert(i, v2Table)
-          case scala.Right(Some(v2Table: Table)) =>
-            resolveV2Insert(i, v2Table)
-          case _ =>
-            i
+      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) if i.query.resolved =>
+        // ifPartitionNotExists is append with validation, but validation is not supported
+        if (i.ifPartitionNotExists) {
+          throw new AnalysisException(
+            s"Cannot write, IF NOT EXISTS is not supported for table: ${r.table.name}")
         }
-    }
-
-    private def resolveV2Insert(i: InsertIntoStatement, table: Table): LogicalPlan = {
-      val relation = DataSourceV2Relation.create(table)
-      // ifPartitionNotExists is append with validation, but validation is not supported
-      if (i.ifPartitionNotExists) {
-        throw new AnalysisException(
-          s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
-      }
 
-      val partCols = partitionColumnNames(relation.table)
-      validatePartitionSpec(partCols, i.partitionSpec)
+        val partCols = partitionColumnNames(r.table)
+        validatePartitionSpec(partCols, i.partitionSpec)
 
-      val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
-      val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
-      val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
-        conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+        val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+        val query = addStaticPartitionColumns(r, i.query, staticPartitions)
+        val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
+          conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
 
-      if (!i.overwrite) {
-        AppendData.byPosition(relation, query)
-      } else if (dynamicPartitionOverwrite) {
-        OverwritePartitionsDynamic.byPosition(relation, query)
-      } else {
-        OverwriteByExpression.byPosition(
-          relation, query, staticDeleteExpression(relation, staticPartitions))
-      }
+        if (!i.overwrite) {
+          AppendData.byPosition(r, query)
+        } else if (dynamicPartitionOverwrite) {
+          OverwritePartitionsDynamic.byPosition(r, query)
+        } else {
+          OverwriteByExpression.byPosition(r, query, staticDeleteExpression(r, staticPartitions))
+        }
     }
 
     private def partitionColumnNames(table: Table): Seq[String] = {