
Conversation

@heary-cao (Contributor) commented Aug 16, 2017

What changes were proposed in this pull request?

Currently, InterpretedPredicate does not handle nondeterministic expressions: when a filter contains one, evaluation fails.
In spark-shell, execute the following statements:

val path = "/home/spark/files/7"
Seq(1 -> "a").toDF("a", "b").write.partitionBy("a").parquet(path)
val df = spark.read.parquet(path)
df.filter(rand(10) <= 1.0).select($"a").show

Spark throws a java.lang.IllegalArgumentException:

java.lang.IllegalArgumentException: requirement failed: Nondeterministic expression org.apache.spark.sql.catalyst.expressions.Rand should be initialized before eval.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.catalyst.expressions.Nondeterministic$class.eval(Expression.scala:291)
  at org.apache.spark.sql.catalyst.expressions.RDG.eval(randomExpressions.scala:34)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:415)
  at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:38)
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$10.apply(PartitioningAwareFileIndex.scala:180)
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$10.apply(PartitioningAwareFileIndex.scala:179)
  at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
  at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.prunePartitions(PartitioningAwareFileIndex.scala:179)
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:64)
  at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions$lzycompute(DataSourceScanExec.scala:180)
  at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions(DataSourceScanExec.scala:177)
  at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$21.apply(DataSourceScanExec.scala:279)
  at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$21.apply(DataSourceScanExec.scala:278)
  at scala.Option.map(Option.scala:146)
  at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:278)
  at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:106)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3031)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2344)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2557)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:671)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:630)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:639)

This PR solves the problem by adding an initialize method to InterpretedPredicate.
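
For concreteness, here is a minimal sketch of the proposed fix. It assumes the class shape of InterpretedPredicate in predicates.scala at the time (a thin wrapper over Expression.eval extending the codegen Predicate base class); imports are elided as in the other snippets in this thread:

  case class InterpretedPredicate(expression: Expression) extends Predicate {
    override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]

    // Walk the expression tree and initialize every nondeterministic
    // sub-expression before the first eval, as the generated predicate does.
    override def initialize(partitionIndex: Int): Unit = {
      expression.foreach {
        case n: Nondeterministic => n.initialize(partitionIndex)
        case _ =>
      }
    }
  }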

How was this patch tested?

Covered by existing test cases; new test cases are also added.

@heary-cao heary-cao changed the title [SQL]nondeterministic expressions correctly for filter predicates [SPARK-21746][SQL]nondeterministic expressions correctly for filter predicates Aug 16, 2017
@heary-cao heary-cao changed the title [SPARK-21746][SQL]nondeterministic expressions correctly for filter predicates [SPARK-21746][SQL]nondeterministic expressions incorrectly for filter predicates Aug 16, 2017
@heary-cao heary-cao changed the title [SPARK-21746][SQL]nondeterministic expressions incorrectly for filter predicates [SPARK-21746][SQL]there is an java.lang.IllegalArgumentException when the filter contains nondeterminate expressions Aug 17, 2017
@heary-cao (Contributor Author)

cc @gatorsmile @cloud-fan

@gatorsmile (Member)

ok to test

@SparkQA commented Aug 18, 2017

Test build #80810 has finished for PR 18961 at commit a1fd5e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao (Contributor Author)

@cloud-fan @gatorsmile
Could you take a look again?

Member

Do we need to turn this off? It looks irrelevant to me.

Contributor Author

Aha, you are right.

Member

0 is not a partition index, and it will go to n.initialize(0) at line 46. Is that correct?

@heary-cao (Contributor Author) Aug 23, 2017

Do you mean we don't need InterpretedPredicate.initialize, and should modify object InterpretedPredicate.create instead?

  def create(expression: Expression): InterpretedPredicate = {
    expression.foreach {
      case n: Nondeterministic => n.initialize(0)
      case _ =>
    }
    new InterpretedPredicate(expression)
  }

@heary-cao (Contributor Author) Aug 23, 2017

The consideration was consistency of the class InterpretedPredicate, so I added an initialize method to it.

@gatorsmile (Member)

Will review it tonight.

@SparkQA commented Aug 23, 2017

Test build #81011 has finished for PR 18961 at commit 954f0a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Why would this test case trigger InterpretedPredicate?

@heary-cao (Contributor Author) Aug 23, 2017

Because predicates is not empty.

Contributor Author

In addition, I validated against Spark 2.0.2, and it does not trigger the InterpretedPredicate exception. Spark 2.0.2's InterpretedPredicate.create:

  def create(expression: Expression): (InternalRow => Boolean) = {
    expression.foreach {
      case n: Nondeterministic => n.setInitialValues()
      case _ =>
    }
    (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
  }
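
For comparison, the version current at the time of this PR presumably looks like the sketch below (reconstructed from the stack trace, so treat the exact shape as an assumption): create no longer initializes anything, which is why Nondeterministic.eval hits its require(initialized) check.

  object InterpretedPredicate {
    // No initialization of Nondeterministic children any more:
    def create(expression: Expression): InterpretedPredicate =
      new InterpretedPredicate(expression)
  }

  case class InterpretedPredicate(expression: Expression) extends Predicate {
    // Evaluating a Rand here fails the require(initialized) check
    // inside Nondeterministic.eval.
    override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
  }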

Member

I ran this test without any exception. Are you sure this test can reproduce this issue?

Contributor Author

Sure. This exception comes from the unit tests of #18918.

Member

The test you add should trigger InterpretedPredicate, as @cloud-fan mentioned. It should hit the code path you modified in this PR, without depending on another PR.

Contributor Author

Sorry, I have updated the PR description. Thanks.

Contributor

Can this test expose the bug?

Contributor Author

I'm updating to the latest version for validation.

@heary-cao (Contributor Author) Aug 29, 2017

@viirya, @cloud-fan
The trigger condition is associated with #18918, which makes this exception more likely. The current Spark master branch does not trigger this code path, so I will move this change to #18918 and close this PR.
Thanks.

@heary-cao (Contributor Author)

@dongjoon-hyun @cloud-fan Do you have any suggestions?

@cloud-fan (Contributor)

Good catch! I think we do need InterpretedPredicate.initialize, but I don't think we should call initialize in InterpretedPredicate.create. Can you check out where we create InterpretedPredicate and call initialize there?
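
For illustration, call-site initialization could look roughly like this in PartitioningAwareFileIndex.prunePartitions (a hedged sketch: boundPredicate, predicateExpression, and partitions are hypothetical names standing in for the actual locals):

  val boundPredicate = InterpretedPredicate.create(predicateExpression)
  // Partition pruning happens on the driver, outside any RDD partition,
  // so 0 serves as a placeholder partition index.
  boundPredicate.initialize(0)
  val selected = partitions.filter(p => boundPredicate.eval(p.values))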

@heary-cao (Contributor Author)

@cloud-fan, thank you for your suggestion. I found 4 call sites of InterpretedPredicate.create and modified them. Could you take another look when you have time?

Member

The partition index is not always 0.

@viirya (Member) Aug 29, 2017

I checked the few places calling newPredicate. They either already do initialize properly or don't need to. So I think we don't need to do initialize here like this.

Member

The predicate here can only be sources.GreaterThan. It can't include non-deterministic expressions.

@SparkQA commented Aug 29, 2017

Test build #81203 has finished for PR 18961 at commit 8be86e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2017

Test build #81210 has finished for PR 18961 at commit 349a2d2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao heary-cao closed this Aug 29, 2017
@gatorsmile (Member)

Any reason you closed it?

@gatorsmile (Member)

You do not need to open multiple PRs for the same issue.

@heary-cao (Contributor Author) commented Aug 31, 2017

@gatorsmile, this is a problem of code execution, semantics, and consistency.
The trigger condition is that FileSourceScanExec.partitionFilters is non-empty and contains a nondeterministic function. However, the current Spark master branch does not trigger this code path, because PhysicalOperation excludes Filter conditions that are nondeterministic (see the sketch after the plan below).
Take a similar SQL query: SELECT t1.i3 from tableorc t1 where rand(10) <= 0.5
The current Spark master branch produces this executed plan:

*Project [i3#0]
+- *Filter (rand(10) <= 0.5)
   +- *FileScan orc default.tableorc[i3#0,i4#1,i5#2,i6#3,i7#4,i8#5,i9#6,i10#7,i11#8,s2#9,s3#10,s4#11,s5#12,s6#13,s7#14,d2#15,d3#16,d4#17,d5#18,d6#19,d7#20,i2#21] Batched: false, Format: ORC, Location: CatalogFileIndex[file:/home/cxw/spark/bin/spark-warehouse/tableorc], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i3:int,i4:int,i5:int,i6:int,i7:int,i8:int,i9:int,i10:int,i11:int,s2:string,s3:string,s4:st...

PartitionFilters is [], instead of [(rand(10) <= 0.5)].
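
The exclusion mentioned above happens in the filter-collection step. A paraphrased sketch of the relevant case in PhysicalOperation.collectProjectsAndFilters (the deterministic guard being the point):

  // Filters are only collected (and thus pushed down as partition filters)
  // when the condition is deterministic, so rand(10) <= 0.5 stays behind.
  case Filter(condition, child) if condition.deterministic =>
    val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
    val substituted = substitute(aliases)(condition)
    (fields, filters ++ splitConjunctivePredicates(substituted), other, aliases)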

Thus, the trigger condition is associated with #18918, which makes this exception more likely. The executed plan with #18918 applied:

*Project [i3#0]
+- *Filter (rand(10) <= 0.5)
   +- *FileScan orc default.tableorc[i3#0,i2#21] Batched: false, Format: ORC, Location: PrunedInMemoryFileIndex[file:/home/cxw/spark/bin/spark-warehouse/tableorc/i2=0], PartitionCount: 1, PartitionFilters: [(rand(10) <= 0.5)], PushedFilters: [], ReadSchema: struct<i3:int>

PartitionFilters is [(rand(10) <= 0.5)].
Thanks.
