From 54c42096ca3cf0808989ebc577749531ffe0214d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 12 May 2015 14:49:55 +0800 Subject: [PATCH 1/4] fix 7551 --- .../catalyst/plans/logical/LogicalPlan.scala | 43 ++++++++++++++++++- .../org/apache/spark/sql/DataFrame.scala | 4 +- .../org/apache/spark/sql/DataFrameSuite.scala | 20 +++++++++ 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index dbb12d56f9497..6146690e8483b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -105,7 +105,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { } /** - * Optionally resolves the given string to a [[NamedExpression]] using the input from all child + * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child * nodes of this LogicalPlan. The attribute is expressed as * as string in the following form: `[scope].AttributeName.[nested].[fields]...`. */ @@ -116,7 +116,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { resolve(nameParts, children.flatMap(_.output), resolver, throwErrors) /** - * Optionally resolves the given string to a [[NamedExpression]] based on the output of this + * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this * LogicalPlan. The attribute is expressed as string in the following form: * `[scope].AttributeName.[nested].[fields]...`. */ @@ -126,6 +126,45 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { throwErrors: Boolean = false): Option[NamedExpression] = resolve(nameParts, output, resolver, throwErrors) + def resolveQuoted( + name: String, + resolver: Resolver): Option[NamedExpression] = { + resolve(parseAttributeName(name), resolver, true) + } + + private def parseAttributeName(name: String) = { + val e = new AnalysisException(s"wrong syntax for attribute name: $name") + val nameParts = scala.collection.mutable.ArrayBuffer.empty[String] + val tmp = scala.collection.mutable.ArrayBuffer.empty[Char] + var hasBacktick = false + val it = name.iterator.buffered + while (it.hasNext) { + val char = it.next() + if (hasBacktick) { + if (char == '`') { + hasBacktick = false + if (it.hasNext && it.head != '.') throw e + } else { + tmp += char + } + } else { + if (char == '`') { + if (tmp.nonEmpty) throw e + hasBacktick = true + } else if (char == '.') { + if (tmp.isEmpty) throw e + nameParts += tmp.mkString + tmp.clear() + } else { + tmp += char + } + } + } + if (tmp.isEmpty || hasBacktick) throw e + nameParts += tmp.mkString + nameParts.toSeq + } + /** * Resolve the given `name` string against the given attribute, returning either 0 or 1 match. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 265a61592b943..e276075efe7f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -158,7 +158,7 @@ class DataFrame private[sql]( } protected[sql] def resolve(colName: String): NamedExpression = { - queryExecution.analyzed.resolve(colName.split("\\."), sqlContext.analyzer.resolver).getOrElse { + queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse { throw new AnalysisException( s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""") } @@ -166,7 +166,7 @@ class DataFrame private[sql]( protected[sql] def numericColumns: Seq[Expression] = { schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n => - queryExecution.analyzed.resolve(n.name.split("\\."), sqlContext.analyzer.resolver).get + queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2ade955864b71..c558f69d6fccd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -458,6 +458,26 @@ class DataFrameSuite extends QueryTest { assert(complexData.filter(complexData("s")("key") === 1).count() == 1) } + test("SPARK-7551: support backticks for DataFrame attribute resolution") { + val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD( + """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil)) + checkAnswer( + df.select(df("`a.b`.c.`d..e`.`f`")), + Row(1) + ) + + def checkError(testFun: => Unit): Unit = { + val e = intercept[org.apache.spark.sql.AnalysisException] { + testFun + } + assert(e.getMessage.contains("wrong syntax for attribute name")) + } + checkError(df("`abc.`c`")) + checkError(df("`abc`..d")) + checkError(df("`a`.b.")) + checkError(df("`a.b`.c.`d")) + } + test("SPARK-7324 dropDuplicates") { val testData = TestSQLContext.sparkContext.parallelize( (2, 1, 2) :: (1, 1, 1) :: From e218d99bf5051fb1c16ddde0c0f4f3ee3907f083 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 12 May 2015 15:29:43 +0800 Subject: [PATCH 2/4] address comments --- .../catalyst/plans/logical/LogicalPlan.scala | 32 +++++++++++++------ .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 6146690e8483b..dd53649b626b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -126,31 +126,42 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { throwErrors: Boolean = false): Option[NamedExpression] = resolve(nameParts, output, resolver, throwErrors) + /** + * Given an attribute name, split it to name parts by dot, but + * don't split the name parts quoted by backticks, for example, + * `ab.cd`.`efg` should be split into two parts "ab.cd" and "efg". + */ def resolveQuoted( name: String, resolver: Resolver): Option[NamedExpression] = { resolve(parseAttributeName(name), resolver, true) } + /** + * Internal method, used to split attribute name by dot with backticks rule. + * Backticks must appear in pairs, and the quoted string must be a complete name part, + * which means `ab..c`e.f is not allowed. + * Escape character is not supported now, so we can't use backtick inside name part. + */ private def parseAttributeName(name: String) = { - val e = new AnalysisException(s"wrong syntax for attribute name: $name") + val e = new AnalysisException(s"syntax error in attribute name: $name") val nameParts = scala.collection.mutable.ArrayBuffer.empty[String] val tmp = scala.collection.mutable.ArrayBuffer.empty[Char] - var hasBacktick = false - val it = name.iterator.buffered - while (it.hasNext) { - val char = it.next() - if (hasBacktick) { + var inBacktick = false + var i = 0 + while (i < name.length) { + val char = name(i) + if (inBacktick) { if (char == '`') { - hasBacktick = false - if (it.hasNext && it.head != '.') throw e + inBacktick = false + if (i + 1 < name.length && name(i + 1) != '.') throw e } else { tmp += char } } else { if (char == '`') { if (tmp.nonEmpty) throw e - hasBacktick = true + inBacktick = true } else if (char == '.') { if (tmp.isEmpty) throw e nameParts += tmp.mkString @@ -159,8 +170,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { tmp += char } } + i += 1 } - if (tmp.isEmpty || hasBacktick) throw e + if (tmp.isEmpty || inBacktick) throw e nameParts += tmp.mkString nameParts.toSeq } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c558f69d6fccd..7a601b39b5483 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -470,7 +470,7 @@ class DataFrameSuite extends QueryTest { val e = intercept[org.apache.spark.sql.AnalysisException] { testFun } - assert(e.getMessage.contains("wrong syntax for attribute name")) + assert(e.getMessage.contains("syntax error in attribute name:")) } checkError(df("`abc.`c`")) checkError(df("`abc`..d")) From 2b8669948465e5ae82a4b205b30e4a8f2881fbe3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 13 May 2015 13:26:44 +0800 Subject: [PATCH 3/4] handle blank --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 3 ++- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index dd53649b626b3..299f4148aa81d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -143,7 +143,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { * which means `ab..c`e.f is not allowed. * Escape character is not supported now, so we can't use backtick inside name part. */ - private def parseAttributeName(name: String) = { + private def parseAttributeName(name: String): Seq[String] = { val e = new AnalysisException(s"syntax error in attribute name: $name") val nameParts = scala.collection.mutable.ArrayBuffer.empty[String] val tmp = scala.collection.mutable.ArrayBuffer.empty[Char] @@ -164,6 +164,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { inBacktick = true } else if (char == '.') { if (tmp.isEmpty) throw e + if (name(i - 1) != '`' && tmp.contains(' ')) throw e nameParts += tmp.mkString tmp.clear() } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 7a601b39b5483..879ff14de8553 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -466,6 +466,13 @@ class DataFrameSuite extends QueryTest { Row(1) ) + val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD( + """{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil)) + checkAnswer( + df2.select(df2("`a b`.c.`d e`.f")), + Row(1) + ) + def checkError(testFun: => Unit): Unit = { val e = intercept[org.apache.spark.sql.AnalysisException] { testFun @@ -476,6 +483,8 @@ class DataFrameSuite extends QueryTest { checkError(df("`abc`..d")) checkError(df("`a`.b.")) checkError(df("`a.b`.c.`d")) + checkError(df2("`a b`.c.d e.f")) + checkError(df2("a b.c.`d e`.f")) } test("SPARK-7324 dropDuplicates") { From e6f579eafcc25809ac68106ffbf595da4b9efe68 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 13 May 2015 14:04:12 +0800 Subject: [PATCH 4/4] allow space --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 1 - .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 299f4148aa81d..dba69659afc80 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -164,7 +164,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { inBacktick = true } else if (char == '.') { if (tmp.isEmpty) throw e - if (name(i - 1) != '`' && tmp.contains(' ')) throw e nameParts += tmp.mkString tmp.clear() } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 879ff14de8553..ebfcf66ad72a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -469,7 +469,7 @@ class DataFrameSuite extends QueryTest { val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD( """{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil)) checkAnswer( - df2.select(df2("`a b`.c.`d e`.f")), + df2.select(df2("`a b`.c.d e.f")), Row(1) ) @@ -483,8 +483,6 @@ class DataFrameSuite extends QueryTest { checkError(df("`abc`..d")) checkError(df("`a`.b.")) checkError(df("`a.b`.c.`d")) - checkError(df2("`a b`.c.d e.f")) - checkError(df2("a b.c.`d e`.f")) } test("SPARK-7324 dropDuplicates") {