diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 6f445b1e88d70..389e939bd8e2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -500,25 +500,44 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with override def expand( input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + expandStar(input.output, input.metadataOutput, input.resolve, input.inputSet.toSeq, resolver) + } + + /** + * Method used to expand a star. It uses output and metadata output attributes of the child + * for the expansion and it supports both recursive and non-recursive data types. + * + * @param childOperatorOutput The output attributes of the child operator + * @param childOperatorMetadataOutput The metadata output attributes of the child operator + * @param resolve A function to resolve the given name parts to an attribute + * @param suggestedAttributes A list of attributes that are suggested for expansion + * @param resolver The resolver used to match the name parts + */ + def expandStar( + childOperatorOutput: Seq[Attribute], + childOperatorMetadataOutput: Seq[Attribute], + resolve: (Seq[String], Resolver) => Option[NamedExpression], + suggestedAttributes: Seq[Attribute], + resolver: Resolver): Seq[NamedExpression] = { // If there is no table specified, use all non-hidden input attributes. - if (target.isEmpty) return input.output + if (target.isEmpty) return childOperatorOutput // If there is a table specified, use hidden input attributes as well - val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly) + val hiddenOutput = childOperatorMetadataOutput.filter(_.qualifiedAccessOnly) // Remove the qualified-access-only restriction immediately. The expanded attributes will be // put in a logical plan node and becomes normal attributes. They can still keep the special // attribute metadata to indicate that they are from metadata columns, but they should not // keep any restrictions that may break column resolution for normal attributes. // See SPARK-42084 for more details. .map(_.markAsAllowAnyAccess()) - val expandedAttributes = (hiddenOutput ++ input.output).filter( + val expandedAttributes = (hiddenOutput ++ childOperatorOutput).filter( matchedQualifier(_, target.get, resolver)) if (expandedAttributes.nonEmpty) return expandedAttributes // Try to resolve it as a struct expansion. If there is a conflict and both are possible, // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. - val attribute = input.resolve(target.get, resolver) + val attribute = resolve(target.get, resolver) if (attribute.isDefined) { // This target resolved to an attribute in child. It must be a struct. Expand it. attribute.get.dataType match { @@ -532,7 +551,7 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with throw QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get) } } else { - val from = input.inputSet.map(_.name).map(toSQLId).mkString(", ") + val from = suggestedAttributes.map(_.name).map(toSQLId).mkString(", ") val targetString = target.get.mkString(".") throw QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError( targetString, from)