Skip to content

Commit 349a2d2

Browse files
committed
nondeterministic expressions incorrectly for filter predicates
1 parent 8321c14 commit 349a2d2

File tree

4 files changed

+18
-0
lines changed

4 files changed

+18
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ object ExternalCatalogUtils {
153153
val index = partitionSchema.indexWhere(_.name == att.name)
154154
BoundReference(index, partitionSchema(index).dataType, nullable = true)
155155
})
156+
boundPredicate.initialize(0)
156157

157158
inputPartitions.filter { p =>
158159
boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ object InterpretedPredicate {
3636

3737
case class InterpretedPredicate(expression: Expression) extends BasePredicate {
3838
override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
39+
40+
override def initialize(partitionIndex: Int): Unit = {
41+
expression.foreach {
42+
case n: Nondeterministic => n.initialize(partitionIndex)
43+
case _ =>
44+
}
45+
}
3946
}
4047

4148
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ abstract class PartitioningAwareFileIndex(
175175
val index = partitionColumns.indexWhere(a.name == _.name)
176176
BoundReference(index, partitionColumns(index).dataType, nullable = true)
177177
})
178+
boundPredicate.initialize(0)
178179

179180
val selected = partitions.filter {
180181
case PartitionPath(values, _) => boundPredicate.eval(values)

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,4 +2029,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
20292029
testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)),
20302030
Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2)))
20312031
}
2032+
2033+
test("SPARK-21746: nondeterministic expressions correctly for filter predicates") {
2034+
withTempPath { path =>
2035+
val p = path.getAbsolutePath
2036+
Seq(1 -> "a").toDF("a", "b").write.partitionBy("a").parquet(p)
2037+
val df = spark.read.parquet(p)
2038+
checkAnswer(df.filter(rand(10) <= 1.0).select($"a"), Row(1))
2039+
}
2040+
}
20322041
}

0 commit comments

Comments
 (0)