From d09a81e79cf4febc35b42fb0ff57d772f70b8e49 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 21 Sep 2016 12:56:25 -0700 Subject: [PATCH 1/3] Override outputsUnsafeRows when overriding canProcessUnsafeRows --- .../main/scala/org/apache/spark/sql/execution/Window.scala | 1 + .../org/apache/spark/sql/execution/basicOperators.scala | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index b1280c32a6a43..b3f1580c2784a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -96,6 +96,7 @@ case class Window( override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def canProcessUnsafeRows: Boolean = true + override def outputsUnsafeRows: Boolean = false /** * Create a bound ordering object for a given frame type and offset. A bound ordering object is diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a42aea0b96d43..58d5669c9c19d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -251,6 +251,7 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode { } override def canProcessUnsafeRows: Boolean = true + override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows } /** @@ -319,6 +320,7 @@ case class AppendColumns[T, U]( // We are using an unsafe combiner. override def canProcessSafeRows: Boolean = false override def canProcessUnsafeRows: Boolean = true + override def outputsUnsafeRows: Boolean = true override def output: Seq[Attribute] = child.output ++ newColumns @@ -326,10 +328,11 @@ case class AppendColumns[T, U]( child.execute().mapPartitionsInternal { iter => val tBoundEncoder = tEncoder.bind(child.output) val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, uEncoder.schema) - iter.map { row => + val unsafeRows: Iterator[UnsafeRow] = iter.map { row => val newColumns = uEncoder.toRow(func(tBoundEncoder.fromRow(row))) - combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow]): InternalRow + combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow]) } + unsafeRows } } } From 9d4cf441a8d7d0af8eba3d473e996e0b39fd5560 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 21 Sep 2016 12:57:40 -0700 Subject: [PATCH 2/3] Throw IllegalArgumentException when performing illegal UnsafeRow equals comparison. --- .../apache/spark/sql/catalyst/expressions/UnsafeRow.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index af687eaeb8ab4..3d4f1474e5777 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -610,8 +610,12 @@ public boolean equals(Object other) { return (sizeInBytes == o.sizeInBytes) && ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset, sizeInBytes); + } else if (other == null) { + return false; + } else { + throw new IllegalArgumentException( + "Cannot compare UnsafeRow to " + other.getClass().getName()); } - return false; } /** From 1319e8281ab3ec14a5ba11fca0261d19b7890ad3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 21 Sep 2016 13:57:17 -0700 Subject: [PATCH 3/3] Ignore non-InternalRow comparisons. --- .../org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 3d4f1474e5777..5555b54684af1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.Set; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.BooleanType; @@ -610,7 +611,7 @@ public boolean equals(Object other) { return (sizeInBytes == o.sizeInBytes) && ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset, sizeInBytes); - } else if (other == null) { + } else if (other == null || !(other instanceof InternalRow)) { return false; } else { throw new IllegalArgumentException(