From 1c5797222f97d1a547adca9ac631dab70d40d6b3 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 26 Jun 2016 14:10:17 +0900 Subject: [PATCH 01/10] pass information on containsNull in ArrayType --- .../expressions/codegen/GenerateUnsafeProjection.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 7e4c9089a2cb9..cfeef20cce869 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -117,12 +117,12 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro $rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor); """ - case a @ ArrayType(et, _) => + case a @ ArrayType(et, cn) => s""" // Remember the current cursor so that we can calculate how many bytes are // written later. final int $tmpCursor = $bufferHolder.cursor; - ${writeArrayToBuffer(ctx, input.value, et, bufferHolder)} + ${writeArrayToBuffer(ctx, input.value, et, cn, bufferHolder)} $rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor); """ @@ -171,6 +171,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro ctx: CodegenContext, input: String, elementType: DataType, + containsNull: Boolean, bufferHolder: String): String = { val arrayWriterClass = classOf[UnsafeArrayWriter].getName val arrayWriter = ctx.freshName("arrayWriter") @@ -202,7 +203,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro $arrayWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor); """ - case a @ ArrayType(et, _) => + case a @ ArrayType(et, cn) => s""" final int $tmpCursor = $bufferHolder.cursor; ${writeArrayToBuffer(ctx, element, et, bufferHolder)} @@ -275,7 +276,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro // Write the numBytes of key array into the first 8 bytes. Platform.putLong($bufferHolder.buffer, $tmpCursor - 8, $bufferHolder.cursor - $tmpCursor); - ${writeArrayToBuffer(ctx, values, valueType, bufferHolder)} + ${writeArrayToBuffer(ctx, values, valueType, true, bufferHolder)} } """ } From 3d78991b55e388065da98bafbf435cb8c6c61a78 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 26 Jun 2016 14:54:15 +0900 Subject: [PATCH 02/10] generate helper call instead of for-loop --- .../codegen/UnsafeArrayWriter.java | 106 ++++++++++++++++++ .../codegen/GenerateUnsafeProjection.scala | 35 ++++-- 2 files changed, 132 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index afea4676893ed..e989fe8933809 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen; +import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; @@ -259,4 +260,109 @@ public void write(int ordinal, CalendarInterval input) { // move the cursor forward. holder.cursor += 16; } + + public void writePrimiviveBooleanArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // boolean[] intput = ((GenericBooleanArrayData)arrayData).primitiveArray(); + boolean[] input = arrayData.toBooleanArray(); + int length = input.length; + Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } + + public void writePrimitiveByteArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // byte[] intput = ((GenericByteArrayData)arrayData).primitiveArray(); + byte[] input = arrayData.toByteArray(); + int length = input.length; + Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } + + public void writePrimitiveShortArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // short[] input = ((GenericShortArrayData)arrayData).primitiveArray(); + short[] input = arrayData.toShortArray(); + int length = input.length * 2; + Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal * 2) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } + + public void writePrimitiveIntArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // int[] input = ((GenericIntArrayData)arrayData).primitiveArray(); + int[] input = arrayData.toIntArray(); + int length = input.length * 4; + Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal * 4) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } + + public void writePrimitiveLongArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // long[] input = ((GenericLongArrayData)arrayData).primitiveArray(); + long[] input = arrayData.toLongArray(); + int length = input.length * 8; + Platform.copyMemory(input, Platform.LONG_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal * 8) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } + + public void writePrimitiveFloatArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // float[] input = ((GenericFloatArrayData)arrayData).primitiveArray(); + float[] input = arrayData.toFloatArray(); + int length = input.length * 4; + Platform.copyMemory(input, Platform.FLOAT_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal * 4) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } + + public void writePrimitiveDoubleArray(ArrayData arrayData) { + // uncomment this if SPARK-16043 is merged + // double[] input = ((GenericDoubleArrayData)arrayData).primitiveArray(); + double[] input = arrayData.toDoubleArray(); + int length = input.length * 8; + Platform.copyMemory(input, Platform.DOUBLE_ARRAY_OFFSET, + holder.buffer, holder.cursor, length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < input.length; ordinal++) { + final int relativeOffset = (holder.cursor + ordinal * 8) - startingOffset; + Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); + } + holder.cursor += length; + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index cfeef20cce869..f0f3729d685e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -225,7 +225,31 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro case _ => s"$arrayWriter.write($index, $element);" } - val primitiveTypeName = if (ctx.isPrimitiveType(jt)) ctx.primitiveTypeName(et) else "" + val storeElements = if (containsNull) { + s""" + for (int $index = 0; $index < $numElements; $index++) { + if ($input.isNullAt($index)) { + $arrayWriter.setNullAt($index); + } else { + final $jt $element = ${ctx.getValue(input, et, index)}; + $writeElement + } + } + """ + } else { + if (ctx.isPrimitiveType(et)) { + val typeName = ctx.primitiveTypeName(et) + s"$arrayWriter.writePrimitive${typeName}Array($input);" + } else { + s""" + for (int $index = 0; $index < $numElements; $index++) { + final $jt $element = ${ctx.getValue(input, et, index)}; + $writeElement + } + """ + } + } + s""" if ($input instanceof UnsafeArrayData) { ${writeUnsafeData(ctx, s"((UnsafeArrayData) $input)", bufferHolder)} @@ -233,14 +257,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro final int $numElements = $input.numElements(); $arrayWriter.initialize($bufferHolder, $numElements, $elementOrOffsetSize); - for (int $index = 0; $index < $numElements; $index++) { - if ($input.isNullAt($index)) { - $arrayWriter.setNull$primitiveTypeName($index); - } else { - final $jt $element = ${ctx.getValue(input, et, index)}; - $writeElement - } - } + $storeElements } """ } From 90d716f6dafe431ef5cc98592dddbdf0f38defcf Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 26 Jun 2016 14:54:37 +0900 Subject: [PATCH 03/10] add test suite --- .../spark/sql/DataFrameComplexTypeSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala index 1230b921aa279..a5c201949748d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala @@ -27,6 +27,25 @@ import org.apache.spark.sql.test.SharedSQLContext class DataFrameComplexTypeSuite extends QueryTest with SharedSQLContext { import testImplicits._ + test("primitive type on array") { + val rows = sparkContext.parallelize(Seq(1, 2), 1).toDF("v"). + selectExpr("Array(v + 2, v + 3)") + checkAnswer(rows, Seq(Row(Array(3, 4)), Row(Array(4, 5)))) + } + + test("primitive type and null on array") { + val rows = sparkContext.parallelize(Seq(1, 2), 1).toDF("v"). + selectExpr("Array(v + 2, null, v + 3)") + checkAnswer(rows, Seq(Row(Array(3, null, 4)), Row(Array(4, null, 5)))) + } + + test("array with null on array") { + val rows = sparkContext.parallelize(Seq(1, 2), 1).toDF("v"). + selectExpr("Array(Array(v, v + 1)," + + "null," + + "Array(v, v - 1))").collect + } + test("UDF on struct") { val f = udf((a: String) => a) val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") From 0b50242d99529f10e2f150a05d8dfef8987ab3af Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 26 Jun 2016 17:28:33 +0900 Subject: [PATCH 04/10] fix test failure --- .../sql/catalyst/expressions/codegen/UnsafeArrayWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index e989fe8933809..7d99cea70e5e7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -261,7 +261,7 @@ public void write(int ordinal, CalendarInterval input) { holder.cursor += 16; } - public void writePrimiviveBooleanArray(ArrayData arrayData) { + public void writePrimitiveBooleanArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // boolean[] intput = ((GenericBooleanArrayData)arrayData).primitiveArray(); boolean[] input = arrayData.toBooleanArray(); From 4b132de5f0f1fff0f7da670fb8097232c3f65b83 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sat, 2 Jul 2016 15:35:26 +0900 Subject: [PATCH 05/10] update --- .../codegen/UnsafeArrayWriter.java | 93 +++++++------------ 1 file changed, 32 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index 7d99cea70e5e7..3f9230c25c8a5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -261,19 +261,26 @@ public void write(int ordinal, CalendarInterval input) { holder.cursor += 16; } + private void writePrimitiveArray(Object input, int offset, int elementSize, int length) { + // uncomment this if SPARK-16043 is merged + // Platform.copyMemory(input, offset, holder.buffer, startingOffset + headerInBytes, + // elementSize * length); + Platform.copyMemory(input, offset, holder.buffer, holder.cursor, elementSize * length); + // remove the followings if SPARK-15962 is merged + for (int ordinal = 0; ordinal < length; ordinal++) { + Platform.putInt(holder.buffer, getElementOffset(ordinal), + (holder.cursor + ordinal * elementSize) - startingOffset); + } + holder.cursor += elementSize * length; + } + public void writePrimitiveBooleanArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // boolean[] intput = ((GenericBooleanArrayData)arrayData).primitiveArray(); boolean[] input = arrayData.toBooleanArray(); int length = input.length; - Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int offset = Platform.BYTE_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 1, length); } public void writePrimitiveByteArray(ArrayData arrayData) { @@ -281,88 +288,52 @@ public void writePrimitiveByteArray(ArrayData arrayData) { // byte[] intput = ((GenericByteArrayData)arrayData).primitiveArray(); byte[] input = arrayData.toByteArray(); int length = input.length; - Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int offset = Platform.BYTE_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 1, length); } public void writePrimitiveShortArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // short[] input = ((GenericShortArrayData)arrayData).primitiveArray(); short[] input = arrayData.toShortArray(); - int length = input.length * 2; - Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal * 2) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int length = input.length; + int offset = Platform.SHORT_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 2, length); } public void writePrimitiveIntArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // int[] input = ((GenericIntArrayData)arrayData).primitiveArray(); int[] input = arrayData.toIntArray(); - int length = input.length * 4; - Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal * 4) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int length = input.length; + int offset = Platform.INT_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 4, length); } public void writePrimitiveLongArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // long[] input = ((GenericLongArrayData)arrayData).primitiveArray(); long[] input = arrayData.toLongArray(); - int length = input.length * 8; - Platform.copyMemory(input, Platform.LONG_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal * 8) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int length = input.length; + int offset = Platform.LONG_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 8, length); } public void writePrimitiveFloatArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // float[] input = ((GenericFloatArrayData)arrayData).primitiveArray(); float[] input = arrayData.toFloatArray(); - int length = input.length * 4; - Platform.copyMemory(input, Platform.FLOAT_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal * 4) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int length = input.length; + int offset = Platform.FLOAT_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 4, length); } public void writePrimitiveDoubleArray(ArrayData arrayData) { // uncomment this if SPARK-16043 is merged // double[] input = ((GenericDoubleArrayData)arrayData).primitiveArray(); double[] input = arrayData.toDoubleArray(); - int length = input.length * 8; - Platform.copyMemory(input, Platform.DOUBLE_ARRAY_OFFSET, - holder.buffer, holder.cursor, length); - // remove the followings if SPARK-15962 is merged - for (int ordinal = 0; ordinal < input.length; ordinal++) { - final int relativeOffset = (holder.cursor + ordinal * 8) - startingOffset; - Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); - } - holder.cursor += length; + int length = input.length; + int offset = Platform.DOUBLE_ARRAY_OFFSET; + writePrimitiveArray(input, offset, 8, length); } } From 249a1a9b0c9fa837917214cbf8f120dd9ea15d28 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sat, 2 Jul 2016 15:35:53 +0900 Subject: [PATCH 06/10] add test suites --- .../codegen/BufferHolderSuite.scala | 200 +++++++++++++++++- 1 file changed, 199 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala index c7c386b5b838a..99d5470c37c00 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow} +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.unsafe.Platform class BufferHolderSuite extends SparkFunSuite { @@ -36,4 +38,200 @@ class BufferHolderSuite extends SparkFunSuite { } assert(e.getMessage.contains("exceeds size limitation")) } + + def performUnsafeArrayWriter(length: Int, elementSize: Int, f: (UnsafeArrayWriter) => Unit): + UnsafeArrayData = { + val unsafeRow = new UnsafeRow(1) + val unsafeArrayWriter = new UnsafeArrayWriter + val bufferHolder = new BufferHolder(unsafeRow, 32) + bufferHolder.reset() + val cursor = bufferHolder.cursor + unsafeArrayWriter.initialize(bufferHolder, length, elementSize) + // execute UnsafeArrayWriter.foo() in f() + f(unsafeArrayWriter) + + val unsafeArray = new UnsafeArrayData + unsafeArray.pointTo(bufferHolder.buffer, cursor.toLong, bufferHolder.cursor - cursor) + assert(unsafeArray.numElements() == length) + unsafeArray + } + + def initializeUnsafeArrayData(data: Seq[Any], elementSize: Int): + UnsafeArrayData = { + val length = data.length + val unsafeArray = new UnsafeArrayData + // uncomment this if SPARK-15962 is merged + // val headerSize = calculateHeaderPortionInBytes(length) + val headerSize = 4 + 4 * length + val size = headerSize + elementSize * length + val buffer = new Array[Byte](size) + Platform.putInt(buffer, Platform.BYTE_ARRAY_OFFSET, length) + unsafeArray.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET, size) + // remove this if SPARK-15962 is merged + (0 until length).map(ordinal => + Platform.putInt(buffer, Platform.BYTE_ARRAY_OFFSET + 4 + ordinal * 4, + headerSize + ordinal * elementSize) + ) + assert(unsafeArray.numElements == length) + data.zipWithIndex.map { case (e, i) => + val offset = Platform.BYTE_ARRAY_OFFSET + headerSize + elementSize * i + e match { + case _ : Boolean => Platform.putBoolean(buffer, offset, e.asInstanceOf[Boolean]) + case _ : Byte => Platform.putByte(buffer, offset, e.asInstanceOf[Byte]) + case _ : Short => Platform.putShort(buffer, offset, e.asInstanceOf[Short]) + case _ : Int => Platform.putInt(buffer, offset, e.asInstanceOf[Int]) + case _ : Long => Platform.putLong(buffer, offset, e.asInstanceOf[Long]) + case _ : Float => Platform.putFloat(buffer, offset, e.asInstanceOf[Float]) + case _ : Double => Platform.putDouble(buffer, offset, e.asInstanceOf[Double]) + case _ => throw new UnsupportedOperationException() + } + } + unsafeArray + } + + val booleanData = Seq(true, false) + val byteData = Seq(0.toByte, 1.toByte, Byte.MaxValue, Byte.MinValue) + val shortData = Seq(0.toShort, 1.toShort, Short.MaxValue, Short.MinValue) + val intData = Seq(0, 1, -1, Int.MaxValue, Int.MinValue) + val longData = Seq(0.toLong, 1.toLong, -1.toLong, Long.MaxValue, Long.MinValue) + val floatData = Seq(0.toFloat, 1.1.toFloat, -1.1.toFloat, Float.MaxValue, Float.MinValue) + val doubleData = Seq(0.toDouble, 1.1.toDouble, -1.1.toDouble, Double.MaxValue, Double.MinValue) + + test("UnsafeArrayDataWriter write") { + val boolUnsafeArray = performUnsafeArrayWriter(booleanData.length, 1, + (writer: UnsafeArrayWriter) => booleanData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + booleanData.zipWithIndex.map { case (e, i) => assert(boolUnsafeArray.getBoolean(i) == e) } + + val byteUnsafeArray = performUnsafeArrayWriter(byteData.length, 1, + (writer: UnsafeArrayWriter) => byteData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + byteData.zipWithIndex.map { case (e, i) => assert(byteUnsafeArray.getByte(i) == e) } + + val shortUnsafeArray = performUnsafeArrayWriter(shortData.length, 2, + (writer: UnsafeArrayWriter) => shortData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + shortData.zipWithIndex.map { case (e, i) => assert(shortUnsafeArray.getShort(i) == e) } + + val intUnsafeArray = performUnsafeArrayWriter(intData.length, 4, + (writer: UnsafeArrayWriter) => intData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + intData.zipWithIndex.map { case (e, i) => assert(intUnsafeArray.getInt(i) == e) } + + val longUnsafeArray = performUnsafeArrayWriter(longData.length, 8, + (writer: UnsafeArrayWriter) => longData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + longData.zipWithIndex.map { case (e, i) => assert(longUnsafeArray.getLong(i) == e) } + + val floatUnsafeArray = performUnsafeArrayWriter(floatData.length, 8, + (writer: UnsafeArrayWriter) => floatData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + floatData.zipWithIndex.map { case (e, i) => assert(floatUnsafeArray.getFloat(i) == e) } + + val doubleUnsafeArray = performUnsafeArrayWriter(doubleData.length, 8, + (writer: UnsafeArrayWriter) => doubleData.zipWithIndex.map { + case (e, i) => writer.write(i, e) }) + doubleData.zipWithIndex.map { case (e, i) => assert(doubleUnsafeArray.getDouble(i) == e) } + } + + test("toPrimitiveArray") { + val booleanUnsafeArray = initializeUnsafeArrayData(booleanData, 1) + booleanUnsafeArray.toBooleanArray(). + zipWithIndex.map { case (e, i) => assert(e == booleanData(i)) } + + val byteUnsafeArray = initializeUnsafeArrayData(byteData, 1) + byteUnsafeArray.toByteArray().zipWithIndex.map { case (e, i) => assert(e == byteData(i)) } + + val shortUnsafeArray = initializeUnsafeArrayData(shortData, 2) + shortUnsafeArray.toShortArray().zipWithIndex.map { case (e, i) => assert(e == shortData(i)) } + + val intUnsafeArray = initializeUnsafeArrayData(intData, 4) + intUnsafeArray.toIntArray().zipWithIndex.map { case (e, i) => assert(e == intData(i)) } + + val longUnsafeArray = initializeUnsafeArrayData(longData, 8) + longUnsafeArray.toLongArray().zipWithIndex.map { case (e, i) => assert(e == longData(i)) } + + val floatUnsafeArray = initializeUnsafeArrayData(floatData, 4) + floatUnsafeArray.toFloatArray().zipWithIndex.map { case (e, i) => assert(e == floatData(i)) } + + val doubleUnsafeArray = initializeUnsafeArrayData(doubleData, 8) + doubleUnsafeArray.toDoubleArray(). + zipWithIndex.map { case (e, i) => assert(e == doubleData(i)) } + } + + test("fromPrimitiveArray") { + // uncomment this if SPARK-15962 is merged +/* + val booleanArray = booleanData.toArray + val booleanUnsafeArray = UnsafeArrayData.fromPrimitiveArray(booleanArray) + booleanArray.zipWithIndex.map { case (e, i) => assert(booleanUnsafeArray.getBoolean(i) == e) } + + val byteArray = byteData.toArray + val byteUnsafeArray = UnsafeArrayData.fromPrimitiveArray(byteArray) + byteArray.zipWithIndex.map { case (e, i) => assert(byteUnsafeArray.getByte(i) == e) } + + val shortArray = shortData.toArray + val shortUnsafeArray = UnsafeArrayData.fromPrimitiveArray(shortArray) + shortArray.zipWithIndex.map { case (e, i) => assert(shortUnsafeArray.getShort(i) == e) } +*/ + val intArray = intData.toArray + val intUnsafeArray = UnsafeArrayData.fromPrimitiveArray(intArray) + intArray.zipWithIndex.map { case (e, i) => assert(intUnsafeArray.getInt(i) == e) } + +/* + val longArray = longData.toArray + val longUnsafeArray = UnsafeArrayData.fromPrimitiveArray(longArray) + longArray.zipWithIndex.map { case (e, i) => assert(longUnsafeArray.getLong(i) == e) } + + val floatArray = floatData.toArray + val floatUnsafeArray = UnsafeArrayData.fromPrimitiveArray(floatArray) + floatArray.zipWithIndex.map { case (e, i) => assert(floatUnsafeArray.getFloat(i) == e) } +*/ + val doubleArray = doubleData.toArray + val doubleUnsafeArray = UnsafeArrayData.fromPrimitiveArray(doubleArray) + doubleArray.zipWithIndex.map { case (e, i) => assert(doubleUnsafeArray.getDouble(i) == e) } + } + + test("writePrimitiveArray") { + val booleanArray = booleanData.toArray + val booleanUnsafeArray = performUnsafeArrayWriter(booleanArray.length, 4, + (writer: UnsafeArrayWriter) => + writer.writePrimitiveBooleanArray(new GenericArrayData(booleanArray))) + booleanArray.zipWithIndex.map { case (e, i) => assert(booleanUnsafeArray.getBoolean(i) == e) } + + val byteArray = byteData.toArray + val byteUnsafeArray = performUnsafeArrayWriter(byteArray.length, 4, + (writer: UnsafeArrayWriter) => + writer.writePrimitiveByteArray(new GenericArrayData(byteArray))) + byteArray.zipWithIndex.map { case (e, i) => assert(byteUnsafeArray.getByte(i) == e) } + + val shortArray = shortData.toArray + val shortUnsafeArray = performUnsafeArrayWriter(shortArray.length, 4, + (writer: UnsafeArrayWriter) => + writer.writePrimitiveShortArray(new GenericArrayData(shortArray))) + shortArray.zipWithIndex.map { case (e, i) => assert(shortUnsafeArray.getShort(i) == e) } + + val intArray = intData.toArray + val intUnsafeArray = performUnsafeArrayWriter(intArray.length, 4, + (writer: UnsafeArrayWriter) => writer.writePrimitiveIntArray(new GenericArrayData(intArray))) + intArray.zipWithIndex.map { case (e, i) => assert(intUnsafeArray.getInt(i) == e) } + + val longArray = longData.toArray + val longUnsafeArray = performUnsafeArrayWriter(longArray.length, 8, + (writer: UnsafeArrayWriter) => + writer.writePrimitiveLongArray(new GenericArrayData(longArray))) + longArray.zipWithIndex.map { case (e, i) => assert(longUnsafeArray.getLong(i) == e) } + + val floatArray = floatData.toArray + val floatUnsafeArray = performUnsafeArrayWriter(floatArray.length, 4, + (writer: UnsafeArrayWriter) => + writer.writePrimitiveFloatArray(new GenericArrayData(floatArray))) + floatArray.zipWithIndex.map { case (e, i) => assert(floatUnsafeArray.getFloat(i) == e) } + + val doubleArray = doubleData.toArray + val doubleUnsafeArray = performUnsafeArrayWriter(doubleArray.length, 8, + (writer: UnsafeArrayWriter) => + writer.writePrimitiveDoubleArray(new GenericArrayData(doubleArray))) + doubleArray.zipWithIndex.map { case (e, i) => assert(doubleUnsafeArray.getDouble(i) == e) } + } } From 96b0853d2876ba3aa3cc1db1c23df55e226deb49 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 14 Jul 2016 20:00:29 +0900 Subject: [PATCH 07/10] fix potential bug since writePrimitiveTypeArray() may receive GenericRefArrayData --- .../codegen/UnsafeArrayWriter.java | 135 +++++++++++++++--- 1 file changed, 117 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index 3f9230c25c8a5..b37a5c45ac9ed 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -262,11 +262,7 @@ public void write(int ordinal, CalendarInterval input) { } private void writePrimitiveArray(Object input, int offset, int elementSize, int length) { - // uncomment this if SPARK-16043 is merged - // Platform.copyMemory(input, offset, holder.buffer, startingOffset + headerInBytes, - // elementSize * length); Platform.copyMemory(input, offset, holder.buffer, holder.cursor, elementSize * length); - // remove the followings if SPARK-15962 is merged for (int ordinal = 0; ordinal < length; ordinal++) { Platform.putInt(holder.buffer, getElementOffset(ordinal), (holder.cursor + ordinal * elementSize) - startingOffset); @@ -275,8 +271,6 @@ private void writePrimitiveArray(Object input, int offset, int elementSize, int } public void writePrimitiveBooleanArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // boolean[] intput = ((GenericBooleanArrayData)arrayData).primitiveArray(); boolean[] input = arrayData.toBooleanArray(); int length = input.length; int offset = Platform.BYTE_ARRAY_OFFSET; @@ -284,8 +278,6 @@ public void writePrimitiveBooleanArray(ArrayData arrayData) { } public void writePrimitiveByteArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // byte[] intput = ((GenericByteArrayData)arrayData).primitiveArray(); byte[] input = arrayData.toByteArray(); int length = input.length; int offset = Platform.BYTE_ARRAY_OFFSET; @@ -293,8 +285,6 @@ public void writePrimitiveByteArray(ArrayData arrayData) { } public void writePrimitiveShortArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // short[] input = ((GenericShortArrayData)arrayData).primitiveArray(); short[] input = arrayData.toShortArray(); int length = input.length; int offset = Platform.SHORT_ARRAY_OFFSET; @@ -302,8 +292,6 @@ public void writePrimitiveShortArray(ArrayData arrayData) { } public void writePrimitiveIntArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // int[] input = ((GenericIntArrayData)arrayData).primitiveArray(); int[] input = arrayData.toIntArray(); int length = input.length; int offset = Platform.INT_ARRAY_OFFSET; @@ -311,8 +299,6 @@ public void writePrimitiveIntArray(ArrayData arrayData) { } public void writePrimitiveLongArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // long[] input = ((GenericLongArrayData)arrayData).primitiveArray(); long[] input = arrayData.toLongArray(); int length = input.length; int offset = Platform.LONG_ARRAY_OFFSET; @@ -320,8 +306,6 @@ public void writePrimitiveLongArray(ArrayData arrayData) { } public void writePrimitiveFloatArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // float[] input = ((GenericFloatArrayData)arrayData).primitiveArray(); float[] input = arrayData.toFloatArray(); int length = input.length; int offset = Platform.FLOAT_ARRAY_OFFSET; @@ -329,11 +313,126 @@ public void writePrimitiveFloatArray(ArrayData arrayData) { } public void writePrimitiveDoubleArray(ArrayData arrayData) { - // uncomment this if SPARK-16043 is merged - // double[] input = ((GenericDoubleArrayData)arrayData).primitiveArray(); double[] input = arrayData.toDoubleArray(); int length = input.length; int offset = Platform.DOUBLE_ARRAY_OFFSET; writePrimitiveArray(input, offset, 8, length); } + +/** uncomment this if SPARK-16043 is merged + + // remove this method if SPARK-15962 is merged + private void updateIndex(int elementSize, int length) { + for (int ordinal = 0; ordinal < length; ordinal++) { + Platform.putInt(holder.buffer, getElementOffset(ordinal), + (holder.cursor + ordinal * elementSize) - startingOffset); + } + holder.cursor += elementSize * length; + } + + public void writePrimitiveBooleanArray(ArrayData arrayData) { + if (arrayData instanceof GenericBooleanArrayData) { + boolean[] input = ((GenericBooleanArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.BOOLEAN_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putBoolean(holder.buffer, holder.cursor + i, arrayData.getBoolean(i)); + } + updateIndex(1, length); + } + } + + public void writePrimitiveByteArray(ArrayData arrayData) { + if (arrayData instanceof GenericByteArrayData) { + byte[] input = ((GenericByteArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putByte(holder.buffer, holder.cursor + i, arrayData.getByte(i)); + } + updateIndex(1, length); + } + } + + public void writePrimitiveShortArray(ArrayData arrayData) { + if (arrayData instanceof GenericShortArrayData) { + short[] input = ((GenericShortArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.SHORT_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putShort(holder.buffer, holder.cursor + i, arrayData.getShort(i)); + } + updateIndex(2, length); + } + } + + public void writePrimitiveIntArray(ArrayData arrayData) { + if (arrayData instanceof GenericIntArrayData) { + int[] input = ((GenericIntArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.INT_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putInt(holder.buffer, holder.cursor + i, arrayData.getInt(i)); + } + updateIndex(4, length); + } + } + + public void writePrimitiveLongArray(ArrayData arrayData) { + if (arrayData instanceof GenericLongArrayData) { + long[] input = ((GenericLongArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.LONG_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putLong(holder.buffer, holder.cursor + i, arrayData.getLong(i)); + } + updateIndex(8, length); + } + } + + public void writePrimitiveFloatArray(ArrayData arrayData) { + if (arrayData instanceof GenericFloatArrayData) { + float[] input = ((GenericFloatArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.FLOAT_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putFloat(holder.buffer, holder.cursor + i, arrayData.getFloat(i)); + } + updateIndex(4, length); + } + } + + public void writePrimitiveDoubleArray(ArrayData arrayData) { + if (arrayData instanceof GenericDoubleArrayData) { + double[] input = ((GenericDoubleArrayData)arrayData).primitiveArray(); + int length = input.length; + Platform.copyMemory(input, Platform.DOUBLE_ARRAY_OFFSET, + holder.buffer, startingOffset + headerInBytes, length); + } else { + int length = arrayData.numElements(); + for (int i = 0; i < length; i++) { + Platform.putFloat(holder.buffer, holder.cursor + i, arrayData.getFloat(i)); + } + updateIndex(8, length); + } + } +*/ } From 88aad46c0c067ac5660d3aa7a3d8ed80a1be7556 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 3 Nov 2016 18:37:30 +0900 Subject: [PATCH 08/10] rebase --- .../codegen/UnsafeArrayWriter.java | 20 ------------------- .../codegen/GenerateUnsafeProjection.scala | 4 ++-- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index b37a5c45ac9ed..2922e27f10c74 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -263,10 +263,6 @@ public void write(int ordinal, CalendarInterval input) { private void writePrimitiveArray(Object input, int offset, int elementSize, int length) { Platform.copyMemory(input, offset, holder.buffer, holder.cursor, elementSize * length); - for (int ordinal = 0; ordinal < length; ordinal++) { - Platform.putInt(holder.buffer, getElementOffset(ordinal), - (holder.cursor + ordinal * elementSize) - startingOffset); - } holder.cursor += elementSize * length; } @@ -321,15 +317,6 @@ public void writePrimitiveDoubleArray(ArrayData arrayData) { /** uncomment this if SPARK-16043 is merged - // remove this method if SPARK-15962 is merged - private void updateIndex(int elementSize, int length) { - for (int ordinal = 0; ordinal < length; ordinal++) { - Platform.putInt(holder.buffer, getElementOffset(ordinal), - (holder.cursor + ordinal * elementSize) - startingOffset); - } - holder.cursor += elementSize * length; - } - public void writePrimitiveBooleanArray(ArrayData arrayData) { if (arrayData instanceof GenericBooleanArrayData) { boolean[] input = ((GenericBooleanArrayData)arrayData).primitiveArray(); @@ -341,7 +328,6 @@ public void writePrimitiveBooleanArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putBoolean(holder.buffer, holder.cursor + i, arrayData.getBoolean(i)); } - updateIndex(1, length); } } @@ -356,7 +342,6 @@ public void writePrimitiveByteArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putByte(holder.buffer, holder.cursor + i, arrayData.getByte(i)); } - updateIndex(1, length); } } @@ -371,7 +356,6 @@ public void writePrimitiveShortArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putShort(holder.buffer, holder.cursor + i, arrayData.getShort(i)); } - updateIndex(2, length); } } @@ -386,7 +370,6 @@ public void writePrimitiveIntArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putInt(holder.buffer, holder.cursor + i, arrayData.getInt(i)); } - updateIndex(4, length); } } @@ -401,7 +384,6 @@ public void writePrimitiveLongArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putLong(holder.buffer, holder.cursor + i, arrayData.getLong(i)); } - updateIndex(8, length); } } @@ -416,7 +398,6 @@ public void writePrimitiveFloatArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putFloat(holder.buffer, holder.cursor + i, arrayData.getFloat(i)); } - updateIndex(4, length); } } @@ -431,7 +412,6 @@ public void writePrimitiveDoubleArray(ArrayData arrayData) { for (int i = 0; i < length; i++) { Platform.putFloat(holder.buffer, holder.cursor + i, arrayData.getFloat(i)); } - updateIndex(8, length); } } */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index f0f3729d685e1..7dfa28746abcb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -206,7 +206,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro case a @ ArrayType(et, cn) => s""" final int $tmpCursor = $bufferHolder.cursor; - ${writeArrayToBuffer(ctx, element, et, bufferHolder)} + ${writeArrayToBuffer(ctx, element, et, cn, bufferHolder)} $arrayWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor); """ @@ -289,7 +289,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro // Remember the current cursor so that we can write numBytes of key array later. final int $tmpCursor = $bufferHolder.cursor; - ${writeArrayToBuffer(ctx, keys, keyType, bufferHolder)} + ${writeArrayToBuffer(ctx, keys, keyType, false, bufferHolder)} // Write the numBytes of key array into the first 8 bytes. Platform.putLong($bufferHolder.buffer, $tmpCursor - 8, $bufferHolder.cursor - $tmpCursor); From 25ddd8e0c47a841d01b6ff15a6c1a34a58b93aca Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 4 Nov 2016 02:14:34 +0900 Subject: [PATCH 09/10] fix test failures --- .../sql/catalyst/expressions/codegen/UnsafeArrayWriter.java | 2 +- .../expressions/codegen/GenerateUnsafeProjection.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index 2922e27f10c74..699617c6c46f2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -262,7 +262,7 @@ public void write(int ordinal, CalendarInterval input) { } private void writePrimitiveArray(Object input, int offset, int elementSize, int length) { - Platform.copyMemory(input, offset, holder.buffer, holder.cursor, elementSize * length); + Platform.copyMemory(input, offset, holder.buffer, startingOffset + headerInBytes, elementSize * length); holder.cursor += elementSize * length; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 7dfa28746abcb..aa0a052f45d8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -225,11 +225,12 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro case _ => s"$arrayWriter.write($index, $element);" } + val typeName = if (ctx.isPrimitiveType(jt)) ctx.primitiveTypeName(et) else "" val storeElements = if (containsNull) { s""" for (int $index = 0; $index < $numElements; $index++) { if ($input.isNullAt($index)) { - $arrayWriter.setNullAt($index); + $arrayWriter.setNull${typeName}($index); } else { final $jt $element = ${ctx.getValue(input, et, index)}; $writeElement @@ -237,8 +238,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro } """ } else { - if (ctx.isPrimitiveType(et)) { - val typeName = ctx.primitiveTypeName(et) + if (ctx.isPrimitiveType(jt)) { s"$arrayWriter.writePrimitive${typeName}Array($input);" } else { s""" From 62042987039a4e9a9a35056dcc525bad80d2e0d2 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sat, 5 Nov 2016 00:02:39 +0900 Subject: [PATCH 10/10] fix test failures --- .../expressions/codegen/UnsafeArrayWriter.java | 1 - .../expressions/codegen/BufferHolderSuite.scala | 16 +++------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index 699617c6c46f2..3af08617a6784 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -263,7 +263,6 @@ public void write(int ordinal, CalendarInterval input) { private void writePrimitiveArray(Object input, int offset, int elementSize, int length) { Platform.copyMemory(input, offset, holder.buffer, startingOffset + headerInBytes, elementSize * length); - holder.cursor += elementSize * length; } public void writePrimitiveBooleanArray(ArrayData arrayData) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala index 99d5470c37c00..e4a968e2eec8d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala @@ -60,18 +60,11 @@ class BufferHolderSuite extends SparkFunSuite { UnsafeArrayData = { val length = data.length val unsafeArray = new UnsafeArrayData - // uncomment this if SPARK-15962 is merged - // val headerSize = calculateHeaderPortionInBytes(length) - val headerSize = 4 + 4 * length + val headerSize = UnsafeArrayData.calculateHeaderPortionInBytes(length) val size = headerSize + elementSize * length val buffer = new Array[Byte](size) Platform.putInt(buffer, Platform.BYTE_ARRAY_OFFSET, length) unsafeArray.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET, size) - // remove this if SPARK-15962 is merged - (0 until length).map(ordinal => - Platform.putInt(buffer, Platform.BYTE_ARRAY_OFFSET + 4 + ordinal * 4, - headerSize + ordinal * elementSize) - ) assert(unsafeArray.numElements == length) data.zipWithIndex.map { case (e, i) => val offset = Platform.BYTE_ARRAY_OFFSET + headerSize + elementSize * i @@ -160,8 +153,6 @@ class BufferHolderSuite extends SparkFunSuite { } test("fromPrimitiveArray") { - // uncomment this if SPARK-15962 is merged -/* val booleanArray = booleanData.toArray val booleanUnsafeArray = UnsafeArrayData.fromPrimitiveArray(booleanArray) booleanArray.zipWithIndex.map { case (e, i) => assert(booleanUnsafeArray.getBoolean(i) == e) } @@ -173,12 +164,11 @@ class BufferHolderSuite extends SparkFunSuite { val shortArray = shortData.toArray val shortUnsafeArray = UnsafeArrayData.fromPrimitiveArray(shortArray) shortArray.zipWithIndex.map { case (e, i) => assert(shortUnsafeArray.getShort(i) == e) } -*/ + val intArray = intData.toArray val intUnsafeArray = UnsafeArrayData.fromPrimitiveArray(intArray) intArray.zipWithIndex.map { case (e, i) => assert(intUnsafeArray.getInt(i) == e) } -/* val longArray = longData.toArray val longUnsafeArray = UnsafeArrayData.fromPrimitiveArray(longArray) longArray.zipWithIndex.map { case (e, i) => assert(longUnsafeArray.getLong(i) == e) } @@ -186,7 +176,7 @@ class BufferHolderSuite extends SparkFunSuite { val floatArray = floatData.toArray val floatUnsafeArray = UnsafeArrayData.fromPrimitiveArray(floatArray) floatArray.zipWithIndex.map { case (e, i) => assert(floatUnsafeArray.getFloat(i) == e) } -*/ + val doubleArray = doubleData.toArray val doubleUnsafeArray = UnsafeArrayData.fromPrimitiveArray(doubleArray) doubleArray.zipWithIndex.map { case (e, i) => assert(doubleUnsafeArray.getDouble(i) == e) }