@@ -105,7 +105,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}

/**
- * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
+ * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
*/
@@ -116,7 +116,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
resolve(nameParts, children.flatMap(_.output), resolver, throwErrors)

/**
- * Optionally resolves the given string to a [[NamedExpression]] based on the output of this
+ * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
* LogicalPlan. The attribute is expressed as string in the following form:
* `[scope].AttributeName.[nested].[fields]...`.
*/
@@ -126,6 +126,57 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
throwErrors: Boolean = false): Option[NamedExpression] =
resolve(nameParts, output, resolver, throwErrors)

/**
* Given an attribute name, split it to name parts by dot, but
* don't split the name parts quoted by backticks, for example,
* `ab.cd`.`efg` should be split into two parts "ab.cd" and "efg".
*/
def resolveQuoted(
name: String,
resolver: Resolver): Option[NamedExpression] = {
resolve(parseAttributeName(name), resolver, true)
}

/**
* Internal method, used to split attribute name by dot with backticks rule.
* Backticks must appear in pairs, and the quoted string must be a complete name part,
* which means `ab..c`e.f is not allowed.
* Escape character is not supported now, so we can't use backtick inside name part.
*/
private def parseAttributeName(name: String): Seq[String] = {
val e = new AnalysisException(s"syntax error in attribute name: $name")
val nameParts = scala.collection.mutable.ArrayBuffer.empty[String]
val tmp = scala.collection.mutable.ArrayBuffer.empty[Char]
[Review comment, Contributor] instead of an iterator, how about just access a char string directly?

[Review comment, Contributor] actually never mind - with your implementation the code would look a lot uglier with a char array.

var inBacktick = false
var i = 0
while (i < name.length) {
val char = name(i)
if (inBacktick) {
if (char == '`') {
inBacktick = false
if (i + 1 < name.length && name(i + 1) != '.') throw e
} else {
tmp += char
}
} else {
if (char == '`') {
if (tmp.nonEmpty) throw e
inBacktick = true
} else if (char == '.') {
if (tmp.isEmpty) throw e
nameParts += tmp.mkString
tmp.clear()
} else {
tmp += char
}
}
i += 1
}
if (tmp.isEmpty || inBacktick) throw e
nameParts += tmp.mkString
nameParts.toSeq
}

/**
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
*
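To make the splitting rule documented above easy to try outside Spark, here is a minimal standalone sketch that mirrors the behavior described in the parseAttributeName scaladoc. AttributeNameSketch and splitAttributeName are hypothetical names used only for illustration; the PR's actual logic is the private LogicalPlan.parseAttributeName shown in the hunk above.

import scala.collection.mutable.ArrayBuffer

object AttributeNameSketch {
  // Split an attribute name on dots, treating a backtick-quoted run as a single
  // name part: backticks must come in pairs and a quoted string must be a
  // complete name part, so `ab..c`e.f is rejected.
  def splitAttributeName(name: String): Seq[String] = {
    def fail(): Nothing = throw new IllegalArgumentException(s"syntax error in attribute name: $name")
    val nameParts = ArrayBuffer.empty[String]
    val tmp = new StringBuilder
    var inBacktick = false
    var i = 0
    while (i < name.length) {
      val char = name(i)
      if (inBacktick) {
        if (char == '`') {
          inBacktick = false
          // a closing backtick must be followed by a dot or the end of the name
          if (i + 1 < name.length && name(i + 1) != '.') fail()
        } else {
          tmp += char
        }
      } else {
        if (char == '`') {
          if (tmp.nonEmpty) fail() // an opening backtick must start a name part
          inBacktick = true
        } else if (char == '.') {
          if (tmp.isEmpty) fail()  // empty name parts such as "a..b" are rejected
          nameParts += tmp.toString
          tmp.clear()
        } else {
          tmp += char
        }
      }
      i += 1
    }
    if (tmp.isEmpty || inBacktick) fail()
    nameParts += tmp.toString
    nameParts.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(splitAttributeName("`ab.cd`.`efg`").mkString("[", ", ", "]")) // [ab.cd, efg]
    println(splitAttributeName("a.b.c").mkString("[", ", ", "]"))         // [a, b, c]
    // splitAttributeName("`ab..c`e.f") would throw: the quoted part is not a complete name part
  }
}
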
sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala (4 changes: 2 additions & 2 deletions)
@@ -158,15 +158,15 @@ class DataFrame private[sql](
}

protected[sql] def resolve(colName: String): NamedExpression = {
- queryExecution.analyzed.resolve(colName.split("\\."), sqlContext.analyzer.resolver).getOrElse {
+ queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
}

protected[sql] def numericColumns: Seq[Expression] = {
schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
- queryExecution.analyzed.resolve(n.name.split("\\."), sqlContext.analyzer.resolver).get
+ queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
}
}

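At the DataFrame level, the switch from colName.split("\\.") to resolveQuoted is what lets backticks distinguish a literal dot in a column name from nested-field access. A hedged illustration of the intent, with invented data (this snippet is not part of the PR's tests; it assumes the same TestSQLContext used in the suite below):

// Illustration only: backticks keep the dot inside the column name,
// while an unquoted dot still means nested-field access.
val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
  """{"a.b": 1, "a": {"b": 2}}""" :: Nil))
df.select(df("`a.b`"))  // the top-level column literally named "a.b"
df.select(df("a.b"))    // field "b" nested inside the struct column "a"
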
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (27 changes: 27 additions & 0 deletions)
@@ -458,6 +458,33 @@ class DataFrameSuite extends QueryTest {
assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
}

test("SPARK-7551: support backticks for DataFrame attribute resolution") {
val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
"""{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
[Review comment, Contributor] can we add a test case that includes spaces? e.g. `a b`.c and a b.c

Row(1)
)

val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
"""{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
Row(1)
)

def checkError(testFun: => Unit): Unit = {
val e = intercept[org.apache.spark.sql.AnalysisException] {
testFun
}
assert(e.getMessage.contains("syntax error in attribute name:"))
}
checkError(df("`abc.`c`"))
checkError(df("`abc`..d"))
checkError(df("`a`.b."))
checkError(df("`a.b`.c.`d"))
}

test("SPARK-7324 dropDuplicates") {
val testData = TestSQLContext.sparkContext.parallelize(
(2, 1, 2) :: (1, 1, 1) ::