Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ case class ClusteredDistribution(
* Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
* stateful operator, only [[HashPartitioning]] (and HashPartitioning in
* [[PartitioningCollection]]) can satisfy this distribution.
*
* NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we
* have been using ClusteredDistribution, which could construct the physical partitioning of the
* state in different way (ClusteredDistribution requires relaxed condition and multiple
* partitionings can satisfy the requirement.) We need to construct the way to fix this with
* minimizing possibility to break the existing checkpoints.
*
* TODO(SPARK-38204): address the issue explained in above note.
*/
case class StatefulOpClusteredDistribution(
expressions: Seq[Expression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec(
* to have the same grouping so that the data are co-lacated on the same task.
*/
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) ::
ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) ::
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ case class StateStoreRestoreExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
Expand Down Expand Up @@ -493,6 +496,9 @@ case class StateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
Expand Down Expand Up @@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec(
}

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

Expand Down Expand Up @@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

Expand Down Expand Up @@ -741,8 +753,12 @@ case class StreamingDeduplicateExec(
extends UnaryExecNode with StateStoreWriter with WatermarkSupport {

/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
Expand Down