Skip to content

Conversation

@davies
Copy link
Contributor

@davies davies commented Oct 9, 2015

Currently, All windows function could generate wrong result in cluster sometimes.

The root cause is that AttributeReference is called in executor, then id of it may not be unique than others created in driver.

Here is the script that could reproduce the problem (run in local cluster):

from pyspark import SparkContext, HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

sqlContext = HiveContext(SparkContext())
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
df =  sqlContext.range(1<<20)
df2 = df.select((df.id % 1000).alias("A"), (df.id / 1000).alias('B'))
ws = Window.partitionBy(df2.A).orderBy(df2.B)
df3 = df2.select("client", "date", rowNumber().over(ws).alias("rn")).filter("rn < 0")
assert df3.count() == 0

@SparkQA
Copy link

SparkQA commented Oct 9, 2015

Test build #1869 has finished for PR 9050 at commit 35b7b22.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ChildProcAppHandle implements SparkAppHandle
    • abstract class LauncherConnection implements Closeable, Runnable
    • final class LauncherProtocol
    • static class Message implements Serializable
    • static class Hello extends Message
    • static class SetAppId extends Message
    • static class SetState extends Message
    • static class Stop extends Message
    • class LauncherServer implements Closeable
    • class NamedThreadFactory implements ThreadFactory
    • class OutputRedirector
    • public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable
    • /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */

@yhuai
Copy link
Contributor

yhuai commented Oct 9, 2015

We have another place to create the attribute references (https://github.com/davies/spark/blob/wrong_window/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala#L149). Also change this?

@SparkQA
Copy link

SparkQA commented Oct 9, 2015

Test build #1870 has finished for PR 9050 at commit 35b7b22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ChildProcAppHandle implements SparkAppHandle
    • abstract class LauncherConnection implements Closeable, Runnable
    • final class LauncherProtocol
    • static class Message implements Serializable
    • static class Hello extends Message
    • static class SetAppId extends Message
    • static class SetState extends Message
    • static class Stop extends Message
    • class LauncherServer implements Closeable
    • class NamedThreadFactory implements ThreadFactory
    • class OutputRedirector

@SparkQA
Copy link

SparkQA commented Oct 10, 2015

Test build #43505 has finished for PR 9050 at commit 39b99b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Copy link
Contributor

yhuai commented Oct 10, 2015

Thank you for fixing it! How about we add a test in HiveSparkSubmitSuite? I guess we can reproduce the problem with local-cluster.

@hvanhovell
Copy link
Contributor

Good catch! Shouldn't we also backport this one into the 1.5 branch?

@davies @yhuai could one of you guys explain to me why/where this is causing problems? I have been looking at the Window.scala code and it doesn't seem problematic there, so I assume that it causes issues further down the line?

@yhuai
Copy link
Contributor

yhuai commented Oct 10, 2015

@hvanhovell Yes, we will backport it to 1.5 branch. So it will be fixed in 1.5.2.

Let me explain the cause. Every attribute reference has an exprId. If you do not explicitly assign this id (probably most of cases) when you create an attribute reference, you will get a unique id (see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L27). However, if we create attribute references in both driver and executors, the uniqueness of the exprId will not be held anymore. So, we can see two attribute references representing two different columns having the same ids. Because our attribute binding work relies on the uniqueness of the exprId, once this property does not hold anymore, we will bind to wrong columns when evaluating expressions and generate wrong results.

@davies
Copy link
Contributor Author

davies commented Oct 12, 2015

@yhuai HiveSparkSubmitSuite could be an option to have a regression test, but it's very slow, and regression test will be tricky (also depends on number of partitions), I'd not have a regression for now.

@davies
Copy link
Contributor Author

davies commented Oct 12, 2015

@yhuai @hvanhovell I had changed to use BoundReference instead of Attribute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it true that the input row of this projection has more elements than child.output? Maybe it is not very easy to understand this subtle change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we put all the windowResult on the right side of child.output, patchedWindowExpression will be pointed to them.

@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #43590 has finished for PR 9050 at commit 2d55882.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Copy link
Contributor

yhuai commented Oct 13, 2015

@davies I created a test based on the case you put in the description (yhuai@bc566fa). Seems it indeed fails. How about we add it in the PR?

@davies
Copy link
Contributor Author

davies commented Oct 13, 2015

@yhuai Thanks, pulled in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, sorry. Actually, can we do df3.rdd.count() to make sure we do materialize all results?

@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #43618 has finished for PR 9050 at commit 153140a.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. My bad.... This condition should be !=. Can we run this action a few more times to make it have a higher chance of throwing the exception when it is broken (say 10 times)? It should not make this test much more expensive because the most expensive part is creating the HiveContext.

(1 to 10).foreach { i =>
  val count = df3.rdd.count()
  if (count != 0) {
    throw new Exception(s"df3 should have 0 row. However $count rows got returned.")
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If failure should only happen for the first time (having the same id both in driver and in executor). We can increase the number of partitions to increase the change to fail.

@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #43621 has finished for PR 9050 at commit 89c1401.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #43624 has finished for PR 9050 at commit 3aec389.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #1887 has finished for PR 9050 at commit 3aec389.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to throw an exception at here. Otherwise, even this assertion fails, the test will pass (because we are running an application at here).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix it while I merging it.

@asfgit asfgit closed this in 6987c06 Oct 13, 2015
asfgit pushed a commit that referenced this pull request Oct 13, 2015
Currently, All windows function could generate wrong result in cluster sometimes.

The root cause is that AttributeReference is called in executor, then id of it may not be unique than others created in driver.

Here is the script that could reproduce the problem (run in local cluster):
```
from pyspark import SparkContext, HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

sqlContext = HiveContext(SparkContext())
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
df =  sqlContext.range(1<<20)
df2 = df.select((df.id % 1000).alias("A"), (df.id / 1000).alias('B'))
ws = Window.partitionBy(df2.A).orderBy(df2.B)
df3 = df2.select("client", "date", rowNumber().over(ws).alias("rn")).filter("rn < 0")
assert df3.count() == 0
```

Author: Davies Liu <[email protected]>
Author: Yin Huai <[email protected]>

Closes #9050 from davies/wrong_window.

Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants