From 1bd02603fa24e6790994105cd47c8b55a4c1de40 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Fri, 3 Mar 2017 23:30:59 +0800 Subject: [PATCH 1/3] detect cyclic view references. --- .../spark/sql/execution/command/views.scala | 46 ++++++++++++++++++- .../spark/sql/execution/SQLViewSuite.scala | 21 +++++++-- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 921c84895598..e01a1cc7b8a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRel import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} import org.apache.spark.sql.types.MetadataBuilder @@ -283,6 +283,18 @@ case class AlterViewAsCommand( throw new AnalysisException(s"${viewMeta.identifier} is not a view.") } + // Detect cyclic view references, a cyclic view reference may be created by the following + // queries: + // CREATE VIEW testView AS SELECT id FROM tbl + // CREATE VIEW testView2 AS SELECT id FROM testView + // ALTER VIEW testView AS SELECT * FROM testView2 + // In the above example, a reference cycle (testView -> testView2 -> testView) exsits. + // + // We disallow cyclic view references by checking that in ALTER VIEW command, when the + // `analyzedPlan` contains the same `View` node with the altered view, we should prevent the + // behavior and throw an AnalysisException. + checkCyclicViewReference(analyzedPlan, Seq(viewMeta.identifier), viewMeta.identifier) + val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan) val updatedViewMeta = viewMeta.copy( @@ -292,6 +304,38 @@ case class AlterViewAsCommand( session.sessionState.catalog.alterTable(updatedViewMeta) } + + /** + * Recursively search the logical plan to detect cyclic view references, throw an + * AnalysisException if cycle detected. + * + * @param plan the logical plan we detect cyclic view references from. + * @param path the path between the altered view and current node. + * @param viewIdent the table identifier of the altered view, we compare two views by the + * `desc.identifier`. + */ + private def checkCyclicViewReference( + plan: LogicalPlan, + path: Seq[TableIdentifier], + viewIdent: TableIdentifier): Unit = { + plan match { + case v: View => + val ident = v.desc.identifier + val newPath = path :+ ident + // If the table identifier equals to the `viewIdent`, current view node is the same with + // the altered view. We detect a view reference cycle, should throw an AnalysisException. + if (ident == viewIdent) { + throw new AnalysisException(s"Recursive view $viewIdent detected " + + s"(cycle: ${newPath.mkString(" -> ")})") + } else { + v.children.foreach { child => + checkCyclicViewReference(child, newPath, viewIdent) + } + } + case _ => + plan.children.foreach(child => checkCyclicViewReference(child, path, viewIdent)) + } + } } object ViewHelper { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 2d95cb6d64a8..675ec55a15dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -609,12 +609,25 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } } - // TODO: Check for cyclic view references on ALTER VIEW. - ignore("correctly handle a cyclic view reference") { - withView("view1", "view2") { + test("correctly handle a cyclic view reference") { + withView("view1", "view2", "view3") { sql("CREATE VIEW view1 AS SELECT * FROM jt") sql("CREATE VIEW view2 AS SELECT * FROM view1") - intercept[AnalysisException](sql("ALTER VIEW view1 AS SELECT * FROM view2")) + sql("CREATE VIEW view3 AS SELECT * FROM view2") + + // Detect cyclic view reference on ALTER VIEW. + val e1 = intercept[AnalysisException] { + sql("ALTER VIEW view1 AS SELECT * FROM view2") + }.getMessage + assert(e1.contains("Recursive view `default`.`view1` detected (cycle: `default`.`view1` " + + "-> `default`.`view2` -> `default`.`view1`)")) + + // Detect the most left cycle when there exists multiple cyclic view references. + val e2 = intercept[AnalysisException] { + sql("ALTER VIEW view1 AS SELECT * FROM view3 JOIN view2") + }.getMessage + assert(e2.contains("Recursive view `default`.`view1` detected (cycle: `default`.`view1` " + + "-> `default`.`view3` -> `default`.`view2` -> `default`.`view1`)")) } } } From f487af3732045a1d461f8a8d0981931db4a4c47c Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Tue, 7 Mar 2017 18:28:32 +0800 Subject: [PATCH 2/3] detect cyclic references for CREATE OR REPLACE VIEW and subqueries. --- .../spark/sql/execution/command/views.scala | 99 ++++++++++--------- .../spark/sql/execution/SQLViewSuite.scala | 14 +++ 2 files changed, 69 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index e01a1cc7b8a0..ad357c59753d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} import org.apache.spark.sql.types.MetadataBuilder @@ -154,6 +154,10 @@ case class CreateViewCommand( } else if (tableMetadata.tableType != CatalogTableType.VIEW) { throw new AnalysisException(s"$name is not a view") } else if (replace) { + // Detect cyclic view reference on CREATE OR REPLACE VIEW. + val viewIdent = tableMetadata.identifier + checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) + // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) } else { @@ -283,17 +287,9 @@ case class AlterViewAsCommand( throw new AnalysisException(s"${viewMeta.identifier} is not a view.") } - // Detect cyclic view references, a cyclic view reference may be created by the following - // queries: - // CREATE VIEW testView AS SELECT id FROM tbl - // CREATE VIEW testView2 AS SELECT id FROM testView - // ALTER VIEW testView AS SELECT * FROM testView2 - // In the above example, a reference cycle (testView -> testView2 -> testView) exsits. - // - // We disallow cyclic view references by checking that in ALTER VIEW command, when the - // `analyzedPlan` contains the same `View` node with the altered view, we should prevent the - // behavior and throw an AnalysisException. - checkCyclicViewReference(analyzedPlan, Seq(viewMeta.identifier), viewMeta.identifier) + // Detect cyclic view reference on ALTER VIEW. + val viewIdent = viewMeta.identifier + checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan) @@ -304,38 +300,6 @@ case class AlterViewAsCommand( session.sessionState.catalog.alterTable(updatedViewMeta) } - - /** - * Recursively search the logical plan to detect cyclic view references, throw an - * AnalysisException if cycle detected. - * - * @param plan the logical plan we detect cyclic view references from. - * @param path the path between the altered view and current node. - * @param viewIdent the table identifier of the altered view, we compare two views by the - * `desc.identifier`. - */ - private def checkCyclicViewReference( - plan: LogicalPlan, - path: Seq[TableIdentifier], - viewIdent: TableIdentifier): Unit = { - plan match { - case v: View => - val ident = v.desc.identifier - val newPath = path :+ ident - // If the table identifier equals to the `viewIdent`, current view node is the same with - // the altered view. We detect a view reference cycle, should throw an AnalysisException. - if (ident == viewIdent) { - throw new AnalysisException(s"Recursive view $viewIdent detected " + - s"(cycle: ${newPath.mkString(" -> ")})") - } else { - v.children.foreach { child => - checkCyclicViewReference(child, newPath, viewIdent) - } - } - case _ => - plan.children.foreach(child => checkCyclicViewReference(child, path, viewIdent)) - } - } } object ViewHelper { @@ -402,4 +366,51 @@ object ViewHelper { generateViewDefaultDatabase(viewDefaultDatabase) ++ generateQueryColumnNames(queryOutput) } + + /** + * Recursively search the logical plan to detect cyclic view references, throw an + * AnalysisException if cycle detected. + * + * A cyclic view reference is a cycle of reference dependencies, for example, if the following + * statements are executed: + * CREATE VIEW testView AS SELECT id FROM tbl + * CREATE VIEW testView2 AS SELECT id FROM testView + * ALTER VIEW testView AS SELECT * FROM testView2 + * The view `testView` references `testView2`, and `testView2` also references `testView`, + * therefore a reference cycle (testView -> testView2 -> testView) exists. + * + * @param plan the logical plan we detect cyclic view references from. + * @param path the path between the altered view and current node. + * @param viewIdent the table identifier of the altered view, we compare two views by the + * `desc.identifier`. + */ + def checkCyclicViewReference( + plan: LogicalPlan, + path: Seq[TableIdentifier], + viewIdent: TableIdentifier): Unit = { + plan match { + case v: View => + val ident = v.desc.identifier + val newPath = path :+ ident + // If the table identifier equals to the `viewIdent`, current view node is the same with + // the altered view. We detect a view reference cycle, should throw an AnalysisException. + if (ident == viewIdent) { + throw new AnalysisException(s"Recursive view $viewIdent detected " + + s"(cycle: ${newPath.mkString(" -> ")})") + } else { + v.children.foreach { child => + checkCyclicViewReference(child, newPath, viewIdent) + } + } + case _ => + plan.children.foreach(child => checkCyclicViewReference(child, path, viewIdent)) + } + + // Detect cyclic references from subqueries. + plan.expressions.foreach { expr => + if (expr.isInstanceOf[SubqueryExpression]) { + checkCyclicViewReference(expr.asInstanceOf[SubqueryExpression].plan, path, viewIdent) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 675ec55a15dc..79c2575ee1f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -628,6 +628,20 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { }.getMessage assert(e2.contains("Recursive view `default`.`view1` detected (cycle: `default`.`view1` " + "-> `default`.`view3` -> `default`.`view2` -> `default`.`view1`)")) + + // Detect cyclic view reference on CREATE OR REPLACE VIEW. + val e3 = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW view1 AS SELECT * FROM view2") + }.getMessage + assert(e3.contains("Recursive view `default`.`view1` detected (cycle: `default`.`view1` " + + "-> `default`.`view2` -> `default`.`view1`)")) + + // Detect cyclic view reference from subqueries. + val e4 = intercept[AnalysisException] { + sql("ALTER VIEW view1 AS SELECT * FROM jt WHERE EXISTS (SELECT 1 FROM view2)") + }.getMessage + assert(e4.contains("Recursive view `default`.`view1` detected (cycle: `default`.`view1` " + + "-> `default`.`view2` -> `default`.`view1`)")) } } } From 46da41e2b74ef08945b6d147a182ad7940533ceb Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Wed, 8 Mar 2017 10:22:04 +0800 Subject: [PATCH 3/3] code refactor. --- .../org/apache/spark/sql/execution/command/views.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index ad357c59753d..00f0acab21aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -408,8 +408,10 @@ object ViewHelper { // Detect cyclic references from subqueries. plan.expressions.foreach { expr => - if (expr.isInstanceOf[SubqueryExpression]) { - checkCyclicViewReference(expr.asInstanceOf[SubqueryExpression].plan, path, viewIdent) + expr match { + case s: SubqueryExpression => + checkCyclicViewReference(s.plan, path, viewIdent) + case _ => // Do nothing. } } }