1717
1818package org .apache .spark .sql .execution .command
1919
20- import scala .util . control . NonFatal
20+ import scala .collection . mutable
2121
2222import org .apache .spark .sql .{AnalysisException , Row , SparkSession }
23- import org .apache .spark .sql .catalyst .{ SQLBuilder , TableIdentifier }
23+ import org .apache .spark .sql .catalyst .TableIdentifier
2424import org .apache .spark .sql .catalyst .analysis .{UnresolvedFunction , UnresolvedRelation }
2525import org .apache .spark .sql .catalyst .catalog .{CatalogStorageFormat , CatalogTable , CatalogTableType }
2626import org .apache .spark .sql .catalyst .expressions .Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
6464
6565
6666/**
67- * Create or replace a view with given query plan. This command will convert the query plan to
68- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
69- * permanent view.
67+ * Create or replace a view with given query plan. This command will generate some view-specific
68+ * properties (e.g. view default database, view query output column names) and store them as
69+ * properties in metastore, if we need to create a permanent view.
7070 *
7171 * @param name the name of this view.
7272 * @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
7575 * @param properties the properties of this view.
7676 * @param originalText the original SQL text of this view, can be None if this view is created via
7777 * Dataset API.
78- * @param child the logical plan that represents the view; this is used to generate a canonicalized
79- * version of the SQL that can be saved in the catalog .
78+ * @param child the logical plan that represents the view; this is used to generate the logical
79+ * plan for temporary view and the view schema .
8080 * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
8181 * already exists, throws analysis exception.
8282 * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
9595 viewType : ViewType )
9696 extends RunnableCommand {
9797
98+ import ViewHelper ._
99+
98100 override protected def innerChildren : Seq [QueryPlan [_]] = Seq (child)
99101
100102 if (viewType == PersistedView ) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
137139 // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
138140 verifyTemporaryObjectsNotExists(sparkSession)
139141
140- val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
141- analyzedPlan
142- } else {
143- val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
144- case (attr, (colName, None )) => Alias (attr, colName)()
145- case (attr, (colName, Some (colComment))) =>
146- val meta = new MetadataBuilder ().putString(" comment" , colComment).build()
147- Alias (attr, colName)(explicitMetadata = Some (meta))
148- }
149- sparkSession.sessionState.executePlan(Project (projectList, analyzedPlan)).analyzed
150- }
151-
152142 val catalog = sparkSession.sessionState.catalog
153143 if (viewType == LocalTempView ) {
144+ val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
154145 catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
155146 } else if (viewType == GlobalTempView ) {
147+ val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
156148 catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
157149 } else if (catalog.tableExists(name)) {
158150 val tableMetadata = catalog.getTableMetadata(name)
@@ -163,7 +155,7 @@ case class CreateViewCommand(
163155 throw new AnalysisException (s " $name is not a view " )
164156 } else if (replace) {
165157 // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
166- catalog.alterTable(prepareTable(sparkSession, aliasedPlan ))
158+ catalog.alterTable(prepareTable(sparkSession, analyzedPlan ))
167159 } else {
168160 // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
169161 // exists.
@@ -173,7 +165,7 @@ case class CreateViewCommand(
173165 }
174166 } else {
175167 // Create the view if it doesn't exist.
176- catalog.createTable(prepareTable(sparkSession, aliasedPlan ), ignoreIfExists = false )
168+ catalog.createTable(prepareTable(sparkSession, analyzedPlan ), ignoreIfExists = false )
177169 }
178170 Seq .empty[Row ]
179171 }
@@ -207,29 +199,44 @@ case class CreateViewCommand(
207199 }
208200
209201 /**
210- * Returns a [[ CatalogTable ]] that can be used to save in the catalog. This comment canonicalize
211- * SQL based on the analyzed plan, and also creates the proper schema for the view .
202+ * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
203+ * else return the analyzed plan directly .
212204 */
213- private def prepareTable (sparkSession : SparkSession , aliasedPlan : LogicalPlan ): CatalogTable = {
214- val viewSQL : String = new SQLBuilder (aliasedPlan).toSQL
215-
216- // Validate the view SQL - make sure we can parse it and analyze it.
217- // If we cannot analyze the generated query, there is probably a bug in SQL generation.
218- try {
219- sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
220- } catch {
221- case NonFatal (e) =>
222- throw new RuntimeException (s " Failed to analyze the canonicalized SQL: $viewSQL" , e)
205+ private def aliasPlan (session : SparkSession , analyzedPlan : LogicalPlan ): LogicalPlan = {
206+ if (userSpecifiedColumns.isEmpty) {
207+ analyzedPlan
208+ } else {
209+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
210+ case (attr, (colName, None )) => Alias (attr, colName)()
211+ case (attr, (colName, Some (colComment))) =>
212+ val meta = new MetadataBuilder ().putString(" comment" , colComment).build()
213+ Alias (attr, colName)(explicitMetadata = Some (meta))
214+ }
215+ session.sessionState.executePlan(Project (projectList, analyzedPlan)).analyzed
223216 }
217+ }
218+
219+ /**
220+ * Returns a [[CatalogTable ]] that can be used to save in the catalog. Generate the view-specific
221+ * properties (e.g. view default database, view query output column names) and store them as
222+ * properties in the CatalogTable, and also creates the proper schema for the view.
223+ */
224+ private def prepareTable (session : SparkSession , analyzedPlan : LogicalPlan ): CatalogTable = {
225+ if (originalText.isEmpty) {
226+ throw new AnalysisException (
227+ " It is not allowed to create a persisted view from the Dataset API" )
228+ }
229+
230+ val newProperties = generateViewProperties(properties, session, analyzedPlan)
224231
225232 CatalogTable (
226233 identifier = name,
227234 tableType = CatalogTableType .VIEW ,
228235 storage = CatalogStorageFormat .empty,
229- schema = aliasedPlan .schema,
230- properties = properties ,
236+ schema = aliasPlan(session, analyzedPlan) .schema,
237+ properties = newProperties ,
231238 viewOriginalText = originalText,
232- viewText = Some (viewSQL) ,
239+ viewText = originalText ,
233240 comment = comment
234241 )
235242 }
@@ -244,14 +251,16 @@ case class CreateViewCommand(
244251 * @param name the name of this view.
245252 * @param originalText the original SQL text of this view. Note that we can only alter a view by
246253 * SQL API, which means we always have originalText.
247- * @param query the logical plan that represents the view; this is used to generate a canonicalized
248- * version of the SQL that can be saved in the catalog .
254+ * @param query the logical plan that represents the view; this is used to generate the new view
255+ * schema .
249256 */
250257case class AlterViewAsCommand (
251258 name : TableIdentifier ,
252259 originalText : String ,
253260 query : LogicalPlan ) extends RunnableCommand {
254261
262+ import ViewHelper ._
263+
255264 override protected def innerChildren : Seq [QueryPlan [_]] = Seq (query)
256265
257266 override def run (session : SparkSession ): Seq [Row ] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
275284 throw new AnalysisException (s " ${viewMeta.identifier} is not a view. " )
276285 }
277286
278- val viewSQL : String = new SQLBuilder (analyzedPlan).toSQL
279- // Validate the view SQL - make sure we can parse it and analyze it.
280- // If we cannot analyze the generated query, there is probably a bug in SQL generation.
281- try {
282- session.sql(viewSQL).queryExecution.assertAnalyzed()
283- } catch {
284- case NonFatal (e) =>
285- throw new RuntimeException (s " Failed to analyze the canonicalized SQL: $viewSQL" , e)
286- }
287+ val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
287288
288289 val updatedViewMeta = viewMeta.copy(
289290 schema = analyzedPlan.schema,
291+ properties = newProperties,
290292 viewOriginalText = Some (originalText),
291- viewText = Some (viewSQL ))
293+ viewText = Some (originalText ))
292294
293295 session.sessionState.catalog.alterTable(updatedViewMeta)
294296 }
295297}
298+
299+ object ViewHelper {
300+
301+ import CatalogTable ._
302+
303+ /**
304+ * Generate the view default database in `properties`.
305+ */
306+ private def generateViewDefaultDatabase (databaseName : String ): Map [String , String ] = {
307+ Map (VIEW_DEFAULT_DATABASE -> databaseName)
308+ }
309+
310+ /**
311+ * Generate the view query output column names in `properties`.
312+ */
313+ private def generateQueryColumnNames (columns : Seq [String ]): Map [String , String ] = {
314+ val props = new mutable.HashMap [String , String ]
315+ if (columns.nonEmpty) {
316+ props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS , columns.length.toString)
317+ columns.zipWithIndex.foreach { case (colName, index) =>
318+ props.put(s " $VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index" , colName)
319+ }
320+ }
321+ props.toMap
322+ }
323+
324+ /**
325+ * Remove the view query output column names in `properties`.
326+ */
327+ private def removeQueryColumnNames (properties : Map [String , String ]): Map [String , String ] = {
328+ // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
329+ // while `CatalogTable` should be serializable.
330+ properties.filterNot { case (key, _) =>
331+ key.startsWith(VIEW_QUERY_OUTPUT_PREFIX )
332+ }
333+ }
334+
335+ /**
336+ * Generate the view properties in CatalogTable, including:
337+ * 1. view default database that is used to provide the default database name on view resolution.
338+ * 2. the output column names of the query that creates a view, this is used to map the output of
339+ * the view child to the view output during view resolution.
340+ *
341+ * @param properties the `properties` in CatalogTable.
342+ * @param session the spark session.
343+ * @param analyzedPlan the analyzed logical plan that represents the child of a view.
344+ * @return new view properties including view default database and query column names properties.
345+ */
346+ def generateViewProperties (
347+ properties : Map [String , String ],
348+ session : SparkSession ,
349+ analyzedPlan : LogicalPlan ): Map [String , String ] = {
350+ // Generate the query column names, throw an AnalysisException if there exists duplicate column
351+ // names.
352+ val queryOutput = analyzedPlan.schema.fieldNames
353+ assert(queryOutput.distinct.size == queryOutput.size,
354+ s " The view output ${queryOutput.mkString(" (" , " ," , " )" )} contains duplicate column name. " )
355+
356+ // Generate the view default database name.
357+ val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
358+
359+ removeQueryColumnNames(properties) ++
360+ generateViewDefaultDatabase(viewDefaultDatabase) ++
361+ generateQueryColumnNames(queryOutput)
362+ }
363+ }
0 commit comments