diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index f8929530c5036..7e204ec88aa1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -146,6 +146,8 @@ case class Window( // to the result of bound value projection. This is done manually because we want to use // Code Generation (if it is enabled). val (sortExprs, schema) = exprs.map { case e => + // This AttributeReference does not need to have unique IDs, it's OK to be called + // in executor. val ref = AttributeReference("ordExpr", e.dataType, e.nullable)() (SortOrder(ref, e.direction), ref) }.unzip @@ -201,18 +203,17 @@ case class Window( * This method uses Code Generation. It can only be used on the executor side. * * @param expressions unbound ordered function expressions. + * @param attributes output attributes * @return the final resulting projection. */ private[this] def createResultProjection( - expressions: Seq[Expression]): MutableProjection = { - val unboundToAttr = expressions.map { - e => (e, AttributeReference("windowResult", e.dataType, e.nullable)()) - } - val unboundToAttrMap = unboundToAttr.toMap + expressions: Seq[Expression], + attributes: Seq[Attribute]): MutableProjection = { + val unboundToAttrMap = expressions.zip(attributes).toMap val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap)) newMutableProjection( projectList ++ patchedWindowExpression, - child.output ++ unboundToAttr.map(_._2))() + child.output ++ attributes)() } protected override def doExecute(): RDD[InternalRow] = { @@ -247,12 +248,17 @@ case class Window( factories(index) = () => createFrameProcessor(frame, functions, ordinal) } + // AttributeReference can only be created in driver, or the id will not be unique + val outputAttributes = unboundExpressions.map { + e => AttributeReference("windowResult", e.dataType, e.nullable)() + } + // Start processing. child.execute().mapPartitions { stream => new Iterator[InternalRow] { // Get all relevant projections. - val result = createResultProjection(unboundExpressions) + val result = createResultProjection(unboundExpressions, outputAttributes) val grouping = if (child.outputsUnsafeRows) { UnsafeProjection.create(partitionSpec, child.output) } else {