Skip to content

Commit d7545d0

Browse files
nastra authored and cloud-fan committed
[SPARK-50624][SQL] Add TimestampNTZType to ColumnarRow/MutableColumnarRow
### What changes were proposed in this pull request?

Noticed that `TimestampNTZType` handling was missing from `ColumnarRow`/`MutableColumnarRow` when using them in Iceberg. See additional details in apache/iceberg#11815 (comment).

### Why are the changes needed?

To be able to read `TimestampNTZType` when using `ColumnarRow`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added some unit tests that failed without the fix.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49437 from nastra/SPARK-50624.

Authored-by: Eduard Tudenhoefner <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 3f86455 commit d7545d0

File tree

4 files changed

+49
-0
lines changed

4 files changed

+49
-0
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ public Object get(int ordinal, DataType dataType) {
188188
return getInt(ordinal);
189189
} else if (dataType instanceof TimestampType) {
190190
return getLong(ordinal);
191+
} else if (dataType instanceof TimestampNTZType) {
192+
return getLong(ordinal);
191193
} else if (dataType instanceof ArrayType) {
192194
return getArray(ordinal);
193195
} else if (dataType instanceof StructType) {

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ public InternalRow copy() {
8282
row.setInt(i, getInt(i));
8383
} else if (dt instanceof TimestampType) {
8484
row.setLong(i, getLong(i));
85+
} else if (dt instanceof TimestampNTZType) {
86+
row.setLong(i, getLong(i));
8587
} else if (dt instanceof StructType) {
8688
row.update(i, getStruct(i, ((StructType) dt).fields().length).copy());
8789
} else if (dt instanceof ArrayType) {
@@ -191,6 +193,8 @@ public Object get(int ordinal, DataType dataType) {
191193
return getInt(ordinal);
192194
} else if (dataType instanceof TimestampType) {
193195
return getLong(ordinal);
196+
} else if (dataType instanceof TimestampNTZType) {
197+
return getLong(ordinal);
194198
} else if (dataType instanceof ArrayType) {
195199
return getArray(ordinal);
196200
} else if (dataType instanceof StructType structType) {

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,19 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
274274
}
275275
}
276276

277+
testVectors("mutable ColumnarRow with TimestampNTZType", 10, TimestampNTZType) { testVector =>
278+
val mutableRow = new MutableColumnarRow(Array(testVector))
279+
(0 until 10).foreach { i =>
280+
mutableRow.rowId = i
281+
mutableRow.setLong(0, 10 - i)
282+
}
283+
(0 until 10).foreach { i =>
284+
mutableRow.rowId = i
285+
assert(mutableRow.get(0, TimestampNTZType) === (10 - i))
286+
assert(mutableRow.copy().get(0, TimestampNTZType) === (10 - i))
287+
}
288+
}
289+
277290
val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
278291
testVectors("array", 10, arrayType) { testVector =>
279292

@@ -384,18 +397,24 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
384397
}
385398

386399
val structType: StructType = new StructType().add("int", IntegerType).add("double", DoubleType)
400+
.add("ts", TimestampNTZType)
387401
testVectors("struct", 10, structType) { testVector =>
388402
val c1 = testVector.getChild(0)
389403
val c2 = testVector.getChild(1)
404+
val c3 = testVector.getChild(2)
390405
c1.putInt(0, 123)
391406
c2.putDouble(0, 3.45)
407+
c3.putLong(0, 1000L)
392408
c1.putInt(1, 456)
393409
c2.putDouble(1, 5.67)
410+
c3.putLong(1, 2000L)
394411

395412
assert(testVector.getStruct(0).get(0, IntegerType) === 123)
396413
assert(testVector.getStruct(0).get(1, DoubleType) === 3.45)
414+
assert(testVector.getStruct(0).get(2, TimestampNTZType) === 1000L)
397415
assert(testVector.getStruct(1).get(0, IntegerType) === 456)
398416
assert(testVector.getStruct(1).get(1, DoubleType) === 5.67)
417+
assert(testVector.getStruct(1).get(2, TimestampNTZType) === 2000L)
399418
}
400419

401420
testVectors("SPARK-44805: getInts with dictionary", 3, IntegerType) { testVector =>

sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,4 +515,28 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
515515
columnVector.close()
516516
allocator.close()
517517
}
518+
519+
test("struct with TimestampNTZType") {
520+
val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
521+
val schema = new StructType().add("ts", TimestampNTZType)
522+
val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
523+
.createVector(allocator).asInstanceOf[StructVector]
524+
vector.allocateNew()
525+
val timestampVector = vector.getChildByOrdinal(0).asInstanceOf[TimeStampMicroVector]
526+
527+
vector.setIndexDefined(0)
528+
timestampVector.setSafe(0, 1000L)
529+
530+
timestampVector.setValueCount(1)
531+
vector.setValueCount(1)
532+
533+
val columnVector = new ArrowColumnVector(vector)
534+
assert(columnVector.dataType === schema)
535+
536+
val row0 = columnVector.getStruct(0)
537+
assert(row0.get(0, TimestampNTZType) === 1000L)
538+
539+
columnVector.close()
540+
allocator.close()
541+
}
518542
}

0 commit comments

Comments
 (0)