@@ -153,6 +153,7 @@ object ExternalCatalogUtils {
        val index = partitionSchema.indexWhere(_.name == att.name)
        BoundReference(index, partitionSchema(index).dataType, nullable = true)
      })
      boundPredicate.initialize(0)

      inputPartitions.filter { p =>
        boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
@@ -36,6 +36,13 @@ object InterpretedPredicate {

case class InterpretedPredicate(expression: Expression) extends BasePredicate {
  override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]

  override def initialize(partitionIndex: Int): Unit = {
    expression.foreach {
      case n: Nondeterministic => n.initialize(partitionIndex)
      case _ =>
    }
  }
}

/**
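The override above walks the expression tree and initializes every Nondeterministic node with the partition index before evaluation. Below is a minimal standalone sketch of that contract in plain Scala with no Spark dependency; every class name here (PredicateSketch, RandLike, and so on) is an illustrative stand-in, not one of Spark's actual classes.

```scala
// Illustrative stand-ins for Catalyst's Expression / Nondeterministic
// hierarchy. Simplified sketches, not Spark's real classes.
trait Expression {
  def children: Seq[Expression] = Nil
  // Pre-order traversal, analogous to TreeNode.foreach in Catalyst.
  def foreach(f: Expression => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }
  def eval(): Any
}

trait Nondeterministic extends Expression {
  private var initialized = false
  protected def initInternal(partitionIndex: Int): Unit
  final def initialize(partitionIndex: Int): Unit = {
    initInternal(partitionIndex)
    initialized = true
  }
  final override def eval(): Any = {
    // Evaluating before initialization is the failure this PR guards against.
    require(initialized, "nondeterministic expression not initialized")
    evalInternal()
  }
  protected def evalInternal(): Any
}

case class RandLike(seed: Long) extends Nondeterministic {
  private var rng: scala.util.Random = null
  protected def initInternal(partitionIndex: Int): Unit =
    rng = new scala.util.Random(seed + partitionIndex)
  protected def evalInternal(): Any = rng.nextDouble()
}

case class Literal(value: Double) extends Expression {
  def eval(): Any = value
}

case class LessThanOrEqual(left: Expression, right: Expression) extends Expression {
  override def children: Seq[Expression] = Seq(left, right)
  def eval(): Any =
    left.eval().asInstanceOf[Double] <= right.eval().asInstanceOf[Double]
}

// The pattern this PR adds: walk the tree and initialize every
// Nondeterministic node before the predicate is evaluated.
case class PredicateSketch(expression: Expression) {
  def initialize(partitionIndex: Int): Unit = expression.foreach {
    case n: Nondeterministic => n.initialize(partitionIndex)
    case _ =>
  }
  def eval(): Boolean = expression.eval().asInstanceOf[Boolean]
}
```

Evaluating `PredicateSketch(LessThanOrEqual(RandLike(10), Literal(1.0)))` without first calling `initialize(0)` trips the require check, mirroring the exception this PR fixes.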
@@ -175,6 +175,7 @@ abstract class PartitioningAwareFileIndex(
        val index = partitionColumns.indexWhere(a.name == _.name)
        BoundReference(index, partitionColumns(index).dataType, nullable = true)
      })
      boundPredicate.initialize(0)

      val selected = partitions.filter {
        case PartitionPath(values, _) => boundPredicate.eval(values)
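Both patched call sites follow the same shape: bind the predicate, call initialize(0) exactly once (partition pruning runs on the driver, so a fixed partition index is used), then filter. A standalone sketch of that flow, with invented names throughout:

```scala
// Standalone sketch of the call-site pattern above: a bound predicate over
// partition values must be initialized once, before the filter loop, not per
// row. BoundPredicateSketch and PruneSketch are invented for illustration.
final class BoundPredicateSketch(seed: Long) {
  private var rng: scala.util.Random = null
  def initialize(partitionIndex: Int): Unit =
    rng = new scala.util.Random(seed + partitionIndex)
  def eval(partitionValue: Int): Boolean = {
    // Without the initialize(0) call at the call site, rng is still null
    // here -- the failure mode the patch removes.
    require(rng != null, "predicate not initialized")
    rng.nextDouble() <= 1.0 && partitionValue >= 0
  }
}

object PruneSketch {
  // Mirrors the patched call sites: bind, initialize once, then filter.
  def prune(partitionValues: Seq[Int], predicate: BoundPredicateSketch): Seq[Int] = {
    predicate.initialize(0) // the one-line fix added at each call site
    partitionValues.filter(predicate.eval)
  }
}
```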
@@ -2029,4 +2029,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
      testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)),
      Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2)))
  }

  test("SPARK-21746: nondeterministic expressions correctly for filter predicates") {
    withTempPath { path =>
      val p = path.getAbsolutePath
      Seq(1 -> "a").toDF("a", "b").write.partitionBy("a").parquet(p)
      val df = spark.read.parquet(p)
      checkAnswer(df.filter(rand(10) <= 1.0).select($"a"), Row(1))
Contributor: Why will this test case trigger InterpretedPredicate?
Contributor Author (@heary-cao, Aug 23, 2017): Because predicates is not empty.
Contributor Author: In addition, I tried to validate this against Spark 2.0.2, and it does not trigger the InterpretedPredicate exception. Spark 2.0.2's InterpretedPredicate.create:

  def create(expression: Expression): (InternalRow => Boolean) = {
    expression.foreach {
      case n: Nondeterministic => n.setInitialValues()
      case _ =>
    }
    (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
  }

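For context, here is a standalone sketch (invented names, not Spark code) of the difference between the 2.0.2 behavior quoted above, where create initialized eagerly with no partition index, and the current API, which defers to initialize(partitionIndex) so each partition can seed its nondeterministic expressions independently; a call site that skips the call is left with an uninitialized expression.

```scala
// Sketch contrasting eager initialization (2.0.2-style create, no partition
// index) with the current deferred initialize(partitionIndex). All names
// (SeededSource, SeedSketch) are invented for illustration.
final class SeededSource(seed: Long) {
  private var rng: scala.util.Random = null
  def initialize(partitionIndex: Int): Unit =
    rng = new scala.util.Random(seed + partitionIndex)
  def next(): Double = {
    require(rng != null, "not initialized") // forgetting initialize fails here
    rng.nextDouble()
  }
}

object SeedSketch {
  // 2.0.2-style: initialization happens inside create, so callers cannot
  // forget it, but there is no per-partition index to thread through.
  def createEager(seed: Long): () => Double = {
    val s = new SeededSource(seed)
    s.initialize(0)
    () => s.next()
  }

  // Current style: each partition seeds its own stream; the call site must
  // remember to invoke initialize -- hence the explicit calls in this PR.
  def createDeferred(seed: Long, partitionIndex: Int): SeededSource = {
    val s = new SeededSource(seed)
    s.initialize(partitionIndex)
    s
  }
}
```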
Member: I ran this test without any exception. Are you sure this test can reproduce the issue?
Contributor Author: Sure; this exception comes from the #18918 unit tests.
Member: The test you add should trigger InterpretedPredicate, as @cloud-fan mentioned. It should hit the code path you modified in this PR, not depend on another PR.
Contributor Author: Sorry, I have updated the PR description. Thanks.
Contributor: Can this test expose the bug?
Contributor Author: I'm updating to the latest version for validation.
Contributor Author (@heary-cao, Aug 29, 2017): @viirya, @cloud-fan
This trigger condition is associated with #18918, which makes the exception more likely to occur. The current Spark master branch does not trigger this code path, so I will put this change on #18918 and close this PR. Thanks.
    }
  }
}