From 11f80a348f02f6592f944e313005bcae9e803f05 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Jul 2015 20:23:19 -0700 Subject: [PATCH 1/6] [SPARK-9368][SQL] Support get(ordinal, dataType) generic getter in UnsafeRow. --- .../sql/catalyst/expressions/UnsafeRow.java | 46 ++++++++++++++++++- .../spark/sql/catalyst/InternalRow.scala | 4 +- .../expressions/SpecificMutableRow.scala | 2 +- .../codegen/GenerateProjection.scala | 2 +- .../spark/sql/catalyst/expressions/rows.scala | 4 +- 5 files changed, 51 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 87e5a89c19658..7812e8dc9c32a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -24,7 +24,7 @@ import java.util.HashSet; import java.util.Set; -import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.PlatformDependent; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.bitset.BitSetMethods; @@ -235,6 +235,35 @@ public Object get(int ordinal) { throw new UnsupportedOperationException(); } + @Override + public Object get(int ordinal, DataType dataType) { + if (dataType instanceof NullType) { + return null; + } else if (dataType instanceof BooleanType) { + return getBoolean(ordinal); + } else if (dataType instanceof ByteType) { + return getByte(ordinal); + } else if (dataType instanceof ShortType) { + return getShort(ordinal); + } else if (dataType instanceof IntegerType) { + return getInt(ordinal); + } else if (dataType instanceof LongType) { + return getLong(ordinal); + } else if (dataType instanceof FloatType) { + return getFloat(ordinal); + } else if (dataType instanceof DoubleType) { + return getDouble(ordinal); + } else if (dataType instanceof DecimalType) { + return getDecimal(ordinal); + } else if (dataType instanceof StringType) { + return getUTF8String(ordinal); + } else if (dataType instanceof StructType) { + return getStruct(ordinal, ((StructType) dataType).size()); + } else { + throw new UnsupportedOperationException("Unsupported data type " + dataType.simpleString()); + } + } + @Override public boolean isNullAt(int ordinal) { assertIndexIsValid(ordinal); @@ -436,4 +465,19 @@ public String toString() { public boolean anyNull() { return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8); } + + /** + * Writes the content of this row into a memory address, identified by an object and an offset. + * The target memory address must already been allocated, and have enough space to hold all the + * bytes in this string. + */ + public void writeToMemory(Object target, long targetOffset) { + PlatformDependent.copyMemory( + baseObject, + baseOffset, + target, + targetOffset, + sizeInBytes + ); + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala index 385d9671386dc..ad3977281d1a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala @@ -30,11 +30,11 @@ abstract class InternalRow extends Serializable { def numFields: Int - def get(ordinal: Int): Any + def get(ordinal: Int): Any = get(ordinal, null) def genericGet(ordinal: Int): Any = get(ordinal, null) - def get(ordinal: Int, dataType: DataType): Any = get(ordinal) + def get(ordinal: Int, dataType: DataType): Any def getAs[T](ordinal: Int, dataType: DataType): T = get(ordinal, dataType).asInstanceOf[T] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index 5953a093dc684..b877ce47c083f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -219,7 +219,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR values(i).isNull = true } - override def get(i: Int): Any = values(i).boxed + override def get(i: Int, dataType: DataType): Any = values(i).boxed override def getStruct(ordinal: Int, numFields: Int): InternalRow = { values(ordinal).boxed.asInstanceOf[InternalRow] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala index a361b216eb472..35920147105ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala @@ -183,7 +183,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { public void setNullAt(int i) { nullBits[i] = true; } public boolean isNullAt(int i) { return nullBits[i]; } - public Object get(int i) { + public Object get(int i, ${classOf[DataType].getName} dataType) { if (isNullAt(i)) return null; switch (i) { $getCases diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index daeabe8e90f1d..b7c4ece4a16fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -99,7 +99,7 @@ class GenericInternalRow(protected[sql] val values: Array[Any]) extends Internal override def numFields: Int = values.length - override def get(i: Int): Any = values(i) + override def get(i: Int, dataType: DataType): Any = values(i) override def getStruct(ordinal: Int, numFields: Int): InternalRow = { values(ordinal).asInstanceOf[InternalRow] @@ -130,7 +130,7 @@ class GenericMutableRow(val values: Array[Any]) extends MutableRow { override def numFields: Int = values.length - override def get(i: Int): Any = values(i) + override def get(i: Int, dataType: DataType): Any = values(i) override def getStruct(ordinal: Int, numFields: Int): InternalRow = { values(ordinal).asInstanceOf[InternalRow] From 9989064b9afc2007e769d520798a6b1da27d3e21 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Jul 2015 20:40:24 -0700 Subject: [PATCH 2/6] JoinedRow. --- .../org/apache/spark/sql/catalyst/expressions/Projection.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index cc89d74146b34..27d6ff587ab71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -198,7 +198,7 @@ class JoinedRow extends InternalRow { if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields) } - override def get(i: Int): Any = + override def get(i: Int, dataType: DataType): Any = if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields) override def isNullAt(i: Int): Boolean = From 24a3e4604acb88c1fba83b577de547662285403a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Jul 2015 21:13:00 -0700 Subject: [PATCH 3/6] Added support for DateType/TimestampType. Updated ExpressionEvalHelper to avoid conversion. --- .../sql/catalyst/expressions/UnsafeRow.java | 4 ++++ .../expressions/ExpressionEvalHelper.scala | 24 ++++++++++--------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 7812e8dc9c32a..f98a53feb4167 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -255,6 +255,10 @@ public Object get(int ordinal, DataType dataType) { return getDouble(ordinal); } else if (dataType instanceof DecimalType) { return getDecimal(ordinal); + } else if (dataType instanceof DateType) { + return getInt(ordinal); + } else if (dataType instanceof TimestampType) { + return getLong(ordinal); } else if (dataType instanceof StringType) { return getUTF8String(ordinal); } else if (dataType instanceof StructType) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 8b0f90cf3a623..eedac664c6f53 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -160,17 +160,20 @@ trait ExpressionEvalHelper { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val plan = generateProject( + val project = generateProject( GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), expression) - - val unsafeRow = plan(inputRow) - // UnsafeRow cannot be compared with GenericInternalRow directly - val actual = FromUnsafeProjection(expression.dataType :: Nil)(unsafeRow) - val expectedRow = InternalRow(expected) - if (actual != expectedRow) { - val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" - fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input") + val out = project(inputRow) + val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" + + if (expected == null) { + if (!out.isNullAt(0)) { + val actual = out.get(0, expression.dataType) + fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") + } + } else if (out.get(0, expression.dataType) != expected) { + val actual = out.get(0, expression.dataType) + fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") } } @@ -200,8 +203,7 @@ trait ExpressionEvalHelper { plan = generateProject( GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), expression) - actual = FromUnsafeProjection(expression.dataType :: Nil)( - plan(inputRow)).get(0, expression.dataType) + actual = plan(inputRow).get(0, expression.dataType) assert(checkResult(actual, expected)) } } From fb6ca303bf7bafe587c24e1022b9d2ab2a4115fc Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Jul 2015 21:14:22 -0700 Subject: [PATCH 4/6] Support BinaryType. --- .../org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index f98a53feb4167..0fb33dd5a15a0 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -259,6 +259,8 @@ public Object get(int ordinal, DataType dataType) { return getInt(ordinal); } else if (dataType instanceof TimestampType) { return getLong(ordinal); + } else if (dataType instanceof BinaryType) { + return getBinary(ordinal); } else if (dataType instanceof StringType) { return getUTF8String(ordinal); } else if (dataType instanceof StructType) { From 0f57c556d15f1c1294e07b44fdd9e41c461d2dfd Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Jul 2015 21:16:16 -0700 Subject: [PATCH 5/6] Reset the changes in ExpressionEvalHelper. --- .../expressions/ExpressionEvalHelper.scala | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index eedac664c6f53..7baa0291477b5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -156,24 +156,21 @@ trait ExpressionEvalHelper { } protected def checkEvalutionWithUnsafeProjection( - expression: Expression, - expected: Any, - inputRow: InternalRow = EmptyRow): Unit = { + expression: Expression, + expected: Any, + inputRow: InternalRow = EmptyRow): Unit = { - val project = generateProject( + val plan = generateProject( GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), expression) - val out = project(inputRow) - val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" - - if (expected == null) { - if (!out.isNullAt(0)) { - val actual = out.get(0, expression.dataType) - fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") - } - } else if (out.get(0, expression.dataType) != expected) { - val actual = out.get(0, expression.dataType) - fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") + + val unsafeRow = plan(inputRow) + // UnsafeRow cannot be compared with GenericInternalRow directly + val actual = FromUnsafeProjection(expression.dataType :: Nil)(unsafeRow) + val expectedRow = InternalRow(expected) + if (actual != expectedRow) { + val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" + fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input") } } @@ -187,9 +184,9 @@ trait ExpressionEvalHelper { } protected def checkDoubleEvaluation( - expression: => Expression, - expected: Spread[Double], - inputRow: InternalRow = EmptyRow): Unit = { + expression: => Expression, + expected: Spread[Double], + inputRow: InternalRow = EmptyRow): Unit = { checkEvaluationWithoutCodegen(expression, expected) checkEvaluationWithGeneratedMutableProjection(expression, expected) checkEvaluationWithOptimization(expression, expected) @@ -203,7 +200,8 @@ trait ExpressionEvalHelper { plan = generateProject( GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), expression) - actual = plan(inputRow).get(0, expression.dataType) + actual = FromUnsafeProjection(expression.dataType :: Nil)( + plan(inputRow)).get(0, expression.dataType) assert(checkResult(actual, expected)) } } From 3063788608a75c23c7f0c5691934ba3566804849 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 26 Jul 2015 21:18:07 -0700 Subject: [PATCH 6/6] Reset the change for real this time. --- .../catalyst/expressions/ExpressionEvalHelper.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 7baa0291477b5..8b0f90cf3a623 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -156,9 +156,9 @@ trait ExpressionEvalHelper { } protected def checkEvalutionWithUnsafeProjection( - expression: Expression, - expected: Any, - inputRow: InternalRow = EmptyRow): Unit = { + expression: Expression, + expected: Any, + inputRow: InternalRow = EmptyRow): Unit = { val plan = generateProject( GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), @@ -184,9 +184,9 @@ trait ExpressionEvalHelper { } protected def checkDoubleEvaluation( - expression: => Expression, - expected: Spread[Double], - inputRow: InternalRow = EmptyRow): Unit = { + expression: => Expression, + expected: Spread[Double], + inputRow: InternalRow = EmptyRow): Unit = { checkEvaluationWithoutCodegen(expression, expected) checkEvaluationWithGeneratedMutableProjection(expression, expected) checkEvaluationWithOptimization(expression, expected)