Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.Set;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BooleanType;
Expand Down Expand Up @@ -610,8 +611,12 @@ public boolean equals(Object other) {
return (sizeInBytes == o.sizeInBytes) &&
ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset,
sizeInBytes);
} else if (other == null || !(other instanceof InternalRow)) {
return false;
} else {
throw new IllegalArgumentException(
"Cannot compare UnsafeRow to " + other.getClass().getName());
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ case class Window(
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def canProcessUnsafeRows: Boolean = true
override def outputsUnsafeRows: Boolean = false

/**
* Create a bound ordering object for a given frame type and offset. A bound ordering object is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
}

override def canProcessUnsafeRows: Boolean = true
override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
}

/**
Expand Down Expand Up @@ -319,17 +320,19 @@ case class AppendColumns[T, U](
// We are using an unsafe combiner.
override def canProcessSafeRows: Boolean = false
override def canProcessUnsafeRows: Boolean = true
override def outputsUnsafeRows: Boolean = true

override def output: Seq[Attribute] = child.output ++ newColumns

override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { iter =>
val tBoundEncoder = tEncoder.bind(child.output)
val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, uEncoder.schema)
iter.map { row =>
val unsafeRows: Iterator[UnsafeRow] = iter.map { row =>
val newColumns = uEncoder.toRow(func(tBoundEncoder.fromRow(row)))
combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow]): InternalRow
combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow])
}
unsafeRows
}
}
}
Expand Down