Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ case class Window(
// to the result of bound value projection. This is done manually because we want to use
// Code Generation (if it is enabled).
val (sortExprs, schema) = exprs.map { case e =>
// This AttributeReference does not need to have unique IDs, it's OK to be called
// in executor.
val ref = AttributeReference("ordExpr", e.dataType, e.nullable)()
(SortOrder(ref, e.direction), ref)
}.unzip
Expand Down Expand Up @@ -201,18 +203,17 @@ case class Window(
* This method uses Code Generation. It can only be used on the executor side.
*
* @param expressions unbound ordered function expressions.
* @param attributes output attributes
* @return the final resulting projection.
*/
private[this] def createResultProjection(
expressions: Seq[Expression]): MutableProjection = {
val unboundToAttr = expressions.map {
e => (e, AttributeReference("windowResult", e.dataType, e.nullable)())
}
val unboundToAttrMap = unboundToAttr.toMap
expressions: Seq[Expression],
attributes: Seq[Attribute]): MutableProjection = {
val unboundToAttrMap = expressions.zip(attributes).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap))
newMutableProjection(
projectList ++ patchedWindowExpression,
child.output ++ unboundToAttr.map(_._2))()
child.output ++ attributes)()
}

protected override def doExecute(): RDD[InternalRow] = {
Expand Down Expand Up @@ -247,12 +248,17 @@ case class Window(
factories(index) = () => createFrameProcessor(frame, functions, ordinal)
}

// AttributeReference can only be created in driver, or the id will not be unique
val outputAttributes = unboundExpressions.map {
e => AttributeReference("windowResult", e.dataType, e.nullable)()
}

// Start processing.
child.execute().mapPartitions { stream =>
new Iterator[InternalRow] {

// Get all relevant projections.
val result = createResultProjection(unboundExpressions)
val result = createResultProjection(unboundExpressions, outputAttributes)
val grouping = if (child.outputsUnsafeRows) {
UnsafeProjection.create(partitionSpec, child.output)
} else {
Expand Down