From ad49a311815c77734c26cd3e21b8505103200481 Mon Sep 17 00:00:00 2001 From: Nick Young Date: Mon, 29 Apr 2024 11:22:45 -0700 Subject: [PATCH 1/6] Reduce memory pressure of empty TreeNode tags --- .../spark/sql/catalyst/trees/TreeNode.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 94e893d468b39..e6133c4db3c43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -75,11 +75,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] override val origin: Origin = CurrentOrigin.get - /** - * A mutable map for holding auxiliary information of this tree node. It will be carried over - * when this node is copied via `makeCopy`, or transformed via `transformUp`/`transformDown`. - */ - private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty + private[this] var _tags: mutable.Map[TreeNodeTag[_], Any] = null + private def tags: mutable.Map[TreeNodeTag[_], Any] = { + if (_tags eq null) { + _tags = mutable.Map.empty + } + _tags + } /** * Default tree pattern [[BitSet] for a [[TreeNode]]. 
@@ -151,7 +153,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] // SPARK-32753: it only makes sense to copy tags to a new node // but it's too expensive to detect other cases likes node removal // so we make a compromise here to copy tags to node with no tags - if (tags.isEmpty) { + if ((_tags eq null) || _tags.isEmpty) { tags ++= other.tags } } @@ -161,7 +163,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } def getTagValue[T](tag: TreeNodeTag[T]): Option[T] = { - tags.get(tag).map(_.asInstanceOf[T]) + if (_tags eq null) { + None + } else { + tags.get(tag).map(_.asInstanceOf[T]) + } } def unsetTagValue[T](tag: TreeNodeTag[T]): Unit = { From 70dfc02aa9e498ccfd53a7a7f3e6f54badddfcdf Mon Sep 17 00:00:00 2001 From: Nick Young Date: Mon, 29 Apr 2024 11:50:14 -0700 Subject: [PATCH 2/6] comments --- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e6133c4db3c43..94a82faa1f608 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -75,6 +75,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] override val origin: Origin = CurrentOrigin.get + /** + * A mutable map for holding auxiliary information of this tree node. It will be carried over + * when this node is copied via `makeCopy`, or transformed via `transformUp`/`transformDown`. 
+ */ private[this] var _tags: mutable.Map[TreeNodeTag[_], Any] = null private def tags: mutable.Map[TreeNodeTag[_], Any] = { if (_tags eq null) { @@ -158,6 +162,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } } + def resetTags(): Unit = { + if (!(_tags eq null)) { + tags.clear() + } + } + def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = { tags(tag) = value } @@ -171,7 +181,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } def unsetTagValue[T](tag: TreeNodeTag[T]): Unit = { - tags -= tag + if (!(_tags eq null)) { + tags -= tag + } } /** From dbf703d2f3fed2c84df8b70b7645db68e1acd4ff Mon Sep 17 00:00:00 2001 From: Nick Young Date: Mon, 29 Apr 2024 11:51:36 -0700 Subject: [PATCH 3/6] comment --- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 94a82faa1f608..5b20c1587d910 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -78,6 +78,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] /** * A mutable map for holding auxiliary information of this tree node. It will be carried over * when this node is copied via `makeCopy`, or transformed via `transformUp`/`transformDown`. + * `tags` is initialized lazily because even an empty `mutable.Map` occupies memory; deferring + * its allocation until first use reduces unnecessary memory pressure. 
*/ private[this] var _tags: mutable.Map[TreeNodeTag[_], Any] = null private def tags: mutable.Map[TreeNodeTag[_], Any] = { From 866c02826b4ed062284487ead23a028b27dfffe1 Mon Sep 17 00:00:00 2001 From: Nick Young Date: Mon, 29 Apr 2024 11:52:13 -0700 Subject: [PATCH 4/6] comment --- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 5b20c1587d910..3471e0a24dfd0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -164,12 +164,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } } - def resetTags(): Unit = { - if (!(_tags eq null)) { - tags.clear() - } - } - def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = { tags(tag) = value } From 6303972595753fba3682a01aa2250a0eab0ac2ca Mon Sep 17 00:00:00 2001 From: Nick Young Date: Mon, 29 Apr 2024 11:58:19 -0700 Subject: [PATCH 5/6] comment --- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 3471e0a24dfd0..fed0ffb19c522 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -177,7 +177,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } def unsetTagValue[T](tag: TreeNodeTag[T]): Unit = { - if (!(_tags eq null)) { + if (_tags ne null) { tags -= tag } } From 09c93e41cfec356ab460a167adf51dbec43a4053 Mon Sep 17 00:00:00 2001 From: Nick Young Date: Mon, 29 Apr 2024 12:21:22 -0700 Subject: [PATCH 
6/6] isEmpty --- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index fed0ffb19c522..dd39f3182bfb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -155,11 +155,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] ineffectiveRules.get(ruleId.id) } + def isTagsEmpty: Boolean = (_tags eq null) || _tags.isEmpty + def copyTagsFrom(other: BaseType): Unit = { // SPARK-32753: it only makes sense to copy tags to a new node // but it's too expensive to detect other cases likes node removal // so we make a compromise here to copy tags to node with no tags - if ((_tags eq null) || _tags.isEmpty) { + if (isTagsEmpty && !other.isTagsEmpty) { tags ++= other.tags } } @@ -169,7 +171,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } def getTagValue[T](tag: TreeNodeTag[T]): Option[T] = { - if (_tags eq null) { + if (isTagsEmpty) { None } else { tags.get(tag).map(_.asInstanceOf[T]) @@ -177,7 +179,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] } def unsetTagValue[T](tag: TreeNodeTag[T]): Unit = { - if (_tags ne null) { + if (!isTagsEmpty) { tags -= tag } }