
Commit b58397c

[SPARK-33141][SQL] capture SQL configs when creating permanent views
1 parent 551b504 commit b58397c
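
In short: a permanent view's catalog entry can now carry the SQL configs of the query that created it, serialized as JSON under the new CatalogTable.VIEW_QUERY_SQL_CONFIGS property key. When the view is later parsed (SessionCatalog) or resolved (Analyzer), those configs are re-applied through SQLConf.withExistingConf and View.effectiveSQLConf. To make the re-applied configs visible, several components also switch from a SQLConf captured at construction time to the thread-local SQLConf.get.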

20 files changed: +294 -179 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 6 additions & 2 deletions
@@ -132,9 +132,11 @@ object AnalysisContext {
  */
 class Analyzer(
     override val catalogManager: CatalogManager,
-    conf: SQLConf)
+    deprecatedConf: SQLConf)
   extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
 
+  private def conf = SQLConf.get
+
   private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
 
   override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
@@ -997,7 +999,9 @@ class Analyzer(
           s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " +
           "around this.")
       }
-      executeSameContext(child)
+      SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewQuerySQLConfigs)) {
+        executeSameContext(child)
+      }
     }
     view.copy(child = newChild)
   case p @ SubqueryAlias(_, view: View) =>
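
For context, SQLConf.withExistingConf runs the given block with the supplied conf installed as the thread-local conf returned by SQLConf.get, restoring the previous one afterwards; that is how the analyzer can resolve a view body under creation-time configs. A minimal, self-contained sketch of the same thread-local pattern (CurrentConf and withConf are illustrative names, not Spark internals):

    object CurrentConf {
      // Thread-local holder standing in for SQLConf's internals (illustrative).
      private val local = new ThreadLocal[Map[String, String]] {
        override def initialValue(): Map[String, String] = Map.empty
      }

      def get: Map[String, String] = local.get()

      // Make `conf` visible through `get` on this thread while `body` runs,
      // then restore whatever was active before.
      def withConf[T](conf: Map[String, String])(body: => T): T = {
        val saved = local.get()
        local.set(conf)
        try body finally local.set(saved)
      }
    }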

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 12 additions & 5 deletions
@@ -61,7 +61,7 @@ class SessionCatalog(
     externalCatalogBuilder: () => ExternalCatalog,
     globalTempViewManagerBuilder: () => GlobalTempViewManager,
     functionRegistry: FunctionRegistry,
-    conf: SQLConf,
+    staticConf: SQLConf,
     hadoopConf: Configuration,
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader) extends Logging {
@@ -91,6 +91,8 @@ class SessionCatalog(
       new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
   }
 
+  private def conf = SQLConf.get
+
   lazy val externalCatalog = externalCatalogBuilder()
   lazy val globalTempViewManager = globalTempViewManagerBuilder()
 
@@ -136,8 +138,8 @@ class SessionCatalog(
   }
 
   private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
-    val cacheSize = conf.tableRelationCacheSize
-    val cacheTTL = conf.metadataCacheTTL
+    val cacheSize = staticConf.tableRelationCacheSize
+    val cacheTTL = staticConf.metadataCacheTTL
 
     var builder = CacheBuilder.newBuilder()
       .maximumSize(cacheSize)
@@ -794,14 +796,19 @@ class SessionCatalog(
 
     if (metadata.tableType == CatalogTableType.VIEW) {
       val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
-      logDebug(s"'$viewText' will be used for the view($table).")
+      val viewConfigs = metadata.viewQuerySQLConfigs
+      val viewPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs)) {
+        parser.parsePlan(viewText)
+      }
+
+      logDebug(s"'$viewText' will be used for the view($table) with configs: $viewConfigs.")
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
       val child = View(
         desc = metadata,
         output = metadata.schema.toAttributes,
-        child = parser.parsePlan(viewText))
+        child = viewPlan)
       SubqueryAlias(multiParts, child)
     } else {
       SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
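
The user-visible effect of the SessionCatalog change: a permanent view is parsed under the configs captured at creation time instead of the current session's. A hypothetical spark-shell session illustrating the intent (the view name is made up, and we assume spark.sql.ansi.enabled is among the captured configs):

    spark.sql("SET spark.sql.ansi.enabled=true")
    spark.sql("CREATE VIEW strict_v AS SELECT CAST('42' AS INT) AS i")
    spark.sql("SET spark.sql.ansi.enabled=false")
    // With this commit, resolving strict_v should still see ansi.enabled=true,
    // because the conf is read back from the view's catalog properties.
    spark.sql("SELECT * FROM strict_v").show()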

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 21 additions & 0 deletions
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.JsonProtocol
 
 
 /**
@@ -322,6 +324,23 @@ case class CatalogTable(
     )
   }
 
+  /**
+   * Returns the SQL configs of the query that created this view. These configs are applied
+   * when parsing and analyzing the view, and are empty if the CatalogTable is not a view or
+   * was created by an older version of Spark (before 3.1.0).
+   */
+  def viewQuerySQLConfigs: Map[String, String] = {
+    try {
+      properties.get(CatalogTable.VIEW_QUERY_SQL_CONFIGS)
+        .map(confJson => JsonProtocol.mapFromJson(JsonMethods.parse(confJson)).toMap)
+        .getOrElse(Map.empty)
+    } catch {
+      case e: Exception =>
+        throw new AnalysisException(
+          "Corrupted view query SQL configs in catalog", cause = Some(e))
+    }
+  }
+
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
       locationUri: Option[URI] = storage.locationUri,
@@ -415,6 +434,8 @@ object CatalogTable {
   val VIEW_QUERY_OUTPUT_PREFIX = VIEW_PREFIX + "query.out."
   val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
   val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
+
+  val VIEW_QUERY_SQL_CONFIGS = VIEW_PREFIX + "query.sqlConfigs"
 }
 
 /**
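
The viewQuerySQLConfigs accessor above reads the whole config map back from a single JSON-encoded table property. A small round-trip sketch with json4s; the write side is not part of this excerpt, so the encoding shown is an assumption about the shape, not the commit's code:

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods

    val configs = Map("spark.sql.ansi.enabled" -> "true")

    // Encode: one property value holding the map as a JSON object,
    // e.g. {"spark.sql.ansi.enabled":"true"}.
    val json = JsonMethods.compact(JsonMethods.render(configs))

    // Decode: parse and recover the Map[String, String], which is what
    // JsonProtocol.mapFromJson does in the accessor above.
    val restored = JsonMethods.parse(json).values.asInstanceOf[Map[String, String]]
    assert(restored == configs)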

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 2 additions & 2 deletions
@@ -51,10 +51,10 @@ import org.apache.spark.util.random.RandomSampler
 * The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or
 * TableIdentifier.
 */
-class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging {
+class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   import ParserUtils._
 
-  def this() = this(new SQLConf())
+  protected def conf = SQLConf.get
 
   protected def typedVisit[T](ctx: ParseTree): T = {
     ctx.accept(this).asInstanceOf[T]

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala

Lines changed: 7 additions & 5 deletions
@@ -33,7 +33,9 @@ import org.apache.spark.sql.types.{DataType, StructType}
 /**
  * Base SQL parsing infrastructure.
  */
-abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging {
+abstract class AbstractSqlParser extends ParserInterface with Logging {
+
+  private def conf = SQLConf.get
 
   /** Creates/Resolves DataType for a given SQL string. */
   override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
@@ -138,13 +140,13 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
 /**
  * Concrete SQL parser for Catalyst-only SQL statements.
  */
-class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
-  val astBuilder = new AstBuilder(conf)
+class CatalystSqlParser(deprecatedConf: SQLConf) extends AbstractSqlParser {
+  val astBuilder = new AstBuilder
 }
 
 /** For test-only. */
-object CatalystSqlParser extends AbstractSqlParser(SQLConf.get) {
-  val astBuilder = new AstBuilder(SQLConf.get)
+object CatalystSqlParser extends AbstractSqlParser {
+  val astBuilder = new AstBuilder
 }
 
 /**
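
The parser changes follow the same pattern as the Analyzer: stop freezing a SQLConf at construction time and read the active thread-local conf on each access, so a parser instantiated once still honors the view configs installed by withExistingConf. A toy contrast of the two styles (Frozen and Dynamic are illustrative names, not Spark classes):

    // Captured once: later changes to the active conf are invisible here.
    class Frozen(conf: Map[String, String]) {
      def ansiEnabled: Boolean =
        conf.getOrElse("spark.sql.ansi.enabled", "false").toBoolean
    }

    // Re-read on every call: sees whatever conf is active right now.
    class Dynamic(current: () => Map[String, String]) {
      def ansiEnabled: Boolean =
        current().getOrElse("spark.sql.ansi.enabled", "false").toBoolean
    }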

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 12 additions & 4 deletions
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import scala.collection.mutable
-
 import org.apache.spark.sql.catalyst.AliasIdentifier
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.random.RandomSampler
@@ -456,6 +453,17 @@ case class View(
   }
 }
 
+object View {
+  def effectiveSQLConf(configs: Map[String, String]): SQLConf = {
+    val sqlConf = new SQLConf()
+    for ((k, v) <- configs) {
+      sqlConf.settings.put(k, v)
+    }
+    sqlConf.setConf(SQLConf.MAX_NESTED_VIEW_DEPTH, SQLConf.get.maxNestedViewDepth)
+    sqlConf
+  }
+}
+
 /**
  * A container for holding named common table expressions (CTEs) and a query plan.
  * This operator will be removed during analysis and the relations will be substituted into child.
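
Note that View.effectiveSQLConf applies every captured config except the nested-view-depth limit, which is pinned to the current session's value so an old captured setting cannot weaken the Analyzer's depth guard. A hedged sketch tying the new pieces together (the helper name and its wiring are ours; the called APIs are the ones introduced in this commit):

    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.parser.ParserInterface
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
    import org.apache.spark.sql.internal.SQLConf

    // Illustrative glue, not the commit's code: parse a view's text under
    // the configs captured when the view was created.
    def parseViewUnderCapturedConfigs(
        metadata: CatalogTable,
        parser: ParserInterface): LogicalPlan = {
      val viewConf = View.effectiveSQLConf(metadata.viewQuerySQLConfigs)
      SQLConf.withExistingConf(viewConf) {
        // Sees creation-time configs, except SQLConf.MAX_NESTED_VIEW_DEPTH,
        // which effectiveSQLConf takes from the current session.
        parser.parsePlan(metadata.viewText.getOrElse(sys.error("Not a view.")))
      }
    }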

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 18 additions & 16 deletions
@@ -771,22 +771,24 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     // RuleExecutor only throw exception or log warning when the rule is supposed to run
     // more than once.
     val maxIterations = 2
-    val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> maxIterations)
-    val testAnalyzer = new Analyzer(
-      new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf)
-
-    val plan = testRelation2.select(
-      $"a" / Literal(2) as "div1",
-      $"a" / $"b" as "div2",
-      $"a" / $"c" as "div3",
-      $"a" / $"d" as "div4",
-      $"e" / $"e" as "div5")
-
-    val message = intercept[TreeNodeException[LogicalPlan]] {
-      testAnalyzer.execute(plan)
-    }.getMessage
-    assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
-      s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
+    withSQLConf(SQLConf.ANALYZER_MAX_ITERATIONS.key -> maxIterations.toString) {
+      val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> maxIterations)
+      val testAnalyzer = new Analyzer(
+        new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf)
+
+      val plan = testRelation2.select(
+        $"a" / Literal(2) as "div1",
+        $"a" / $"b" as "div2",
+        $"a" / $"c" as "div3",
+        $"a" / $"d" as "div4",
+        $"e" / $"e" as "div5")
+
+      val message = intercept[TreeNodeException[LogicalPlan]] {
+        testAnalyzer.execute(plan)
+      }.getMessage
+      assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
+        s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
+    }
   }
 
   test("SPARK-30886 Deprecate two-parameter TRIM/LTRIM/RTRIM") {
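
The test now wraps the scenario in withSQLConf, the standard helper for setting confs temporarily and restoring them on exit. A simplified sketch of that helper pattern (Spark's real version, in the SQLHelper trait, does more validation):

    import org.apache.spark.sql.internal.SQLConf

    // Set the given confs for the duration of `body`, then restore or unset.
    def withSQLConf[T](pairs: (String, String)*)(body: => T): T = {
      val conf = SQLConf.get
      val saved = pairs.map { case (k, _) =>
        k -> (if (conf.contains(k)) Some(conf.getConfString(k)) else None)
      }
      pairs.foreach { case (k, v) => conf.setConfString(k, v) }
      try body finally saved.foreach {
        case (k, Some(v)) => conf.setConfString(k, v)
        case (k, None) => conf.unsetConf(k)
      }
    }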
