
Commit 35ded12

luluorta authored and cloud-fan committed
[SPARK-33141][SQL] Capture SQL configs when creating permanent views
### What changes were proposed in this pull request?

This PR makes CreateViewCommand/AlterViewAsCommand capture runtime SQL configs and store them as view properties. These configs are applied during the parsing and analysis phases of view resolution. Users can set `spark.sql.legacy.useCurrentConfigsForView` to `true` to restore the previous behavior.

### Why are the changes needed?

This PR is a sub-task of [SPARK-33138](https://issues.apache.org/jira/browse/SPARK-33138), which proposes to unify temp view and permanent view behaviors. It makes permanent views mimic the temp view behavior of "fixing" view semantics by directly storing the resolved LogicalPlan. For example, if a user uses Spark 2.4 to create a view that contains null values from division-by-zero expressions, she may not want other users' queries that reference her view to throw exceptions when running on Spark 3.x with ANSI mode on.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test and improved existing ones.

Closes #30289 from luluorta/SPARK-33141.

Authored-by: luluorta <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent b9f2f78 commit 35ded12
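
The user-facing effect is easiest to see with the division-by-zero example from the description. A minimal sketch (assuming a Spark 3.1 spark-shell session; the view name `v` is illustrative):

```scala
// Create the view while ANSI mode is off: 1/0 evaluates to NULL, and
// spark.sql.ansi.enabled=false is captured into the view's properties.
spark.sql("SET spark.sql.ansi.enabled=false")
spark.sql("CREATE VIEW v AS SELECT 1/0 AS col")

// A later session turns ANSI mode on; the view is still resolved under
// the captured configs, so this should return NULL rather than throw.
spark.sql("SET spark.sql.ansi.enabled=true")
spark.sql("SELECT * FROM v").show()
```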

File tree

9 files changed (+190 lines, -22 lines)

docs/sql-migration-guide.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -51,6 +51,8 @@ license: |
 - In Spark 3.1, the `schema_of_json` and `schema_of_csv` functions return the schema in the SQL format in which field names are quoted. In Spark 3.0, the function returns a catalog string without field quoting and in lower case.
 
 - In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0 the operation will only be triggered if the table itself is cached.
+
+- In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
 
 ## Upgrading from Spark SQL 3.0 to 3.0.1
 
```
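
If the old behavior is needed, the legacy flag named in the note above can be flipped per session; a minimal sketch:

```scala
// Resolve views with the current session's configs instead of the
// captured ones (pre-3.1 behavior).
spark.conf.set("spark.sql.legacy.useCurrentConfigsForView", "true")
```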

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -1034,7 +1034,9 @@ class Analyzer(override val catalogManager: CatalogManager)
             s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " +
               "around this.")
           }
-          executeSameContext(child)
+          SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs)) {
+            executeSameContext(child)
+          }
         }
         view.copy(child = newChild)
       case p @ SubqueryAlias(_, view: View) =>
```
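
`SQLConf.withExistingConf` swaps the thread-local SQLConf for the duration of a block, so the nested `executeSameContext(child)` call resolves the view body under its captured configs. A minimal sketch of the pattern (illustrative key; uses Spark's internal `SQLConf` API):

```scala
import org.apache.spark.sql.internal.SQLConf

val capturedConf = new SQLConf()
capturedConf.setConfString("spark.sql.ansi.enabled", "false")

// Code inside the block sees capturedConf through SQLConf.get;
// the previous conf is restored when the block exits.
SQLConf.withExistingConf(capturedConf) {
  assert(!SQLConf.get.ansiEnabled)
}
```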

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -795,14 +795,19 @@ class SessionCatalog(
 
     if (metadata.tableType == CatalogTableType.VIEW) {
       val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
-      logDebug(s"'$viewText' will be used for the view($table).")
+      val viewConfigs = metadata.viewSQLConfigs
+      val viewPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs)) {
+        parser.parsePlan(viewText)
+      }
+
+      logDebug(s"'$viewText' will be used for the view($table) with configs: $viewConfigs.")
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
       val child = View(
         desc = metadata,
         output = metadata.schema.toAttributes,
-        child = parser.parsePlan(viewText))
+        child = viewPlan)
       SubqueryAlias(multiParts, child)
     } else {
       SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
```
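
Parsing also has to happen under the captured configs, since configs such as `spark.sql.ansi.enabled` can change how the view text is parsed (e.g., which keywords are reserved). A sketch mirroring the change above with the standalone Catalyst parser (the SQL text is illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf

val viewConf = new SQLConf()
viewConf.setConfString("spark.sql.ansi.enabled", "true")

// Parse the stored view text with the captured conf in effect, as the
// SessionCatalog now does for permanent views.
val plan = SQLConf.withExistingConf(viewConf) {
  CatalystSqlParser.parsePlan("SELECT 1 AS col")
}
println(plan)
```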

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -305,6 +305,22 @@ case class CatalogTable(
     }
   }
 
+  /**
+   * Return the SQL configs captured when the view was created. These configs are applied
+   * during parsing and analysis of the view, and should be empty if the CatalogTable is not
+   * a View or was created by older versions of Spark (before 3.1.0).
+   */
+  def viewSQLConfigs: Map[String, String] = {
+    try {
+      for ((key, value) <- properties if key.startsWith(CatalogTable.VIEW_SQL_CONFIG_PREFIX))
+        yield (key.substring(CatalogTable.VIEW_SQL_CONFIG_PREFIX.length), value)
+    } catch {
+      case e: Exception =>
+        throw new AnalysisException(
+          "Corrupted view SQL configs in catalog", cause = Some(e))
+    }
+  }
+
   /**
    * Return the output column names of the query that creates a view, the column names are used to
    * resolve a view, should be empty if the CatalogTable is not a View or created by older versions
@@ -411,6 +427,8 @@ object CatalogTable {
     props.toMap
   }
 
+  val VIEW_SQL_CONFIG_PREFIX = VIEW_PREFIX + "sqlConfig."
+
   val VIEW_QUERY_OUTPUT_PREFIX = VIEW_PREFIX + "query.out."
   val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
   val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
```
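
Each captured config is stored as an ordinary table property under the new `view.sqlConfig.` prefix, so it can be inspected without any new API; a sketch, assuming the view `v` from the earlier example:

```scala
// Captured configs show up as properties keyed "view.sqlConfig.<key>",
// e.g. view.sqlConfig.spark.sql.ansi.enabled -> false.
spark.sql("SHOW TBLPROPERTIES v").show(truncate = false)
```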

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -453,6 +453,22 @@ case class View(
     }
   }
 
+object View {
+  def effectiveSQLConf(configs: Map[String, String]): SQLConf = {
+    val activeConf = SQLConf.get
+    if (activeConf.useCurrentSQLConfigsForView) return activeConf
+
+    val sqlConf = new SQLConf()
+    for ((k, v) <- configs) {
+      sqlConf.settings.put(k, v)
+    }
+    // Respect the current maxNestedViewDepth because view resolution is
+    // executed from top to bottom.
+    sqlConf.setConf(SQLConf.MAX_NESTED_VIEW_DEPTH, activeConf.maxNestedViewDepth)
+    sqlConf
+  }
+}
+
 /**
  * A container for holding named common table expressions (CTEs) and a query plan.
  * This operator will be removed during analysis and the relations will be substituted into child.
```
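
`effectiveSQLConf` builds a fresh `SQLConf` from the captured entries unless the legacy flag says to keep the active one, and it deliberately pins `MAX_NESTED_VIEW_DEPTH` from the active conf so the depth check keeps counting across nested views. A usage sketch (internal API; run inside a Spark session):

```scala
import org.apache.spark.sql.catalyst.plans.logical.View
import org.apache.spark.sql.internal.SQLConf

// Build the conf a view would be resolved under from captured entries.
val conf = View.effectiveSQLConf(Map("spark.sql.ansi.enabled" -> "false"))
assert(!conf.ansiEnabled)

// The nested-view depth limit still tracks the active session conf.
assert(conf.maxNestedViewDepth == SQLConf.get.maxNestedViewDepth)
```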

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -1481,6 +1481,15 @@ object SQLConf {
       "must be positive.")
     .createWithDefault(100)
 
+  val USE_CURRENT_SQL_CONFIGS_FOR_VIEW =
+    buildConf("spark.sql.legacy.useCurrentConfigsForView")
+      .internal()
+      .doc("When true, SQL Configs of the current active SparkSession instead of the captured " +
+        "ones will be applied during the parsing and analysis phases of the view resolution.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.streaming.commitProtocolClass")
       .version("2.1.0")
@@ -3415,6 +3424,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
 
+  def useCurrentSQLConfigsForView: Boolean = getConf(SQLConf.USE_CURRENT_SQL_CONFIGS_FOR_VIEW)
+
   def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)
 
   def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)
```
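
The flag is internal but still settable at runtime; a sketch of reading and toggling it through the new getter (run inside a Spark session):

```scala
import org.apache.spark.sql.internal.SQLConf

// Default is false: captured configs win during view resolution.
assert(!SQLConf.get.useCurrentSQLConfigsForView)

// Opt back into the pre-3.1 behavior for the current session.
SQLConf.get.setConf(SQLConf.USE_CURRENT_SQL_CONFIGS_FOR_VIEW, true)
assert(SQLConf.get.useCurrentSQLConfigsForView)
```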

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 45 additions & 4 deletions
```diff
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -334,6 +334,18 @@ case class ShowViewsCommand(
 
 object ViewHelper {
 
+  private val configPrefixDenyList = Seq(
+    SQLConf.MAX_NESTED_VIEW_DEPTH.key,
+    "spark.sql.optimizer.",
+    "spark.sql.codegen.",
+    "spark.sql.execution.",
+    "spark.sql.shuffle.",
+    "spark.sql.adaptive.")
+
+  private def shouldCaptureConfig(key: String): Boolean = {
+    !configPrefixDenyList.exists(prefix => key.startsWith(prefix))
+  }
+
   import CatalogTable._
 
   /**
@@ -361,11 +373,37 @@ object ViewHelper {
     }
   }
 
+  /**
+   * Convert the view SQL configs to `properties`.
+   */
+  private def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
+    val modifiedConfs = conf.getAllConfs.filter { case (k, _) =>
+      conf.isModifiable(k) && shouldCaptureConfig(k)
+    }
+    val props = new mutable.HashMap[String, String]
+    for ((key, value) <- modifiedConfs) {
+      props.put(s"$VIEW_SQL_CONFIG_PREFIX$key", value)
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view SQL configs in `properties`.
+   */
+  private def removeSQLConfigs(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_SQL_CONFIG_PREFIX)
+    }
+  }
+
   /**
    * Generate the view properties in CatalogTable, including:
    * 1. view default database that is used to provide the default database name on view resolution.
    * 2. the output column names of the query that creates a view, this is used to map the output of
    *    the view child to the view output during view resolution.
+   * 3. the SQL configs when creating the view.
    *
    * @param properties the `properties` in CatalogTable.
    * @param session the spark session.
@@ -380,15 +418,18 @@ object ViewHelper {
     // for createViewCommand queryOutput may be different from fieldNames
     val queryOutput = analyzedPlan.schema.fieldNames
 
+    val conf = session.sessionState.conf
+
     // Generate the query column names, throw an AnalysisException if there exists duplicate column
     // names.
     SchemaUtils.checkColumnNameDuplication(
-      fieldNames, "in the view definition", session.sessionState.conf.resolver)
+      fieldNames, "in the view definition", conf.resolver)
 
-    // Generate the view default catalog and namespace.
+    // Generate the view default catalog and namespace, as well as captured SQL configs.
     val manager = session.sessionState.catalogManager
-    removeQueryColumnNames(properties) ++
+    removeSQLConfigs(removeQueryColumnNames(properties)) ++
       catalogAndNamespaceToProps(manager.currentCatalog.name, manager.currentNamespace) ++
+      sqlConfigsToProps(conf) ++
       generateQueryColumnNames(queryOutput)
   }
```
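
Only modifiable configs outside the deny-listed prefixes are captured, which keeps per-query tuning knobs (optimizer, codegen, shuffle, AQE) from being frozen into a view's semantics. A self-contained sketch of the deny-list check:

```scala
// Mirror of shouldCaptureConfig: tuning prefixes are never captured.
val denyPrefixes = Seq("spark.sql.optimizer.", "spark.sql.codegen.",
  "spark.sql.execution.", "spark.sql.shuffle.", "spark.sql.adaptive.")

def shouldCapture(key: String): Boolean =
  !denyPrefixes.exists(prefix => key.startsWith(prefix))

assert(shouldCapture("spark.sql.ansi.enabled"))
assert(!shouldCapture("spark.sql.adaptive.enabled"))
```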
