From 2db2bb1fe97ace657bf8ef036a358af3fbaf4a62 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 29 Oct 2017 04:28:06 +0100 Subject: [PATCH 01/12] initial commit --- .../execution/columnar/ColumnAccessor.scala | 3 + .../columnar/InMemoryTableScanExec.scala | 2 + .../columnar/NullableColumnAccessor.scala | 12 + .../spark/sql/DataFrameTungstenSuite.scala | 6 +- .../execution/WholeStageCodegenSuite.scala | 14 + .../vectorized/ColumnVectorSuite.scala | 345 +++++++++++++++++- .../vectorized/ColumnarBatchSuite.scala | 19 +- 7 files changed, 388 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala index 85c36b7da9498..05e23c3f219b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala @@ -156,6 +156,9 @@ private[sql] object ColumnAccessor { if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) { val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]] nativeAccessor.decompress(columnVector, numRows) + } else if (columnAccessor.isInstanceOf[ArrayColumnAccessor]) { + val arrayAccessor = columnAccessor.asInstanceOf[ArrayColumnAccessor] + arrayAccessor.extract(columnVector, numRows) } else { throw new RuntimeException("Not support non-primitive type now") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 3e73393b12850..4921bbd9bcc00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -56,6 +56,8 @@ case class InMemoryTableScanExec( relation.schema.fields.forall(f => f.dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => true + case ArrayType(dt, _) if (dt == BooleanType || dt == ByteType || dt == ShortType || + dt == IntegerType || dt == LongType || dt == FloatType || dt == DoubleType) => true case _ => false }) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala index 2f09757aa341c..6d90b74aae2b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized.WritableColumnVector private[columnar] trait NullableColumnAccessor extends ColumnAccessor { private var nullsBuffer: ByteBuffer = _ @@ -56,4 +57,15 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor { } abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext + + def extract(columnVector: WritableColumnVector, capacity: Int): Unit = { + if (nextNullIndex != -1) { + columnVector.putNull(nextNullIndex) + for (_ <- 1 until nullCount) { + val ordinal = 
ByteBufferHelper.getInt(nullsBuffer) + columnVector.putNull(ordinal) + } + } + columnVector.putUnsafeArrayData(underlyingBuffer) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala index 0881212a64de8..7e8f5c9071390 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala @@ -74,11 +74,13 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext { assert(df.select("b").first() === Row(outerStruct)) } + // checkAnswer(sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF, + test("primitive data type accesses in persist data") { val data = Seq(true, 1.toByte, 3.toShort, 7, 15.toLong, - 31.25.toFloat, 63.75, null) + 31.25.toFloat, 63.75, null, Array(1.2, 2.3), Array[java.lang.Double](1.2, null)) val dataTypes = Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, IntegerType) + FloatType, DoubleType, IntegerType, ArrayType(DoubleType, false), ArrayType(DoubleType, true)) val schemas = dataTypes.zipWithIndex.map { case (dataType, index) => StructField(s"col$index", dataType, true) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index bc05dca578c47..7e6d552561459 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -135,6 +135,20 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { ) assert(dsIntFilter.collect() === Array(1, 2)) + val dsIntArray = sparkContext.parallelize(Seq(Array(1, 2), Array(-1, -2)), 1).toDS.cache + dsIntArray.count + val dsIntArrayFilter = dsIntArray.filter(a => a(0) > 0) + val planIntArray = dsIntArrayFilter.queryExecution.executedPlan + assert(planIntArray.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec] && + p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child + .isInstanceOf[InMemoryTableScanExec] && + p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[FilterExec].child + .asInstanceOf[InMemoryTableScanExec].supportCodegen).isDefined + ) + assert(dsIntArrayFilter.collect() === Array(Array(1, 2))) + // cache for string type is not supported for InMemoryTableScanExec val dsString = spark.range(3).map(_.toString).cache dsString.count diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 3c76ca79f5dda..1d4e12666de9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.vectorized import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.execution.columnar.ColumnAccessor import 
org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper @@ -396,5 +396,348 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } } } + + test("CachedBatch boolean array Apis") { + val N = 16 + val dataType = ArrayType(BooleanType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Boolean]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.tabulate(i)(i => i % 2 == 0) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toBooleanArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toBooleanArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 3).toBooleanArray() === data(N - i * 3)) + } + } + } + } + + test("CachedBatch byte array Apis") { + val N = 16 + val dataType = ArrayType(ByteType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Byte]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.tabulate(i)(i => i.toByte) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toByteArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toByteArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 3).toByteArray() === data(N - i * 3)) + } + } + } + } + + test("CachedBatch short array Apis") { + val N = 16 + val dataType = ArrayType(ShortType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Short]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.tabulate(i)(i => i.toShort) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val 
columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toShortArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toShortArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 3).toShortArray() === data(N - i * 3)) + } + } + } + } + + test("CachedBatch int array Apis") { + val N = 16 + val dataType = ArrayType(IntegerType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Int]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.range(0, i) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toIntArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toIntArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 3).toIntArray() === data(N - i * 3)) + } + } + } + } + + test("CachedBatch long array Apis") { + val N = 16 + val dataType = ArrayType(LongType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Long]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.tabulate(i)(i => i.toLong) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toLongArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toLongArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 
3).toLongArray() === data(N - i * 3)) + } + } + } + } + + test("CachedBatch float array Apis") { + val N = 16 + val dataType = ArrayType(FloatType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Float]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.tabulate(i)(i => i.toFloat) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toFloatArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toFloatArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 3).toFloatArray() === data(N - i * 3)) + } + } + } + } + + test("CachedBatch double array Apis") { + val N = 16 + val dataType = ArrayType(DoubleType, false) + val columnBuilder = ColumnBuilderHelper(dataType, 4096, "col", true) + val row = new GenericInternalRow(N) + val data = new Array[Array[Double]](N) + val nulls = Seq(0, 6, 11) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + row.setNullAt(0) + } else { + data(i) = Array.tabulate(i)(i => i.toDouble) + row.update(0, UnsafeArrayData.fromPrimitiveArray(data(i))) + } + columnBuilder.appendFrom(row, 0) + } + + withVectors(N, dataType) { testVector => + val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) + ColumnAccessor.decompress(columnAccessor, testVector, 16) + + for (i <- 0 until N) { + if (nulls.contains(i)) { + assert(testVector.isNullAt(i) == true) + } else { + assert(testVector.isNullAt(i) == false) + assert(testVector.getArray(i).toDoubleArray() === data(i)) + } + } + for (i <- 0 to N / 3) { + if (nulls.contains(i * 3)) { + assert(testVector.isNullAt(i * 3) == true) + } else { + assert(testVector.isNullAt(i * 3) == false) + assert(testVector.getArray(i * 3).toDoubleArray() === data(i * 3)) + } + } + for (i <- 1 to N / 3) { + if (nulls.contains(N - i * 3)) { + assert(testVector.isNullAt(N - i * 3) == true) + } else { + assert(testVector.isNullAt(N - i * 3) == false) + assert(testVector.getArray(N - i * 3).toDoubleArray() === data(N - i * 3)) + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 80a50866aa504..8f9c9776b51e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -645,26 +645,26 @@ class ColumnarBatchSuite extends SparkFunSuite { column.putArray(2, 2, 0) column.putArray(3, 3, 3) - val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] - val a2 = 
ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] - val a3 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(2)).asInstanceOf[Array[Int]] - val a4 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(3)).asInstanceOf[Array[Int]] + val a1 = column.getArray(0).toIntArray() + val a2 = column.getArray(1).toIntArray() + val a3 = column.getArray(2).toIntArray() + val a4 = column.getArray(3).toIntArray() assert(a1 === Array(0)) assert(a2 === Array(1, 2)) assert(a3 === Array.empty[Int]) assert(a4 === Array(3, 4, 5)) // Verify the ArrayData APIs - assert(column.getArray(0).length == 1) + assert(column.getArray(0).numElements == 1) assert(column.getArray(0).getInt(0) == 0) - assert(column.getArray(1).length == 2) + assert(column.getArray(1).numElements == 2) assert(column.getArray(1).getInt(0) == 1) assert(column.getArray(1).getInt(1) == 2) - assert(column.getArray(2).length == 0) + assert(column.getArray(2).numElements == 0) - assert(column.getArray(3).length == 3) + assert(column.getArray(3).numElements == 3) assert(column.getArray(3).getInt(0) == 3) assert(column.getArray(3).getInt(1) == 4) assert(column.getArray(3).getInt(2) == 5) @@ -677,8 +677,7 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(data.capacity == array.length * 2) data.putInts(0, array.length, array, 0) column.putArray(0, 0, array.length) - assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] - === array) + assert(column.getArray(0).toIntArray === array) } test("toArray for primitive types") { From 642110ec18bed6b700577454d9f42ca491aca063 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 30 Oct 2017 10:48:46 +0100 Subject: [PATCH 02/12] add UnsafeColumnVector to support array for table cache --- .../vectorized/ColumnVectorUtils.java | 22 - .../vectorized/OffHeapColumnVector.java | 5 + .../vectorized/OnHeapColumnVector.java | 5 + .../vectorized/UnsafeColumnVector.java | 534 ++++++++++++++++++ .../columnar/InMemoryTableScanExec.scala | 24 +- .../columnar/NullableColumnAccessor.scala | 2 +- .../spark/sql/DataFrameTungstenSuite.scala | 9 +- .../vectorized/ColumnVectorSuite.scala | 42 +- 8 files changed, 610 insertions(+), 33 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index b4b5f0a265934..051d8e38a9805 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -93,28 +93,6 @@ public static void populate(WritableColumnVector col, InternalRow row, int field } } - /** - * Returns the array data as the java primitive array. - * For example, an array of IntegerType will return an int[]. - * Throws exceptions for unhandled schemas. 
- */ - public static Object toPrimitiveJavaArray(ColumnarArray array) { - DataType dt = array.data.dataType(); - if (dt instanceof IntegerType) { - int[] result = new int[array.length]; - ColumnVector data = array.data; - for (int i = 0; i < result.length; i++) { - if (data.isNullAt(array.offset + i)) { - throw new RuntimeException("Cannot handle NULL values."); - } - result[i] = data.getInt(array.offset + i); - } - return result; - } else { - throw new UnsupportedOperationException(); - } - } - private static void appendValue(WritableColumnVector dst, DataType t, Object o) { if (o == null) { if (t instanceof CalendarIntervalType) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 1cbaf08569334..230c341e3c801 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -97,6 +97,11 @@ public void close() { offsetData = 0; } + @Override + public void putUnsafeData(ByteBuffer array) { + throw new RuntimeException("Cannot put UnsafeData for off-heap column"); + } + // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 85d72295ab9b8..65c0b16ff65b7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -94,6 +94,11 @@ public void close() { arrayOffsets = null; } + @Override + public void putUnsafeData(ByteBuffer array) { + throw new RuntimeException("Cannot put UnsafeData for on-heap column"); + } + // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java new file mode 100644 index 0000000000000..90e91b6438e20 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.commons.lang.NotImplementedException; + +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.Platform; + +/** + * A column backed by UnsafeArrayData on byte[]. 
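+ * The byte[] holds the serialized UnsafeArrayData records of the cached column, each
+ * prefixed with its 4-byte total size; rows are located by walking these records from the
+ * last accessed position (see updateLastArrayPos below), so no per-row offset table is kept.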
+ */ +public final class UnsafeColumnVector extends WritableColumnVector { + // This is faster than a boolean array and we optimize this over memory footprint. + private byte[] nulls; + + // Array stored in byte array + private byte[] data; + private long offset; + + // Only set if type is Array. + private int lastArrayRow; + private int lastArrayPos; + private UnsafeArrayData unsafeArray = new UnsafeArrayData(); + + public UnsafeColumnVector(int capacity, DataType type) { + super(capacity, type); + + reserveInternal(capacity); + reset(); + nulls = new byte[capacity]; + } + + @Override + public void putUnsafeData(ByteBuffer buffer) { + assert(this.resultArray != null); + data = buffer.array(); + offset = Platform.BYTE_ARRAY_OFFSET + buffer.position(); + + lastArrayRow = Integer.MAX_VALUE; + lastArrayPos = 0; + + setIsConstant(); + } + + @Override + public long valuesNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { + nulls[rowId] = (byte)0; + } + + @Override + public void putNull(int rowId) { + nulls[rowId] = (byte)1; + ++numNulls; + anyNullsSet = true; + } + + @Override + public void putNulls(int rowId, int count) { + for (int i = 0; i < count; ++i) { + nulls[rowId + i] = (byte)1; + } + anyNullsSet = true; + numNulls += count; + } + + @Override + public void putNotNulls(int rowId, int count) { + if (!anyNullsSet) return; + for (int i = 0; i < count; ++i) { + nulls[rowId + i] = (byte)0; + } + } + + @Override + public boolean isNullAt(int rowId) { + if (nulls == null) return false; + // If @@@ existins in @@@@, data is null. 
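+    // data is only set on the top-level vector (via putUnsafeData), which tracks row-level
+    // nulls in nulls[]; the child element vector never gets data and instead delegates to
+    // the UnsafeArrayData it currently points to.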
+ if (data != null) { + return nulls[rowId] == 1; + } else { + return unsafeArray.isNullAt(rowId); + } + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, boolean value) { + throw new NotImplementedException(); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { + throw new NotImplementedException(); + } + + @Override + public boolean getBoolean(int rowId) { + assert(dictionary == null); + return unsafeArray.getBoolean(rowId); + } + + @Override + public boolean[] getBooleans(int rowId, int count) { + assert(dictionary == null); + boolean[] array = unsafeArray.toBooleanArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + boolean[] newArray = new boolean[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + // + // APIs dealing with Bytes + // + + @Override + public void putByte(int rowId, byte value) { + throw new NotImplementedException(); + } + + @Override + public void putBytes(int rowId, int count, byte value) { + throw new NotImplementedException(); + } + + @Override + public void putBytes(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public byte getByte(int rowId) { + assert(dictionary == null); + return unsafeArray.getByte(rowId); + } + + @Override + public byte[] getBytes(int rowId, int count) { + assert(dictionary == null); + byte[] array = unsafeArray.toByteArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + byte[] newArray = new byte[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + // + // APIs dealing with Shorts + // + + @Override + public void putShort(int rowId, short value) { + throw new NotImplementedException(); + } + + @Override + public void putShorts(int rowId, int count, short value) { + throw new NotImplementedException(); + } + + @Override + public void putShorts(int rowId, int count, short[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putShorts(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public short getShort(int rowId) { + assert(dictionary == null); + return unsafeArray.getShort(rowId); + } + + @Override + public short[] getShorts(int rowId, int count) { + assert(dictionary == null); + short[] array = unsafeArray.toShortArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + short[] newArray = new short[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + // + // APIs dealing with Ints + // + + @Override + public void putInt(int rowId, int value) { + throw new NotImplementedException(); + } + + @Override + public void putInts(int rowId, int count, int value) { + throw new NotImplementedException(); + } + + @Override + public void putInts(int rowId, int count, int[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putInts(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public int getInt(int rowId) { + assert(dictionary == null); + return unsafeArray.getInt(rowId); + } + + @Override + public int[] getInts(int rowId, int count) { 
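+    // Reads count elements from the UnsafeArrayData this element vector currently points to,
+    // positioned by a preceding getArray(rowId) call on the parent vector.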
+ assert(dictionary == null); + int[] array = unsafeArray.toIntArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + int[] newArray = new int[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + public int getDictId(int rowId) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Longs + // + + @Override + public void putLong(int rowId, long value) { + throw new NotImplementedException(); + } + + @Override + public void putLongs(int rowId, int count, long value) { + throw new NotImplementedException(); + } + + @Override + public void putLongs(int rowId, int count, long[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putLongs(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public long getLong(int rowId) { + assert(dictionary == null); + return unsafeArray.getLong(rowId); + } + + @Override + public long[] getLongs(int rowId, int count) { + assert(dictionary == null); + long[] array = unsafeArray.toLongArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + long[] newArray = new long[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + // + // APIs dealing with floats + // + + @Override + public void putFloat(int rowId, float value) { + throw new NotImplementedException(); + } + + @Override + public void putFloats(int rowId, int count, float value) { + throw new NotImplementedException(); + } + + @Override + public void putFloats(int rowId, int count, float[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putFloats(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public float getFloat(int rowId) { + assert(dictionary == null); + return unsafeArray.getFloat(rowId); + } + + @Override + public float[] getFloats(int rowId, int count) { + assert(dictionary == null); + float[] array = unsafeArray.toFloatArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + float[] newArray = new float[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + // + // APIs dealing with doubles + // + + @Override + public void putDouble(int rowId, double value) { + throw new NotImplementedException(); + } + + @Override + public void putDoubles(int rowId, int count, double value) { + throw new NotImplementedException(); + } + + @Override + public void putDoubles(int rowId, int count, double[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { + throw new NotImplementedException(); + } + + @Override + public double getDouble(int rowId) { + assert(dictionary == null); + return unsafeArray.getDouble(rowId); + } + + @Override + public double[] getDoubles(int rowId, int count) { + assert(dictionary == null); + double[] array = unsafeArray.toDoubleArray(); + if (array.length == count) { + return array; + } else { + assert(count < array.length); + double[] newArray = new double[count]; + System.arraycopy(array, 0, newArray, 0, count); + return newArray; + } + } + + // + // APIs dealing with Arrays + // + + private void 
updateLastArrayPos(int rowId) { + int relative = rowId - lastArrayRow; + if (relative == 1) { + int totalBytesLastArray = Platform.getInt(data, offset + lastArrayPos); + lastArrayPos += totalBytesLastArray + 4; // 4 for totalbytes in UnsafeArrayData + } else if (relative == 0) { + // return the same position + return; + } else if (relative > 0) { + for (int i = 0; i < relative; i++) { + if (isNullAt(lastArrayRow + i)) continue; + int totalBytesLastArray = Platform.getInt(data, offset + lastArrayPos); + lastArrayPos += totalBytesLastArray + 4; // 4 for totalbytes in UnsafeArrayData + } + } else { + // recalculate pos from the first entry + lastArrayPos = 0; + for (int i = 0; i < rowId; i++) { + if (isNullAt(i)) continue; + int totalBytesLastArray = Platform.getInt(data, offset + lastArrayPos); + lastArrayPos += totalBytesLastArray + 4; // 4 for totalbytes in UnsafeArrayData + } + } + lastArrayRow = rowId; + } + + private int setUnsafeArray(int rowId) { + assert(data != null); + int length; + if (rowId - lastArrayRow == 1 && !anyNullsSet()) { + // inlined frequently-executed path (access an array in the next row) + lastArrayRow = rowId; + long localOffset = offset; + int localLastArrayPos = lastArrayPos; + int totalBytesLastArray = Platform.getInt(data, localOffset + localLastArrayPos); + localLastArrayPos += totalBytesLastArray + 4; // 4 for totalbytes in UnsafeArrayData + length = Platform.getInt(data, localOffset + localLastArrayPos); + ((UnsafeColumnVector)(this.resultArray.data)) + .unsafeArray.pointTo(data, localOffset + localLastArrayPos + 4, length); + lastArrayPos = localLastArrayPos; + } else { + updateLastArrayPos(rowId); + length = Platform.getInt(data, offset + lastArrayPos); // inline getArrayLength() + ((UnsafeColumnVector)(this.resultArray.data)) + .unsafeArray.pointTo(data, offset + lastArrayPos + 4, length); + } + return ((UnsafeColumnVector)(this.resultArray.data)).unsafeArray.numElements(); + } + + @Override + public ColumnVector.Array getArray(int rowId) { + int elements = setUnsafeArray(rowId); + + resultArray.length = elements; + resultArray.offset = 0; + return resultArray; + } + + @Override + public int getArrayLength(int rowId) { + updateLastArrayPos(rowId); + return Platform.getInt(data, offset + lastArrayPos); + } + + @Override + public int getArrayOffset(int rowId) { + updateLastArrayPos(rowId); + return lastArrayPos; + } + + @Override + public void putArray(int rowId, int offset, int length) { + throw new NotImplementedException(); + } + + @Override + public void loadBytes(ColumnVector.Array array) { + throw new NotImplementedException(); + } + + // + // APIs dealing with Byte Arrays + // + + @Override + public int putByteArray(int rowId, byte[] value, int offset, int length) { + throw new NotImplementedException(); + } + + // Spilt this function out since it is the slow path. 
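+  // Only the null-tracking array grows here; the array payload stays in the byte[] handed
+  // over through putUnsafeData and is never reallocated by this vector.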
+ @Override + protected void reserveInternal(int newCapacity) { + assert(this.resultArray != null); + byte[] newNulls = new byte[newCapacity]; + if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, capacity); + nulls = newNulls; + capacity = newCapacity; + } + + @Override + protected UnsafeColumnVector reserveNewColumn(int capacity, DataType type) { + return new UnsafeColumnVector(capacity, type); + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 4921bbd9bcc00..79534fdac83c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -67,16 +67,38 @@ case class InMemoryTableScanExec( private val relationSchema = relation.schema.toArray - private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) + private val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) + + override def vectorTypes: Option[Seq[String]] = { + val fields = columnarBatchSchema.fields + Option((0 until fields.length).map { i => + if (fields(i).dataType.isInstanceOf[ArrayType]) { + classOf[UnsafeColumnVector].getName + } else { + classOf[OnHeapColumnVector].getName + } + }) + } private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { val rowCount = cachedColumnarBatch.numRows +<<<<<<< HEAD val taskContext = Option(TaskContext.get()) val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) { OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema) } else { OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema) } +======= + val fields = columnarBatchSchema.fields + val columnVectors = (0 until fields.length).map { i => + if (fields(i).dataType.isInstanceOf[ArrayType]) { + new UnsafeColumnVector(rowCount, fields(i).dataType) + } else { + new OnHeapColumnVector(rowCount, fields(i).dataType) + } + }.toArray +>>>>>>> add UnsafeColumnVector to support array for table cache val columnarBatch = new ColumnarBatch( columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) columnarBatch.setNumRows(rowCount) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala index 6d90b74aae2b7..116ef0e369e2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala @@ -66,6 +66,6 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor { columnVector.putNull(ordinal) } } - columnVector.putUnsafeArrayData(underlyingBuffer) + columnVector.putUnsafeData(underlyingBuffer) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala index 7e8f5c9071390..7d9760ba0a86f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala @@ -74,8 +74,6 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext { 
assert(df.select("b").first() === Row(outerStruct)) } - // checkAnswer(sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF, - test("primitive data type accesses in persist data") { val data = Seq(true, 1.toByte, 3.toShort, 7, 15.toLong, 31.25.toFloat, 63.75, null, Array(1.2, 2.3), Array[java.lang.Double](1.2, null)) @@ -111,4 +109,11 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext { df.count assert(df.filter("d < 3").count == 1) } + + test("primitive array for Dataset") { + val ds = sparkContext.parallelize(Seq(Array(6, 7), Array(8, 9, 10)), 1).toDS.cache + ds.count + val ds1 = ds.map(p => p).collect + assert(ds1(0) === Array(6, 7) && ds1(1) === Array(8, 9, 10)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 1d4e12666de9d..b8296e4da8136 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -52,6 +52,13 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } } + private def withColumnArrayVectors( + size: Int, + dt: DataType)( + block: WritableColumnVector => Unit): Unit = { + withVector(new UnsafeColumnVector(size, dt))(block) + } + testVectors("boolean", 10, BooleanType) { testVector => (0 until 10).foreach { i => testVector.appendBoolean(i % 2 == 0) @@ -415,7 +422,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -425,6 +432,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i) == false) assert(testVector.getArray(i).toBooleanArray() === data(i)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getBoolean(j) == data(i)(j)) + } } } for (i <- 0 to N / 3) { @@ -464,7 +474,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -474,6 +484,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i) == false) assert(testVector.getArray(i).toByteArray() === data(i)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getByte(j) == data(i)(j)) + } } } for (i <- 0 to N / 3) { @@ -513,7 +526,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -523,6 +536,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i) == false) assert(testVector.getArray(i).toShortArray() === data(i)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getShort(j) == data(i)(j)) + } } } for 
(i <- 0 to N / 3) { @@ -562,7 +578,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -580,6 +596,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i * 3) == false) assert(testVector.getArray(i * 3).toIntArray() === data(i * 3)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getInt(j) == data(i)(j)) + } } } for (i <- 1 to N / 3) { @@ -611,7 +630,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -621,6 +640,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i) == false) assert(testVector.getArray(i).toLongArray() === data(i)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getLong(j) == data(i)(j)) + } } } for (i <- 0 to N / 3) { @@ -660,7 +682,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -670,6 +692,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i) == false) assert(testVector.getArray(i).toFloatArray() === data(i)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getFloat(j) == data(i)(j)) + } } } for (i <- 0 to N / 3) { @@ -709,7 +734,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { columnBuilder.appendFrom(row, 0) } - withVectors(N, dataType) { testVector => + withColumnArrayVectors(N, dataType) { testVector => val columnAccessor = ColumnAccessor(dataType, columnBuilder.build) ColumnAccessor.decompress(columnAccessor, testVector, 16) @@ -719,6 +744,9 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } else { assert(testVector.isNullAt(i) == false) assert(testVector.getArray(i).toDoubleArray() === data(i)) + for (j <- 0 until data(i).length) { + assert(testVector.getArray(i).getDouble(j) == data(i)(j)) + } } } for (i <- 0 to N / 3) { From ef1fb3df1169ab128565d91a93ff6c1f628a4209 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 30 Oct 2017 18:43:40 +0100 Subject: [PATCH 03/12] fix faiulres in CacheTableSuite, HiveCompatibilitySuite, and HiveQuerySuite --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 79534fdac83c4..587e78004d40e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -67,7 +67,7 @@ case class 
InMemoryTableScanExec( private val relationSchema = relation.schema.toArray - private val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) + private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i))) override def vectorTypes: Option[Seq[String]] = { val fields = columnarBatchSchema.fields From c5882ce6f2f44b731c9d34a25a550d605bfad507 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 30 Oct 2017 18:44:05 +0100 Subject: [PATCH 04/12] remove wrong assert to fix failures --- .../spark/sql/execution/vectorized/UnsafeColumnVector.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index 90e91b6438e20..bda7a0d0b14c6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -520,7 +520,6 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) { // Spilt this function out since it is the slow path. @Override protected void reserveInternal(int newCapacity) { - assert(this.resultArray != null); byte[] newNulls = new byte[newCapacity]; if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, capacity); nulls = newNulls; From f6b963bee335e9c49b96590f8a8802cbd1eec0c9 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 5 Nov 2017 06:57:47 +0000 Subject: [PATCH 05/12] avoid to override ColumnVector.getArray() --- .../vectorized/UnsafeColumnVector.java | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index bda7a0d0b14c6..b7cd69d1beaba 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -35,10 +35,10 @@ public final class UnsafeColumnVector extends WritableColumnVector { private byte[] data; private long offset; - // Only set if type is Array. private int lastArrayRow; private int lastArrayPos; private UnsafeArrayData unsafeArray = new UnsafeArrayData(); + private boolean isTopLevel = true; public UnsafeColumnVector(int capacity, DataType type) { super(capacity, type); @@ -110,7 +110,7 @@ public void putNotNulls(int rowId, int count) { public boolean isNullAt(int rowId) { if (nulls == null) return false; // If @@@ existins in @@@@, data is null. 
- if (data != null) { + if (isTopLevel) { return nulls[rowId] == 1; } else { return unsafeArray.isNullAt(rowId); @@ -430,7 +430,7 @@ public double[] getDoubles(int rowId, int count) { private void updateLastArrayPos(int rowId) { int relative = rowId - lastArrayRow; - if (relative == 1) { + if (relative == 1 && !anyNullsSet()) { int totalBytesLastArray = Platform.getInt(data, offset + lastArrayPos); lastArrayPos += totalBytesLastArray + 4; // 4 for totalbytes in UnsafeArrayData } else if (relative == 0) { @@ -477,25 +477,14 @@ private int setUnsafeArray(int rowId) { return ((UnsafeColumnVector)(this.resultArray.data)).unsafeArray.numElements(); } - @Override - public ColumnVector.Array getArray(int rowId) { - int elements = setUnsafeArray(rowId); - - resultArray.length = elements; - resultArray.offset = 0; - return resultArray; - } - @Override public int getArrayLength(int rowId) { - updateLastArrayPos(rowId); - return Platform.getInt(data, offset + lastArrayPos); + return setUnsafeArray(rowId); } @Override public int getArrayOffset(int rowId) { - updateLastArrayPos(rowId); - return lastArrayPos; + return 0; } @Override From c81528c1a712999ee4fcaf096d61a281f9f28ef8 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 5 Nov 2017 15:53:24 +0000 Subject: [PATCH 06/12] fix test failures --- .../spark/sql/execution/vectorized/UnsafeColumnVector.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index b7cd69d1beaba..f58cbceb335a3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -38,7 +38,6 @@ public final class UnsafeColumnVector extends WritableColumnVector { private int lastArrayRow; private int lastArrayPos; private UnsafeArrayData unsafeArray = new UnsafeArrayData(); - private boolean isTopLevel = true; public UnsafeColumnVector(int capacity, DataType type) { super(capacity, type); @@ -109,8 +108,7 @@ public void putNotNulls(int rowId, int count) { @Override public boolean isNullAt(int rowId) { if (nulls == null) return false; - // If @@@ existins in @@@@, data is null. 
- if (isTopLevel) { + if (data != null) { return nulls[rowId] == 1; } else { return unsafeArray.isNullAt(rowId); From e8fcd34d4e78c62c62912444e875da7f9c21fd0f Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 10 Nov 2017 16:06:20 +0000 Subject: [PATCH 07/12] Remove ColumnVector.putUnsafeData() --- .../vectorized/OffHeapColumnVector.java | 5 ---- .../vectorized/OnHeapColumnVector.java | 5 ---- .../vectorized/UnsafeColumnVector.java | 23 ++++++++----------- .../columnar/NullableColumnAccessor.scala | 3 ++- 4 files changed, 12 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 230c341e3c801..1cbaf08569334 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -97,11 +97,6 @@ public void close() { offsetData = 0; } - @Override - public void putUnsafeData(ByteBuffer array) { - throw new RuntimeException("Cannot put UnsafeData for off-heap column"); - } - // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 65c0b16ff65b7..85d72295ab9b8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -94,11 +94,6 @@ public void close() { arrayOffsets = null; } - @Override - public void putUnsafeData(ByteBuffer array) { - throw new RuntimeException("Cannot put UnsafeData for on-heap column"); - } - // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index f58cbceb335a3..1c77b2092140e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -47,18 +47,6 @@ public UnsafeColumnVector(int capacity, DataType type) { nulls = new byte[capacity]; } - @Override - public void putUnsafeData(ByteBuffer buffer) { - assert(this.resultArray != null); - data = buffer.array(); - offset = Platform.BYTE_ARRAY_OFFSET + buffer.position(); - - lastArrayRow = Integer.MAX_VALUE; - lastArrayPos = 0; - - setIsConstant(); - } - @Override public long valuesNativeAddress() { throw new RuntimeException("Cannot get native address for on heap column"); @@ -501,7 +489,16 @@ public void loadBytes(ColumnVector.Array array) { @Override public int putByteArray(int rowId, byte[] value, int offset, int length) { - throw new NotImplementedException(); + assert(this.resultArray != null); + data = value; + this.offset = Platform.BYTE_ARRAY_OFFSET + offset; + + lastArrayRow = Integer.MAX_VALUE; + lastArrayPos = 0; + + setIsConstant(); + + return value.length - offset; } // Spilt this function out since it is the slow path. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala index 116ef0e369e2b..a947ca7e8b0ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala @@ -66,6 +66,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor { columnVector.putNull(ordinal) } } - columnVector.putUnsafeData(underlyingBuffer) + columnVector.putByteArray( + 0, underlyingBuffer.array, underlyingBuffer.position, underlyingBuffer.limit) } } From 886833efd127ce70f780beef511c6a6eddaf7f65 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 15 Nov 2017 18:32:30 +0000 Subject: [PATCH 08/12] rebase with master --- .../spark/sql/execution/vectorized/UnsafeColumnVector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index 1c77b2092140e..123d057433c64 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -479,7 +479,7 @@ public void putArray(int rowId, int offset, int length) { } @Override - public void loadBytes(ColumnVector.Array array) { + public void loadBytes(ColumnarArray array) { throw new NotImplementedException(); } From 71df28b8ce881521b56bdc3646f2bf5d2b7382c5 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 16 Nov 2017 07:18:17 +0000 Subject: [PATCH 09/12] address review comment --- .../vectorized/UnsafeColumnVector.java | 27 ++++++++++--------- .../vectorized/ColumnVectorSuite.scala | 6 +++++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index 123d057433c64..babfb5501fef0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -127,12 +127,12 @@ public boolean getBoolean(int rowId) { public boolean[] getBooleans(int rowId, int count) { assert(dictionary == null); boolean[] array = unsafeArray.toBooleanArray(); - if (array.length == count) { + if (rowId == 0 && array.length == count) { return array; } else { assert(count < array.length); boolean[] newArray = new boolean[count]; - System.arraycopy(array, 0, newArray, 0, count); + System.arraycopy(array, rowId, newArray, 0, count); return newArray; } } @@ -210,12 +210,12 @@ public short getShort(int rowId) { public short[] getShorts(int rowId, int count) { assert(dictionary == null); short[] array = unsafeArray.toShortArray(); - if (array.length == count) { + if (rowId == 0 && array.length == count) { return array; } else { assert(count < array.length); short[] newArray = new short[count]; - System.arraycopy(array, 0, newArray, 0, count); + System.arraycopy(array, rowId, newArray, 0, count); return newArray; } } @@ -259,12 +259,12 @@ public int getInt(int rowId) { public int[] getInts(int rowId, int count) { assert(dictionary == null); int[] array = 
unsafeArray.toIntArray(); - if (array.length == count) { + if (rowId == 0 && array.length == count) { return array; } else { assert(count < array.length); int[] newArray = new int[count]; - System.arraycopy(array, 0, newArray, 0, count); + System.arraycopy(array, rowId, newArray, 0, count); return newArray; } } @@ -312,12 +312,12 @@ public long getLong(int rowId) { public long[] getLongs(int rowId, int count) { assert(dictionary == null); long[] array = unsafeArray.toLongArray(); - if (array.length == count) { + if (rowId == 0 && array.length == count) { return array; } else { assert(count < array.length); long[] newArray = new long[count]; - System.arraycopy(array, 0, newArray, 0, count); + System.arraycopy(array, rowId, newArray, 0, count); return newArray; } } @@ -356,12 +356,12 @@ public float getFloat(int rowId) { public float[] getFloats(int rowId, int count) { assert(dictionary == null); float[] array = unsafeArray.toFloatArray(); - if (array.length == count) { + if (rowId == 0 && array.length == count) { return array; } else { assert(count < array.length); float[] newArray = new float[count]; - System.arraycopy(array, 0, newArray, 0, count); + System.arraycopy(array, rowId, newArray, 0, count); return newArray; } } @@ -400,12 +400,12 @@ public double getDouble(int rowId) { public double[] getDoubles(int rowId, int count) { assert(dictionary == null); double[] array = unsafeArray.toDoubleArray(); - if (array.length == count) { + if (rowId == 0 && array.length == count) { return array; } else { assert(count < array.length); double[] newArray = new double[count]; - System.arraycopy(array, 0, newArray, 0, count); + System.arraycopy(array, rowId, newArray, 0, count); return newArray; } } @@ -487,6 +487,9 @@ public void loadBytes(ColumnarArray array) { // APIs dealing with Byte Arrays // + // This method puts byte[] in value for the whole data, which is represented in + // UnsafeArrayData, into data without copying data to avoid inefficient copy. + // For UnsafeColumnVector, this method should be called only once at the beginning. 
@Override public int putByteArray(int rowId, byte[] value, int offset, int length) { assert(this.resultArray != null); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index b8296e4da8136..47780f129505b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -435,6 +435,12 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { for (j <- 0 until data(i).length) { assert(testVector.getArray(i).getBoolean(j) == data(i)(j)) } + if (data(i).length > 1) { + val subArray = testVector.getArray(i).getBooleans(1, data(i).length - 1) + for (j <- 1 until data(i).length) { + assert(subArray(j - 1) === data(i)(j)) + } + } } } for (i <- 0 to N / 3) { From b8cef29805e1fe792cf330a1588a9ed5386f8ecb Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 16 Nov 2017 07:54:19 +0000 Subject: [PATCH 10/12] fix compilation error --- .../spark/sql/execution/vectorized/ColumnVectorSuite.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 47780f129505b..b8296e4da8136 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -435,12 +435,6 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { for (j <- 0 until data(i).length) { assert(testVector.getArray(i).getBoolean(j) == data(i)(j)) } - if (data(i).length > 1) { - val subArray = testVector.getArray(i).getBooleans(1, data(i).length - 1) - for (j <- 1 until data(i).length) { - assert(subArray(j - 1) === data(i)(j)) - } - } } } for (i <- 0 to N / 3) { From 274b7773b5fe941580c2aa264d8d851022d7b3b6 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 21 Nov 2017 18:51:00 +0000 Subject: [PATCH 11/12] fix failures of rebase --- .../columnar/InMemoryTableScanExec.scala | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 587e78004d40e..ac555af244ce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -37,15 +37,6 @@ case class InMemoryTableScanExec( override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren - override def vectorTypes: Option[Seq[String]] = - Option(Seq.fill(attributes.length)( - if (!conf.offHeapColumnVectorEnabled) { - classOf[OnHeapColumnVector].getName - } else { - classOf[OffHeapColumnVector].getName - } - )) - /** * If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
* If false, get data from UnsafeRow build from ColumnVector @@ -74,31 +65,27 @@ case class InMemoryTableScanExec( Option((0 until fields.length).map { i => if (fields(i).dataType.isInstanceOf[ArrayType]) { classOf[UnsafeColumnVector].getName - } else { + } else if (!conf.offHeapColumnVectorEnabled) { classOf[OnHeapColumnVector].getName + } else { + classOf[OffHeapColumnVector].getName } }) } private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = { + val fields = columnarBatchSchema.fields val rowCount = cachedColumnarBatch.numRows -<<<<<<< HEAD val taskContext = Option(TaskContext.get()) - val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) { - OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema) - } else { - OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema) - } -======= - val fields = columnarBatchSchema.fields val columnVectors = (0 until fields.length).map { i => if (fields(i).dataType.isInstanceOf[ArrayType]) { new UnsafeColumnVector(rowCount, fields(i).dataType) - } else { + } else if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) { new OnHeapColumnVector(rowCount, fields(i).dataType) + } else { + new OffHeapColumnVector(rowCount, fields(i).dataType) } }.toArray ->>>>>>> add UnsafeColumnVector to support array for table cache val columnarBatch = new ColumnarBatch( columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount) columnarBatch.setNumRows(rowCount) From 20d2ba2819f9f6c5c10752df2d5f9ca450b0ad51 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 28 Nov 2017 10:48:37 +0000 Subject: [PATCH 12/12] rebase with master --- .../vectorized/UnsafeColumnVector.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java index babfb5501fef0..f5db99f268898 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/UnsafeColumnVector.java @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; /** * A column backed by UnsafeArrayData on byte[]. @@ -47,15 +48,6 @@ public UnsafeColumnVector(int capacity, DataType type) { nulls = new byte[capacity]; } - @Override - public long valuesNativeAddress() { - throw new RuntimeException("Cannot get native address for on heap column"); - } - @Override - public long nullsNativeAddress() { - throw new RuntimeException("Cannot get native address for on heap column"); - } - @Override public void close() { } @@ -176,6 +168,11 @@ public byte[] getBytes(int rowId, int count) { } } + @Override + protected UTF8String getBytesAsUTF8String(int rowId, int count) { + return UTF8String.fromAddress(null, unsafeArray.getBaseOffset() + rowId, count); + } + // // APIs dealing with Shorts // @@ -478,11 +475,6 @@ public void putArray(int rowId, int offset, int length) { throw new NotImplementedException(); } - @Override - public void loadBytes(ColumnarArray array) { - throw new NotImplementedException(); - } - // // APIs dealing with Byte Arrays //
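The net effect of patch 11 on InMemoryTableScanExec is that the vector class is now chosen per column: ArrayType columns get an UnsafeColumnVector backed by the cached UnsafeArrayData bytes, while the remaining primitive columns keep the existing on-heap/off-heap choice driven by conf.offHeapColumnVectorEnabled and the presence of a TaskContext. A minimal Scala sketch of that selection rule, with hypothetical VectorKind stand-ins for the three classes named in the diff:

    import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, IntegerType}

    sealed trait VectorKind
    case object UnsafeVector  extends VectorKind  // array columns, read from UnsafeArrayData
    case object OnHeapVector  extends VectorKind  // primitive columns, on-heap buffers
    case object OffHeapVector extends VectorKind  // primitive columns, off-heap buffers

    // Mirrors the branch order in the patched createAndDecompressColumn:
    // arrays first, then the on-heap/off-heap decision.
    def chooseVector(dt: DataType, offHeapEnabled: Boolean, hasTaskContext: Boolean): VectorKind =
      dt match {
        case _: ArrayType                            => UnsafeVector
        case _ if !offHeapEnabled || !hasTaskContext => OnHeapVector
        case _                                       => OffHeapVector
      }

    // Example: an int column and an array<double> column under default (on-heap) settings.
    assert(chooseVector(IntegerType, offHeapEnabled = false, hasTaskContext = true) == OnHeapVector)
    assert(chooseVector(ArrayType(DoubleType, false), offHeapEnabled = true, hasTaskContext = true) == UnsafeVector)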