Commit affbe32
[SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.
I also took the chance to more explicitly define the semantics of deterministic.

Author: Reynold Xin <[email protected]>

Closes apache#7428 from rxin/non-deterministic and squashes the following commits:

a760827 [Reynold Xin] [SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.
1 parent 674eb2a commit affbe32

File tree

3 files changed (+14, −4 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 8 additions & 2 deletions
@@ -61,9 +61,15 @@ abstract class Expression extends TreeNode[Expression] {
   def foldable: Boolean = false
 
   /**
-   * Returns true when the current expression always return the same result for fixed input values.
+   * Returns true when the current expression always return the same result for fixed inputs from
+   * children.
+   *
+   * Note that this means that an expression should be considered as non-deterministic if:
+   * - if it relies on some mutable internal state, or
+   * - if it relies on some implicit input that is not part of the children expression list.
+   *
+   * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
    */
-  // TODO: Need to define explicit input values vs implicit input values.
   def deterministic: Boolean = true
 
   def nullable: Boolean
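
The expanded scaladoc defines non-determinism in terms of mutable internal state and implicit inputs outside the children list. As a hypothetical illustration (not part of this commit; the class name, imports, and eval body are assumptions modeled on SparkPartitionID and MonotonicallyIncreasingID), an expression that is non-deterministic for both reasons would override the flag like this:

// Hypothetical example (not in this commit): a leaf expression that is
// non-deterministic for both reasons listed in the new scaladoc.
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.types.{DataType, LongType}

private[sql] case class CallCounter() extends LeafExpression {

  // Mutable internal state: the result changes on every call, even for identical inputs.
  @transient private[this] var calls: Long = 0L

  // Implicit input: the partition id comes from TaskContext, not from a child expression.
  @transient private lazy val partitionId = TaskContext.getPartitionId()

  override def deterministic: Boolean = false

  override def nullable: Boolean = false

  override def dataType: DataType = LongType

  override def eval(input: InternalRow): Long = {
    calls += 1
    (partitionId.toLong << 33) + calls
  }
}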

sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala

Lines changed: 3 additions & 1 deletion
@@ -41,7 +41,9 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
    */
   @transient private[this] var count: Long = 0L
 
-  @transient private lazy val partitionMask = TaskContext.getPartitionId.toLong << 33
+  @transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33
+
+  override def deterministic: Boolean = false
 
   override def nullable: Boolean = false
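
For context, partitionMask places the partition id in the bits above position 33, leaving the lower 33 bits for a per-partition record counter, so ids from different partitions cannot collide. This diff does not show eval, so the combination below is an assumed sketch of the arithmetic rather than the committed code:

// Hypothetical worked example: the id for the third record (zero-based count = 2)
// processed in partition 2, assuming eval adds the mask and the counter.
val partitionMask: Long = 2L << 33                    // 17179869184, partition id in the upper bits
val countWithinPartition: Long = 2L                   // per-partition record counter
val id: Long = partitionMask + countWithinPartition   // 17179869186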

sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala

Lines changed: 3 additions & 1 deletion
@@ -29,11 +29,13 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
  */
 private[sql] case object SparkPartitionID extends LeafExpression {
 
+  override def deterministic: Boolean = false
+
   override def nullable: Boolean = false
 
   override def dataType: DataType = IntegerType
 
-  @transient private lazy val partitionId = TaskContext.getPartitionId
+  @transient private lazy val partitionId = TaskContext.getPartitionId()
 
   override def eval(input: InternalRow): Int = partitionId
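
Both expressions are reachable from the DataFrame API. A usage sketch, assuming an existing DataFrame df and the camelCase helpers monotonicallyIncreasingId() and sparkPartitionId() in org.apache.spark.sql.functions from this era (later Spark releases favor snake_case names):

// Usage sketch (assumes an existing DataFrame `df`): attach a unique,
// per-partition-increasing id and the physical partition number to every row.
// Both values depend on partitioning and task scheduling, which is why the
// underlying expressions are now marked as non-deterministic.
import org.apache.spark.sql.functions.{monotonicallyIncreasingId, sparkPartitionId}

val withIds = df
  .withColumn("id", monotonicallyIncreasingId())
  .withColumn("partition", sparkPartitionId())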
