From da832dad6b1601eb31595005dd72fa93715b9a60 Mon Sep 17 00:00:00 2001 From: Vladimir Golubev Date: Tue, 24 Dec 2024 22:57:35 +0000 Subject: [PATCH] Substitute LocalRelation with ComparableLocalRelation in NormalizePlan --- .../sql/catalyst/plans/NormalizePlan.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala index 1cc876588550..ee68e433fbea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.connector.read.streaming.SparkDataStream object NormalizePlan extends PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = @@ -104,6 +105,8 @@ object NormalizePlan extends PredicateHelper { case Project(projectList, child) => Project(normalizeProjectList(projectList), child) case c: KeepAnalyzedQuery => c.storeAnalyzedQuery() + case localRelation: LocalRelation => + ComparableLocalRelation.fromLocalRelation(localRelation) } } @@ -134,3 +137,33 @@ object NormalizePlan extends PredicateHelper { case _ => condition // Don't reorder. } } + +/** + * A substitute for the [[LocalRelation]] that has comparable `data` field. [[LocalRelation]]'s + * `data` is incomparable for maps, because [[ArrayBasedMapData]] doesn't define [[equals]]. + */ +case class ComparableLocalRelation( + override val output: Seq[Attribute], + data: Seq[Seq[Expression]], + override val isStreaming: Boolean, + stream: Option[SparkDataStream]) extends LeafNode + +object ComparableLocalRelation { + def fromLocalRelation(localRelation: LocalRelation): ComparableLocalRelation = { + val dataTypes = localRelation.output.map(_.dataType) + ComparableLocalRelation( + output = localRelation.output, + data = localRelation.data.map { row => + if (row != null) { + row.toSeq(dataTypes).zip(dataTypes).map { + case (value, dataType) => Literal(value, dataType) + } + } else { + Seq.empty + } + }, + isStreaming = localRelation.isStreaming, + stream = localRelation.stream + ) + } +}