viirya (Member) commented Jul 11, 2018

What changes were proposed in this pull request?

When a filter or sort uses a column reference from a Dataset that was not included in the preceding select, an AnalysisException occurs, e.g.,

val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
   +- AnalysisBarrier
      +- Project [name#5]
         +- Project [_1#2 AS name#5, _2#3 AS id#6]
            +- LocalRelation [_1#2, _2#3]

This change updates the rule ResolveMissingReferences so that Filter and Sort operators with a non-empty missingInput are also transformed.

How was this patch tested?

Added tests.

viirya (Member Author) commented Jul 11, 2018

cc @ueshin @cloud-fan

  val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
  val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
- val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+ val missingAttrs = AttributeSet(newExprs) --
Member

Should we also fix this in the Aggregate case?

Member

I might be missing something, but how about val missingAttrs = AttributeSet(newExprs) -- p.outputSet?

Member Author

For Aggregate, I've tested it. Seems ResolveAggregateFunctions already covers it.

viirya (Member Author), Jul 11, 2018

Yeah, I think using p.outputSet is simpler. Will update later.

// but a valid query like `df.select(df("name")).filter(df("id") === 0)` can make a query
// like this.
val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
val plan = Project(Stream(AttributeReference("b", IntegerType, nullable = true)()), relation)
Member

Why Stream?

Member Author

No special reason. Just following above test case.

SparkQA commented Jul 11, 2018

Test build #92851 has finished for PR 21745 at commit 97837a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  */
- lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
+ lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved &&
+   missingInput.isEmpty
Contributor

missingInput is special; in most cases we can't resolve it. I think that's why we didn't consider it in resolved in the first place.

We can update the if condition in ResolveMissingReferences to take missingInput into consideration.
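
To make the distinction concrete, here is a minimal, Spark-free sketch. `ToyFilter` and its members are hypothetical stand-ins rather than Catalyst classes, and attributes are modeled as plain strings. It assumes the rule is guarded by a `!resolved`-style check and shows why such a guard skips a node whose expressions are all resolved but whose `missingInput` is non-empty.

```scala
// Toy model only: ToyFilter is a hypothetical stand-in for a Catalyst Filter node.
// Attributes are plain strings such as "id#6".
final case class ToyFilter(references: Set[String], childOutput: Set[String]) {
  // In this toy every reference already carries an expression ID, so the
  // expressions count as resolved (like `df("id")` in the PR description).
  val resolved: Boolean = true

  // Attributes the node reads but its child does not produce.
  val missingInput: Set[String] = references -- childOutput

  // A guard that only looks at `resolved` never fires for this node.
  def oldGuardFires: Boolean = !resolved

  // A guard that also takes missing input into account does fire.
  def newGuardFires: Boolean = !resolved || missingInput.nonEmpty
}

object GuardDemo {
  // Filter (id#6 = 0) sitting on top of Project [name#5].
  val filter: ToyFilter = ToyFilter(references = Set("id#6"), childOutput = Set("name#5"))

  def main(args: Array[String]): Unit = {
    println(s"missingInput  = ${filter.missingInput}")
    println(s"oldGuardFires = ${filter.oldGuardFires}")
    println(s"newGuardFires = ${filter.newGuardFires}")
  }
}
```

Keeping `resolved` unchanged and widening only the rule's guard limits the behavior change to the one rule that can actually repair the plan.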

Member Author

Yeah, I found that this change causes one test failure.

gatorsmile (Member) commented

Which PR caused this regression?

CC @jerryshao We need to block the 2.3.2 release until this issue is addressed.

  val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
  val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
- val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+ // The resolved attributes might not come from `p.child`. Need to filter it.
Contributor

How can this happen? If the resolved attributes do not exist in the child, then the plan is invalid, isn't it?

Member

At least, this case was resolved in ResolveMissingReferences in spark-v2.2.

viirya (Member Author) commented Jul 12, 2018 via email

maropu (Member) commented Jul 12, 2018

@gatorsmile It seems the AnalysisBarrier commit caused this error, so v2.2 does not have this issue:


scala> df.select(df("name")).filter(df("id") === 0).explain(true)
== Parsed Logical Plan ==
!Filter (id#26 = 0)
+- Project [name#25]
   +- Project [_1#22 AS name#25, _2#23 AS id#26]
      +- LocalRelation [_1#22, _2#23]

== Analyzed Logical Plan ==
name: string
Project [name#25]
+- Filter (id#26 = 0)
   +- Project [name#25, id#26]
      +- Project [_1#22 AS name#25, _2#23 AS id#26]
         +- LocalRelation [_1#22, _2#23]
...

=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences ===
!!Filter (id#26 = 0)                                Project [name#25]
!+- Project [name#25]                               +- Filter (id#26 = 0)
!   +- Project [_1#22 AS name#25, _2#23 AS id#26]      +- Project [name#25, id#26]
!      +- LocalRelation [_1#22, _2#23]                    +- Project [_1#22 AS name#25, _2#23 AS id#26]
!                                                            +- LocalRelation [_1#22, _2#23]
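
The rewrite visible in the rule trace can be imitated on a toy plan. The sketch below is Spark-free; `Plan`, `Relation`, `Project`, `Filter`, and `AddMissingAttrs.rewrite` are hypothetical stand-ins, not the Catalyst implementation, and the aliasing `Project` over the `LocalRelation` is collapsed into a single `Relation` for brevity.

```scala
// Toy logical plan: hypothetical stand-ins for Catalyst nodes, attributes as strings.
sealed trait Plan { def output: Seq[String] }
final case class Relation(output: Seq[String]) extends Plan
final case class Project(projectList: Seq[String], child: Plan) extends Plan {
  def output: Seq[String] = projectList
}
final case class Filter(references: Set[String], child: Plan) extends Plan {
  def output: Seq[String] = child.output
}

object AddMissingAttrs {
  // If the filter reads attributes that the Project below it drops but the
  // grandchild still produces, widen that Project, keep the Filter on top of it,
  // and re-project to the original columns so the result schema is unchanged.
  def rewrite(plan: Plan): Plan = plan match {
    case f @ Filter(refs, Project(list, grandChild)) =>
      val missing = grandChild.output.filter(a => refs.contains(a) && !list.contains(a))
      if (missing.isEmpty) f
      else Project(list, Filter(refs, Project(list ++ missing, grandChild)))
    case other => other
  }
}

object RewriteDemo {
  // Stands in for Project [_1#22 AS name#25, _2#23 AS id#26] over the LocalRelation.
  val source: Plan = Relation(Seq("name#25", "id#26"))

  // !Filter (id#26 = 0) +- Project [name#25] +- source
  val before: Plan = Filter(Set("id#26"), Project(Seq("name#25"), source))

  // Project [name#25] +- Filter (id#26 = 0) +- Project [name#25, id#26] +- source
  val expected: Plan =
    Project(Seq("name#25"), Filter(Set("id#26"), Project(Seq("name#25", "id#26"), source)))

  def main(args: Array[String]): Unit =
    println(AddMissingAttrs.rewrite(before))
}
```

The outer `Project` is what keeps `id` out of the final schema, matching the `name: string` line in the analyzed plan.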

gatorsmile (Member) commented

We might need to get rid of AnalysisBarrier in the next release. It has already caused at least three regressions in 2.3.

viirya (Member Author) commented Jul 12, 2018

I checked out commit 82183f7, which is before the AnalysisBarrier commit.

scala> val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
18/07/12 05:36:52 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [name: string, id: int]

scala> df.select(df("name")).filter(df("id") === 0).show()
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
+- Project [name#5]
   +- Project [_1#2 AS name#5, _2#3 AS id#6]
      +- LocalRelation [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:89)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:291)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:53)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:174)
  at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
  at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3240)
  at org.apache.spark.sql.Dataset.filter(Dataset.scala:1403)
  ... 49 elided

Looks like it was already failing at that time?

ueshin (Member) commented Jul 12, 2018

Actually, the very first time we introduced this regression was at 7463a88.
We added !f.resolved && to the ResolveMissingReferences rule there, but after that the problem became more complicated because we added the AnalysisBarrier and refactored several times.

SparkQA commented Jul 12, 2018

Test build #92911 has finished for PR 21745 at commit b99d0c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
- val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+ // Only add missing attributes coming from `newChild`.
+ val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
Member

Asking a second time: do we need to fix the Aggregate case as well? The logic seems completely different. Or can we remove the Aggregate case if ResolveAggregateFunctions can handle this? I don't think we have any reason to keep wrong logic.

Member Author

Thanks. I think it's better to have a reproducible test case before changing the Aggregate case. I'm trying to create one; then we can change the Aggregate case with more confidence.

Actually I found another place we need to fix. Seems we don't have enough test coverage for similar features.

Contributor

The logic gets convoluted here and we need to add comments. Basically we need to explain when we should expand the project list.

SparkQA commented Jul 12, 2018

Test build #92917 has finished for PR 21745 at commit eff3af2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • public class JavaSummarizerExample
      • trait ComplexTypeMergingExpression extends Expression
      • case class Size(child: Expression) extends UnaryExpression with ExpectsInputTypes
      • case class MapConcat(children: Seq[Expression]) extends Expression
      • case class StreamingGlobalLimitStrategy(outputMode: OutputMode) extends Strategy
      • case class StreamingGlobalLimitExec(
      • sealed trait MultipleWatermarkPolicy
      • case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging
      • trait MemorySinkBase extends BaseStreamingSink
      • class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
      • class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
      • class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)

SparkQA commented Jul 12, 2018

Test build #92918 has finished for PR 21745 at commit 6eda8d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 12, 2018

Test build #92920 has finished for PR 21745 at commit 8432b00.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* This method tries to resolve expressions and find missing attributes recursively. Specially,
* when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved
* attributes which are missed from SELECT clause. This method tries to find the missing
Contributor

which are missed from child output

- val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+ // If some attributes used by expressions are resolvable only on the rewritten child
+ // plan, we need to add them into original projection.
+ val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
Contributor

what if we do not do the .intersect(newChild.outputSet)?

Member Author

Without this intersect, some tests fail, e.g., group-analytics.sql in SQLQueryTestSuite. Some attributes are resolved on parent plans, not on child plans. We can't add them as missing attributes here.
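
The effect of the `intersect` can be seen with plain Scala sets. This is only an illustration under assumed inputs: the attribute names, in particular `outer#9` (standing for an attribute that is resolved on a parent plan), are invented, and `Set[String]` stands in for `AttributeSet`.

```scala
object MissingAttrsDemo {
  // Attributes referenced by the resolved Filter/Sort expressions (hypothetical values).
  val newExprRefs: Set[String] = Set("name#5", "id#6", "outer#9")

  // What the operator currently outputs.
  val outputSet: Set[String] = Set("name#5")

  // What the rewritten child plan is able to produce.
  val newChildOutput: Set[String] = Set("name#5", "id#6")

  // Without the intersect, `outer#9` would be treated as missing even though
  // the child cannot supply it.
  val withoutIntersect: Set[String] = newExprRefs -- outputSet

  // With the intersect, only attributes the rewritten child provides are added.
  val missingAttrs: Set[String] = (newExprRefs -- outputSet).intersect(newChildOutput)

  def main(args: Array[String]): Unit = {
    println(s"without intersect: $withoutIntersect")
    println(s"with intersect:    $missingAttrs")
  }
}
```

In this toy, dropping the `intersect` would try to add `outer#9` to the projection although no child produces it.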

SparkQA commented Jul 12, 2018

Test build #92935 has finished for PR 21745 at commit 860d433.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor) commented

LGTM

val sort2 = df.select(col("name")).orderBy(col("id"))
checkAnswer(sort1, sort2.collect())

withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") {
Member

This test case should be split into two.

Member Author

Will update it in the next commit.

gatorsmile (Member) commented

LGTM

SparkQA commented Jul 13, 2018

Test build #92958 has finished for PR 21745 at commit a98f416.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 13, 2018

Test build #92961 has finished for PR 21745 at commit 9e00db9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member Author) commented Jul 13, 2018

retest this please.

SparkQA commented Jul 13, 2018

Test build #92964 has finished for PR 21745 at commit 9e00db9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

gatorsmile (Member) commented

Thanks! Merged to master/2.3

asfgit pushed a commit that referenced this pull request Jul 13, 2018
…t not work

Author: Liang-Chi Hsieh

Closes #21745 from viirya/SPARK-24781.

(cherry picked from commit dfd7ac9)
Signed-off-by: Xiao Li
@asfgit asfgit closed this in dfd7ac9 Jul 13, 2018
@viirya viirya deleted the SPARK-24781 branch December 27, 2023 18:21