From 86526befca0ad74bb3e48720c90d43ca888e5353 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 14 Feb 2022 19:43:17 +0900 Subject: [PATCH 1/3] [SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing distribution of stateful operator --- .../catalyst/plans/physical/partitioning.scala | 8 ++++++++ .../streaming/FlatMapGroupsWithStateExec.scala | 3 +++ .../streaming/statefulOperators.scala | 18 +++++++++++++++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 4418d3253a8b..3d868cef2f46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -101,6 +101,14 @@ case class ClusteredDistribution( * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the * stateful operator, only [[HashPartitioning]] (and HashPartitioning in * [[PartitioningCollection]]) can satisfy this distribution. + * + * NOTE: This is applied only stream-stream join as of now. For other stateful operators, we have + * been using ClusteredDistribution, which could construct the physical partitioning of the state + * as different way. (ClusteredDistribution requires relaxed condition and multiple + * partitionings can satisfy the requirement.) We need to construct the way to fix this with + * minimizing possibility to break the existing checkpoints. + * + * TODO: SPARK-38204 to address above note. 
*/ case class StatefulOpClusteredDistribution( expressions: Seq[Expression], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index a00a62216f3d..47576e72805b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec( * to have the same grouping so that the data are co-lacated on the same task. */ override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO: SPARK-38204 ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) :: ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 3431823765c1..59e120fb3420 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -334,6 +334,9 @@ case class StateStoreRestoreExec( override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. 
+ // TODO: SPARK-38204 if (keyExpressions.isEmpty) { AllTuples :: Nil } else { @@ -493,6 +496,9 @@ case class StateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO: SPARK-38204 if (keyExpressions.isEmpty) { AllTuples :: Nil } else { @@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec( } override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO: SPARK-38204 ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil } @@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. + // TODO: SPARK-38204 ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil } @@ -741,8 +753,12 @@ case class StreamingDeduplicateExec( extends UnaryExecNode with StateStoreWriter with WatermarkSupport { /** Distribute by grouping attributes */ - override def requiredChildDistribution: Seq[Distribution] = + override def requiredChildDistribution: Seq[Distribution] = { + // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution + // before making any changes. 
+ // TODO: SPARK-38204 ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil + } override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver From 82575eb9daede8f148b4fcad12ad042fc5afae0d Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 14 Feb 2022 19:52:04 +0900 Subject: [PATCH 2/3] fix --- .../apache/spark/sql/catalyst/plans/physical/partitioning.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 3d868cef2f46..e3e81a627819 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -104,7 +104,7 @@ case class ClusteredDistribution( * * NOTE: This is applied only stream-stream join as of now. For other stateful operators, we have * been using ClusteredDistribution, which could construct the physical partitioning of the state - * as different way. (ClusteredDistribution requires relaxed condition and multiple + * in different way. (ClusteredDistribution requires relaxed condition and multiple * partitionings can satisfy the requirement.) We need to construct the way to fix this with * minimizing possibility to break the existing checkpoints. 
 * From 93556b9ae66ca9b8ece382c47ced28fc2f686600 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 15 Feb 2022 05:35:34 +0900 Subject: [PATCH 3/3] fix --- .../sql/catalyst/plans/physical/partitioning.scala | 8 ++++---- .../streaming/FlatMapGroupsWithStateExec.scala | 2 +- .../sql/execution/streaming/statefulOperators.scala | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index e3e81a627819..5342c8ee6d67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -102,13 +102,13 @@ case class ClusteredDistribution( * stateful operator, only [[HashPartitioning]] (and HashPartitioning in * [[PartitioningCollection]]) can satisfy this distribution. * - * NOTE: This is applied only stream-stream join as of now. For other stateful operators, we have - * been using ClusteredDistribution, which could construct the physical partitioning of the state - * in different way. (ClusteredDistribution requires relaxed condition and multiple + * NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we + * have been using ClusteredDistribution, which could construct the physical partitioning of the + * state in a different way (ClusteredDistribution requires a relaxed condition and multiple * partitionings can satisfy the requirement.) We need to construct the way to fix this with * minimizing possibility to break the existing checkpoints. * - * TODO: SPARK-38204 to address above note. + * TODO(SPARK-38204): address the issue explained in the above note. 
*/ case class StatefulOpClusteredDistribution( expressions: Seq[Expression], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index 47576e72805b..93ed5916bfb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -95,7 +95,7 @@ case class FlatMapGroupsWithStateExec( override def requiredChildDistribution: Seq[Distribution] = { // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. - // TODO: SPARK-38204 + // TODO(SPARK-38204) ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) :: ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 59e120fb3420..3ab2ad47e98c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -336,7 +336,7 @@ case class StateStoreRestoreExec( override def requiredChildDistribution: Seq[Distribution] = { // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. - // TODO: SPARK-38204 + // TODO(SPARK-38204) if (keyExpressions.isEmpty) { AllTuples :: Nil } else { @@ -498,7 +498,7 @@ case class StateStoreSaveExec( override def requiredChildDistribution: Seq[Distribution] = { // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. 
- // TODO: SPARK-38204 + // TODO(SPARK-38204) if (keyExpressions.isEmpty) { AllTuples :: Nil } else { @@ -581,7 +581,7 @@ case class SessionWindowStateStoreRestoreExec( override def requiredChildDistribution: Seq[Distribution] = { // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. - // TODO: SPARK-38204 + // TODO(SPARK-38204) ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil } @@ -695,7 +695,7 @@ case class SessionWindowStateStoreSaveExec( override def requiredChildDistribution: Seq[Distribution] = { // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. - // TODO: SPARK-38204 + // TODO(SPARK-38204) ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil } @@ -756,7 +756,7 @@ case class StreamingDeduplicateExec( override def requiredChildDistribution: Seq[Distribution] = { // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. - // TODO: SPARK-38204 + // TODO(SPARK-38204) ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil }