Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -899,24 +899,24 @@ class Analyzer(override val catalogManager: CatalogManager)
object ResolveTempViews extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(ident, _, isStreaming) =>
lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u)
lookupAndResolveTempView(ident, isStreaming).getOrElse(u)
case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
lookupTempView(ident, performCheck = true)
lookupAndResolveTempView(ident)
.map(view => i.copy(table = view))
.getOrElse(i)
case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
lookupTempView(ident, performCheck = true)
lookupAndResolveTempView(ident)
.map(view => c.copy(table = view))
.getOrElse(c)
case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
lookupTempView(ident, performCheck = true)
lookupAndResolveTempView(ident)
.map(view => c.copy(table = view, isTempView = true))
.getOrElse(c)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
case UnresolvedRelation(ident, _, false) =>
lookupTempView(ident, performCheck = true).map(unwrapRelationPlan).map {
lookupAndResolveTempView(ident).map(unwrapRelationPlan).map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
}.getOrElse(write)
Expand Down Expand Up @@ -948,10 +948,9 @@ class Analyzer(override val catalogManager: CatalogManager)
.getOrElse(u)
}

def lookupTempView(
private def lookupTempView(
identifier: Seq[String],
isStreaming: Boolean = false,
performCheck: Boolean = false): Option[LogicalPlan] = {
isStreaming: Boolean = false): Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView && !isReferredTempViewName(identifier)) return None

Expand All @@ -964,7 +963,13 @@ class Analyzer(override val catalogManager: CatalogManager)
if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
}
tmpView.map(ResolveRelations.resolveViews(_, performCheck))
tmpView
}

private def lookupAndResolveTempView(
identifier: Seq[String],
isStreaming: Boolean = false): Option[LogicalPlan] = {
lookupTempView(identifier, isStreaming).map(ResolveRelations.resolveViews)
}
}

Expand Down Expand Up @@ -1128,7 +1133,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace.
def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match {
def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
Expand All @@ -1147,16 +1152,10 @@ class Analyzer(override val catalogManager: CatalogManager)
}
// Fail the analysis eagerly because outside AnalysisContext, the unresolved operators
// inside a view maybe resolved incorrectly.
// But for commands like `DropViewCommand`, resolving view is unnecessary even though
// there is unresolved node. So use the `performCheck` flag to skip the analysis check
// for these commands.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This trick doesn't always work. Sometimes the analyzer may fail eagerly instead of waiting for checkAnalysis.

// TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag
if (performCheck) {
checkAnalysis(newChild)
}
checkAnalysis(newChild)
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
p.copy(child = resolveViews(view, performCheck))
p.copy(child = resolveViews(view))
case _ => plan
}

Expand All @@ -1179,14 +1178,14 @@ class Analyzer(override val catalogManager: CatalogManager)

case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(resolveViews(_, performCheck = true))
.map(resolveViews)
.map(EliminateSubqueryAliases(_))
.map(relation => c.copy(table = relation))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(resolveViews(_, performCheck = true))
.map(resolveViews)
.map(EliminateSubqueryAliases(_))
.map(relation => c.copy(table = relation))
.getOrElse(c)
Expand All @@ -1212,7 +1211,7 @@ class Analyzer(override val catalogManager: CatalogManager)

case u: UnresolvedRelation =>
lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
.map(resolveViews(_, performCheck = true)).getOrElse(u)
.map(resolveViews).getOrElse(u)

case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) =>
lookupTableOrView(identifier).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,25 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-34504: drop an invalid view") {
withTable("t") {
sql("CREATE TABLE t(s STRUCT<i: INT, j: INT>) USING json")
val viewName = createView("v", "SELECT s.i FROM t")
withView(viewName) {
assert(spark.table(viewName).collect().isEmpty)

// re-create the table without nested field `i` which is referred by the view.
sql("DROP TABLE t")
sql("CREATE TABLE t(s STRUCT<j: INT>) USING json")
val e = intercept[AnalysisException](spark.table(viewName))
assert(e.message.contains("No such struct field i in j"))

// drop invalid view should be fine
sql(s"DROP VIEW $viewName")
}
}
}
}

class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
Expand Down