Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,20 @@ abstract class Star extends LeafExpression with NamedExpression {
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {

override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
// If there is no table specified, use all input attributes.
if (target.isEmpty) return input.output

// First try to expand assuming it is table.*.
val expandedAttributes: Seq[Attribute] = target match {
// If there is no table specified, use all input attributes.
case None => input.output
// If there is a table, pick out attributes that are part of this table.
case Some(t) => if (t.size == 1) {
input.output.filter(_.qualifiers.exists(resolver(_, t.head)))
val expandedAttributes =
if (target.get.size == 1) {
// If there is a table, pick out attributes that are part of this table.
input.output.filter(_.qualifiers.exists(resolver(_, target.get.head)))
} else {
List()
}
}
if (expandedAttributes.nonEmpty) return expandedAttributes

// Try to resolve it as a struct expansion. If there is a conflict and both are possible,
// (i.e. [name].* is both a table and a struct), the struct path can always be qualified.
require(target.isDefined)
val attribute = input.resolve(target.get, resolver)
if (attribute.isDefined) {
// This target resolved to an attribute in child. It must be a struct. Expand it.
Expand Down
31 changes: 31 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,37 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("Star Expansion - table with zero column") {
  withTempTable("temp_table_no_cols") {
    // Build a DataFrame with ten rows and a zero-column schema.
    val emptyRowRdd = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
    val zeroColDf = sqlContext.createDataFrame(emptyRowRdd, StructType(Seq.empty))
    zeroColDf.registerTempTable("temp_table_no_cols")

    // ResolvedStar: selecting "*" directly on the DataFrame is a no-op.
    checkAnswer(zeroColDf, zeroColDf.select(zeroColDf.col("*")))

    // UnresolvedStar: "*" via SQL text and via the DataFrame DSL.
    checkAnswer(zeroColDf, sql("SELECT * FROM temp_table_no_cols"))
    checkAnswer(zeroColDf, zeroColDf.select($"*"))

    // A qualified star over a zero-column table cannot resolve to anything.
    // NOTE(review): "give input columns" reproduces the analyzer's error
    // message verbatim (including its apparent typo) — keep in sync with it.
    val sqlStarError = intercept[AnalysisException] {
      sql("SELECT a.* FROM temp_table_no_cols a")
    }.getMessage
    assert(sqlStarError.contains("cannot resolve 'a.*' give input columns ''"))

    val dslStarError = intercept[AnalysisException] {
      zeroColDf.select($"b.*")
    }.getMessage
    assert(dslStarError.contains("cannot resolve 'b.*' give input columns ''"))
  }
}

test("Common subexpression elimination") {
// select from a table to prevent constant folding.
val df = sql("SELECT a, b from testData2 limit 1")
Expand Down