Skip to content

Commit 1a829df

Browse files
ala authored and hvanhovell committed
[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data
`OffHeapColumnVector.reserveInternal()` will only copy already inserted values during reallocation if `data != null`. In vectors containing arrays or structs this is incorrect, since the `data` field is not used there at all. We need to check `nulls` instead. Adds new tests to `ColumnVectorSuite` that reproduce the errors. Author: Ala Luszczak <[email protected]> Closes #19323 from ala/port-vector-realloc.
1 parent c0a34a9 commit 1a829df

File tree

2 files changed

+228
-1
lines changed

2 files changed

+228
-1
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ public void loadBytes(ColumnVector.Array array) {
436436
// Split out the slow path.
437437
@Override
438438
protected void reserveInternal(int newCapacity) {
439-
int oldCapacity = (this.data == 0L) ? 0 : capacity;
439+
int oldCapacity = (nulls == 0L) ? 0 : capacity;
440440
if (this.resultArray != null) {
441441
this.lengthData =
442442
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.vectorized
19+
20+
import org.scalatest.BeforeAndAfterEach
21+
22+
import org.apache.spark.SparkFunSuite
23+
import org.apache.spark.sql.catalyst.util.ArrayData
24+
import org.apache.spark.sql.types._
25+
import org.apache.spark.unsafe.types.UTF8String
26+
27+
/**
 * Tests that values written into a [[ColumnVector]] can be read back through a
 * `ColumnVector.Array` view, plus regression tests for SPARK-22092 (growing an
 * `OffHeapColumnVector` must keep the array and struct data already written).
 */
class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {

  // Vector used by the currently running test; released in `afterEach`.
  var testVector: ColumnVector = _

  /** Creates an on-heap vector of the given capacity and data type. */
  private def allocate(capacity: Int, dt: DataType): ColumnVector = {
    new OnHeapColumnVector(capacity, dt)
  }

  /**
   * Closes the vector created by the test that just ran, if any.
   *
   * The reference is cleared before closing so that a test which fails before
   * allocating its own vector neither triggers a NullPointerException here nor
   * closes the previous test's vector a second time.
   */
  override def afterEach(): Unit = {
    try {
      // Stackable traits must delegate to super.
      super.afterEach()
    } finally {
      val vector = testVector
      testVector = null
      if (vector != null) {
        vector.close()
      }
    }
  }

  test("boolean") {
    testVector = allocate(10, BooleanType)
    (0 until 10).foreach { i =>
      testVector.appendBoolean(i % 2 == 0)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getBoolean(i) === (i % 2 == 0))
    }
  }

  test("byte") {
    testVector = allocate(10, ByteType)
    (0 until 10).foreach { i =>
      testVector.appendByte(i.toByte)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getByte(i) === i.toByte)
    }
  }

  test("short") {
    testVector = allocate(10, ShortType)
    (0 until 10).foreach { i =>
      testVector.appendShort(i.toShort)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getShort(i) === i.toShort)
    }
  }

  test("int") {
    testVector = allocate(10, IntegerType)
    (0 until 10).foreach { i =>
      testVector.appendInt(i)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getInt(i) === i)
    }
  }

  test("long") {
    testVector = allocate(10, LongType)
    (0 until 10).foreach { i =>
      testVector.appendLong(i)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getLong(i) === i)
    }
  }

  test("float") {
    testVector = allocate(10, FloatType)
    (0 until 10).foreach { i =>
      testVector.appendFloat(i.toFloat)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getFloat(i) === i.toFloat)
    }
  }

  test("double") {
    testVector = allocate(10, DoubleType)
    (0 until 10).foreach { i =>
      testVector.appendDouble(i.toDouble)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getDouble(i) === i.toDouble)
    }
  }

  test("string") {
    testVector = allocate(10, StringType)
    // Appending is a side effect, so iterate with foreach rather than map.
    (0 until 10).foreach { i =>
      val utf8 = s"str$i".getBytes("utf8")
      testVector.appendByteArray(utf8, 0, utf8.length)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      assert(array.getUTF8String(i) === UTF8String.fromString(s"str$i"))
    }
  }

  test("binary") {
    testVector = allocate(10, BinaryType)
    // Appending is a side effect, so iterate with foreach rather than map.
    (0 until 10).foreach { i =>
      val utf8 = s"str$i".getBytes("utf8")
      testVector.appendByteArray(utf8, 0, utf8.length)
    }

    val array = new ColumnVector.Array(testVector)

    (0 until 10).foreach { i =>
      val utf8 = s"str$i".getBytes("utf8")
      assert(array.getBinary(i) === utf8)
    }
  }

  test("array") {
    val arrayType = ArrayType(IntegerType, true)
    testVector = allocate(10, arrayType)

    // Fill the child element vector with the values 0..5.
    val data = testVector.arrayData()
    (0 until 6).foreach(i => data.putInt(i, i))

    // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
    testVector.putArray(0, 0, 1)
    testVector.putArray(1, 1, 2)
    testVector.putArray(2, 3, 0)
    testVector.putArray(3, 3, 3)

    val array = new ColumnVector.Array(testVector)

    assert(array.getArray(0).toIntArray() === Array(0))
    assert(array.getArray(1).toIntArray() === Array(1, 2))
    assert(array.getArray(2).toIntArray() === Array.empty[Int])
    assert(array.getArray(3).toIntArray() === Array(3, 4, 5))
  }

  test("struct") {
    val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
    testVector = allocate(10, schema)
    val c1 = testVector.getChildColumn(0)
    val c2 = testVector.getChildColumn(1)
    c1.putInt(0, 123)
    c2.putDouble(0, 3.45)
    c1.putInt(1, 456)
    c2.putDouble(1, 5.67)

    val array = new ColumnVector.Array(testVector)

    assert(array.getStruct(0, 2).asInstanceOf[ColumnarBatch.Row].getInt(0) === 123)
    assert(array.getStruct(0, 2).asInstanceOf[ColumnarBatch.Row].getDouble(1) === 3.45)
    assert(array.getStruct(1, 2).asInstanceOf[ColumnarBatch.Row].getInt(0) === 456)
    assert(array.getStruct(1, 2).asInstanceOf[ColumnarBatch.Row].getDouble(1) === 5.67)
  }

  test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
    val arrayType = ArrayType(IntegerType, true)
    testVector = new OffHeapColumnVector(8, arrayType)

    // Row i holds the single-element array [i].
    val data = testVector.arrayData()
    (0 until 8).foreach(i => data.putInt(i, i))
    (0 until 8).foreach(i => testVector.putArray(i, i, 1))

    // Increase vector's capacity and reallocate the data to new bigger buffers.
    testVector.reserve(16)

    // Check that none of the values got lost/overwritten.
    val array = new ColumnVector.Array(testVector)
    (0 until 8).foreach { i =>
      assert(array.getArray(i).toIntArray() === Array(i))
    }
  }

  test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
    val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
    testVector = new OffHeapColumnVector(8, structType)

    // Mark even rows as null and odd rows as not null.
    (0 until 8).foreach { i =>
      if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i)
    }

    // Growing the vector must keep the null flags written above.
    testVector.reserve(16)

    (0 until 8).foreach { i =>
      assert(testVector.isNullAt(i) === (i % 2 == 0))
    }
  }
}

0 commit comments

Comments
 (0)