From e32996eec05c6fe28236b22cb61ab76dd9be1548 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Thu, 29 Jun 2023 00:32:07 +0800 Subject: [PATCH 01/21] Add vector reserve policy --- .../apache/spark/sql/internal/SQLConf.scala | 21 ++++++ .../vectorized/OffHeapColumnVector.java | 16 +++++ .../vectorized/OnHeapColumnVector.java | 17 +++++ .../vectorized/VectorReservePolicy.java | 71 +++++++++++++++++++ .../vectorized/WritableColumnVector.java | 13 ++-- .../columnar/VectorReservePolicySuite.scala | 54 ++++++++++++++ .../vectorized/ColumnarBatchSuite.scala | 2 +- 7 files changed, 187 insertions(+), 7 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 33b28f60f2a93..c43dc67cb3deb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -487,6 +487,23 @@ object SQLConf { .intConf .createWithDefault(10000) + val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO = + buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio") + .doc("spark will reserve requiredCapacity * this ratio memory next time") + .version("3.5.0") + .doubleConf + .createWithDefault(1.1) + + val VECTORIZED_HUGE_VECTOR_THRESHOLD = + buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold") + .doc("When the in memory column vector is larger than this, spark will reserve " + + s"requiredCapacity * ${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and " + + "free this column vector before reading next batch data. 
-1 means disabling the " + + "optimization.") + .version("3.5.0") + .intConf + .createWithDefault(-1) + val IN_MEMORY_PARTITION_PRUNING = buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning") .internal() @@ -4630,6 +4647,10 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) + def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD) + + def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO) + def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 5674a091f6d6c..c0461e3b19506 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -84,6 +84,22 @@ public long valuesNativeAddress() { return data; } + @Override + public void reset() { + super.reset(); + if (vectorReservePolicy.shouldCleanData()) { + capacity = vectorReservePolicy.defaultCapacity; + Platform.freeMemory(nulls); + Platform.freeMemory(data); + Platform.freeMemory(lengthData); + Platform.freeMemory(offsetData); + nulls = 0; + data = 0; + lengthData = 0; + offsetData = 0; + } + } + @Override public void close() { super.close(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 6e4a9c643e89c..14f28e72f93e4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -80,6 +80,23 @@ public OnHeapColumnVector(int capacity, DataType type) { reset(); } + @Override + public void reset() { + super.reset(); + if (vectorReservePolicy.shouldCleanData()) { + capacity = vectorReservePolicy.defaultCapacity; + nulls = null; + byteData = null; + shortData = null; + intData = null; + longData = null; + floatData = null; + doubleData = null; + arrayLengths = null; + arrayOffsets = null; + } + } + @Override public void close() { super.close(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java new file mode 100644 index 0000000000000..3bcb666ffe121 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.unsafe.array.ByteArrayMethods; + +public abstract class VectorReservePolicy { + + protected int defaultCapacity; + + abstract int nextCapacity(int requiredCapacity); + + boolean shouldCleanData() { + return false; + } + + /** + * Upper limit for the maximum capacity for this column. + */ + @VisibleForTesting + protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; +} + +class DefaultVectorReservePolicy extends VectorReservePolicy { + + int hugeThreshold; + double hugeReserveRatio; + + int currentCapacity; + + public DefaultVectorReservePolicy(int defaultCapacity) { + this.defaultCapacity = defaultCapacity; + this.currentCapacity = defaultCapacity; + SQLConf conf = SQLConf.get(); + hugeThreshold = conf.vectorizedHugeVectorThreshold(); + hugeReserveRatio = conf.vectorizedHugeVectorReserveRatio(); + } + + @Override + public int nextCapacity(int requiredCapacity) { + if (hugeThreshold < 0 || requiredCapacity < hugeThreshold) { + currentCapacity = Math.min(MAX_CAPACITY, requiredCapacity * 2); + } else { + currentCapacity = Math.min(MAX_CAPACITY, (int) (requiredCapacity * hugeReserveRatio)); + } + return currentCapacity; + } + + @Override + public boolean shouldCleanData() { + return currentCapacity > hugeThreshold; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index a8e4aad60c222..af6acd209bdc5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -95,7 +95,7 @@ public void reserve(int requiredCapacity) { if (requiredCapacity < 0) { throwUnsupportedException(requiredCapacity, null); } else if (requiredCapacity > capacity) { - int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); + int newCapacity = vectorReservePolicy.nextCapacity(requiredCapacity); if (requiredCapacity <= newCapacity) { try { reserveInternal(newCapacity); @@ -867,11 +867,7 @@ public final boolean isAllNull() { */ protected int capacity; - /** - * Upper limit for the maximum capacity for this column. - */ - @VisibleForTesting - protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + protected VectorReservePolicy vectorReservePolicy; /** * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. 
@@ -922,6 +918,7 @@ protected boolean isArray() { protected WritableColumnVector(int capacity, DataType dataType) { super(dataType); this.capacity = capacity; + this.vectorReservePolicy = new DefaultVectorReservePolicy(capacity); if (isArray()) { DataType childType; @@ -955,4 +952,8 @@ protected WritableColumnVector(int capacity, DataType dataType) { this.childColumns = null; } } + + public int getCapacity() { + return capacity; + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala new file mode 100644 index 0000000000000..6cbbb7ea20619 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSessionBase +import org.apache.spark.sql.types.ByteType + +class VectorReservePolicySuite extends SparkFunSuite with SharedSparkSessionBase { + + test("Test column vector reserve policy") { + withSQLConf( + SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300", + SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") { + val dataType = ByteType + + Array(new OnHeapColumnVector(80, dataType), + new OffHeapColumnVector(80, dataType)).foreach { vector => + try { + // For small vector, new capacity = request capacity * 2 and will not reset this. 
+ vector.appendBytes(100, 0) + assert(vector.getCapacity == 200) + vector.reset() + assert(vector.getCapacity == 200) + + // For huge vector, new capacity = old capacity * 1.1 for huge vector and will be reset + vector.appendBytes(300, 0) + assert(vector.getCapacity == 360) + vector.reset() + assert(vector.getCapacity == 80) + } finally { + vector.close() + } + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index b34af3c8c633c..950ad1c3eb907 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1565,7 +1565,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("exceeding maximum capacity should throw an error") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => val column = allocate(1, ByteType, memMode) - column.MAX_CAPACITY = 15 + column.vectorReservePolicy.MAX_CAPACITY = 15 column.appendBytes(5, 0.toByte) // Successfully allocate twice the requested capacity assert(column.capacity == 10) From 4dfbbdbadbe2714041f59131414865c1e6cdb478 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Thu, 29 Jun 2023 17:34:13 +0800 Subject: [PATCH 02/21] Allocate default databuf array --- .../spark/sql/execution/vectorized/OffHeapColumnVector.java | 1 + .../spark/sql/execution/vectorized/OnHeapColumnVector.java | 1 + 2 files changed, 2 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index c0461e3b19506..6b4fcc7f5fcdf 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -97,6 +97,7 @@ public void reset() { data = 0; lengthData = 0; offsetData = 0; + reserveInternal(capacity); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 14f28e72f93e4..64154ed1cfff0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -94,6 +94,7 @@ public void reset() { doubleData = null; arrayLengths = null; arrayOffsets = null; + reserveInternal(capacity); } } From c97a0c4835d84757008251c9182d30f1e28bd331 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 30 Jun 2023 11:22:52 +0800 Subject: [PATCH 03/21] Bug fix for column vector with default values --- .../parquet/ParquetColumnVector.java | 2 ++ .../vectorized/OffHeapColumnVector.java | 4 +-- .../vectorized/OnHeapColumnVector.java | 4 +-- .../vectorized/VectorReservePolicy.java | 28 +++++++++++-------- .../vectorized/WritableColumnVector.java | 18 ++++++++---- .../vectorized/ColumnarBatchSuite.scala | 2 +- 6 files changed, 36 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java index e8ae6f290bc4d..5f02050c36a8f 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java @@ -84,6 +84,8 @@ final class ParquetColumnVector { if (defaultValue == null) { vector.setAllNull(); return; + } else { + vector.setHasDefaultValue(); } // For Parquet tables whose columns have associated DEFAULT values, this reader must return // those values instead of NULL when the corresponding columns are not present in storage. diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 6b4fcc7f5fcdf..93335aebfde66 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -87,8 +87,8 @@ public long valuesNativeAddress() { @Override public void reset() { super.reset(); - if (vectorReservePolicy.shouldCleanData()) { - capacity = vectorReservePolicy.defaultCapacity; + if (reservePolicy.shouldCleanData()) { + capacity = reservePolicy.defaultCapacity; Platform.freeMemory(nulls); Platform.freeMemory(data); Platform.freeMemory(lengthData); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 64154ed1cfff0..19a8a5f2cf61c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -83,8 +83,8 @@ public OnHeapColumnVector(int capacity, DataType type) { @Override public void reset() { super.reset(); - if (vectorReservePolicy.shouldCleanData()) { - capacity = vectorReservePolicy.defaultCapacity; + if (reservePolicy.shouldCleanData()) { + capacity = reservePolicy.defaultCapacity; nulls = null; byteData = null; shortData = null; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java index 3bcb666ffe121..f30c0ec6831bf 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -26,25 +26,31 @@ public abstract class VectorReservePolicy { protected int defaultCapacity; - abstract int nextCapacity(int requiredCapacity); - - boolean shouldCleanData() { - return false; - } - /** * Upper limit for the maximum capacity for this column. */ @VisibleForTesting protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + + /** + * True if this column has default values. Return the default values instead of NULL when the + * corresponding columns are not present in storage. We can not reset the data of column vectors + * that has default values. 
+ */ + protected boolean hasDefaultValue; + + abstract int nextCapacity(int requiredCapacity); + + boolean shouldCleanData() { + return false; + } } class DefaultVectorReservePolicy extends VectorReservePolicy { - int hugeThreshold; - double hugeReserveRatio; - - int currentCapacity; + private int hugeThreshold; + private double hugeReserveRatio; + private int currentCapacity; public DefaultVectorReservePolicy(int defaultCapacity) { this.defaultCapacity = defaultCapacity; @@ -66,6 +72,6 @@ public int nextCapacity(int requiredCapacity) { @Override public boolean shouldCleanData() { - return currentCapacity > hugeThreshold; + return !hasDefaultValue && currentCapacity > hugeThreshold; } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index af6acd209bdc5..fa558bf414b53 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -21,8 +21,6 @@ import java.nio.ByteBuffer; import java.util.Optional; -import com.google.common.annotations.VisibleForTesting; - import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; import org.apache.spark.sql.catalyst.util.GenericArrayData; @@ -31,7 +29,6 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -95,7 +92,7 @@ public void reserve(int requiredCapacity) { if (requiredCapacity < 0) { throwUnsupportedException(requiredCapacity, null); } else if (requiredCapacity > capacity) { - int newCapacity = vectorReservePolicy.nextCapacity(requiredCapacity); + int newCapacity = reservePolicy.nextCapacity(requiredCapacity); if (requiredCapacity <= newCapacity) { try { reserveInternal(newCapacity); @@ -848,6 +845,15 @@ public final void addElementsAppended(int num) { */ public final void setIsConstant() { isConstant = true; } + public final void setHasDefaultValue() { + if (childColumns != null) { + for (WritableColumnVector c : childColumns) { + c.setHasDefaultValue(); + } + } + reservePolicy.hasDefaultValue = true; + } + /** * Marks this column only contains null values. */ @@ -867,7 +873,7 @@ public final boolean isAllNull() { */ protected int capacity; - protected VectorReservePolicy vectorReservePolicy; + public VectorReservePolicy reservePolicy; /** * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. 
@@ -918,7 +924,7 @@ protected boolean isArray() { protected WritableColumnVector(int capacity, DataType dataType) { super(dataType); this.capacity = capacity; - this.vectorReservePolicy = new DefaultVectorReservePolicy(capacity); + this.reservePolicy = new DefaultVectorReservePolicy(capacity); if (isArray()) { DataType childType; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 950ad1c3eb907..d2a5161d26a80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1565,7 +1565,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("exceeding maximum capacity should throw an error") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => val column = allocate(1, ByteType, memMode) - column.vectorReservePolicy.MAX_CAPACITY = 15 + column.reservePolicy.MAX_CAPACITY = 15 column.appendBytes(5, 0.toByte) // Successfully allocate twice the requested capacity assert(column.capacity == 10) From 9b9305a831b703f2171aef752048739e48ea4e35 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 30 Jun 2023 14:01:51 +0800 Subject: [PATCH 04/21] Fix code style --- .../spark/sql/execution/vectorized/VectorReservePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java index f30c0ec6831bf..fcec0632e9be9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -52,7 +52,7 @@ class DefaultVectorReservePolicy extends VectorReservePolicy { private double hugeReserveRatio; private int currentCapacity; - public DefaultVectorReservePolicy(int defaultCapacity) { + DefaultVectorReservePolicy(int defaultCapacity) { this.defaultCapacity = defaultCapacity; this.currentCapacity = defaultCapacity; SQLConf conf = SQLConf.get(); From 7cdf5147bc8f9484f623195f2b78bec9941e615d Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 30 Jun 2023 14:14:27 +0800 Subject: [PATCH 05/21] Fix code style --- .../spark/sql/execution/vectorized/VectorReservePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java index fcec0632e9be9..e7fc728de6411 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -37,7 +37,7 @@ public abstract class VectorReservePolicy { * corresponding columns are not present in storage. We can not reset the data of column vectors * that has default values. 
*/ - protected boolean hasDefaultValue; + protected boolean hasDefaultValue = false; abstract int nextCapacity(int requiredCapacity); From ba0d8bc7ad061a7a0c1d0141255643a945e56478 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 30 Jun 2023 15:34:06 +0800 Subject: [PATCH 06/21] Bug fix --- .../spark/sql/execution/vectorized/VectorReservePolicy.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java index e7fc728de6411..596f326cd80e9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -63,9 +63,9 @@ class DefaultVectorReservePolicy extends VectorReservePolicy { @Override public int nextCapacity(int requiredCapacity) { if (hugeThreshold < 0 || requiredCapacity < hugeThreshold) { - currentCapacity = Math.min(MAX_CAPACITY, requiredCapacity * 2); + currentCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); } else { - currentCapacity = Math.min(MAX_CAPACITY, (int) (requiredCapacity * hugeReserveRatio)); + currentCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeReserveRatio); } return currentCapacity; } From dd24ec4d301ea1f7f0b5c885bbd5a8e6895a128c Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Mon, 3 Jul 2023 11:42:21 +0800 Subject: [PATCH 07/21] Constant vector can not be reset --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/execution/vectorized/OffHeapColumnVector.java | 2 +- .../spark/sql/execution/vectorized/OnHeapColumnVector.java | 2 +- .../spark/sql/execution/vectorized/VectorReservePolicy.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c43dc67cb3deb..751b98cb83a7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -492,7 +492,7 @@ object SQLConf { .doc("spark will reserve requiredCapacity * this ratio memory next time") .version("3.5.0") .doubleConf - .createWithDefault(1.1) + .createWithDefault(1.2) val VECTORIZED_HUGE_VECTOR_THRESHOLD = buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold") diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 93335aebfde66..c8c3e80c3255b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -87,7 +87,7 @@ public long valuesNativeAddress() { @Override public void reset() { super.reset(); - if (reservePolicy.shouldCleanData()) { + if (!isConstant && reservePolicy.shouldCleanData()) { capacity = reservePolicy.defaultCapacity; Platform.freeMemory(nulls); Platform.freeMemory(data); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 19a8a5f2cf61c..9663717ddfd65 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -83,7 +83,7 @@ public OnHeapColumnVector(int capacity, DataType type) { @Override public void reset() { super.reset(); - if (reservePolicy.shouldCleanData()) { + if (!isConstant && reservePolicy.shouldCleanData()) { capacity = reservePolicy.defaultCapacity; nulls = null; byteData = null; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java index 596f326cd80e9..75739a636c718 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -72,6 +72,6 @@ public int nextCapacity(int requiredCapacity) { @Override public boolean shouldCleanData() { - return !hasDefaultValue && currentCapacity > hugeThreshold; + return hugeThreshold > 0 && !hasDefaultValue && currentCapacity > hugeThreshold; } } From fb289220cdad025e0bb190ae3c890afcda9ebab2 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 28 Jul 2023 11:43:52 +0800 Subject: [PATCH 08/21] Make shouldCleanData() abstract --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 +++- .../spark/sql/execution/vectorized/VectorReservePolicy.java | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 751b98cb83a7a..2497b6c1c0e76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -489,7 +489,9 @@ object SQLConf { val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO = buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio") - .doc("spark will reserve requiredCapacity * this ratio memory next time") + .doc("spark will reserve requiredCapacity * this ratio memory next time. 
This is only " +
+        "effective when spark.sql.inMemoryColumnarStorage.hugeVectorThreshold > 0 and the required " +
+        "memory is larger than that threshold.")
       .version("3.5.0")
       .doubleConf
       .createWithDefault(1.2)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java
index 75739a636c718..f838e9cb67de8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java
@@ -41,9 +41,7 @@ public abstract class VectorReservePolicy {

   abstract int nextCapacity(int requiredCapacity);

-  boolean shouldCleanData() {
-    return false;
-  }
+  abstract boolean shouldCleanData();
 }

 class DefaultVectorReservePolicy extends VectorReservePolicy {

From 21f7a58e6b798dc9dfca975985dc38a216370666 Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Mon, 31 Jul 2023 17:17:37 +0800
Subject: [PATCH 09/21] retrigger test

---
 .../sql/execution/columnar/VectorReservePolicySuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala
index 6cbbb7ea20619..e23efcc1c5d70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala
@@ -34,13 +34,13 @@ class VectorReservePolicySuite extends SparkFunSuite with SharedSparkSessionBase
       Array(new OnHeapColumnVector(80, dataType),
         new OffHeapColumnVector(80, dataType)).foreach { vector =>
         try {
-          // For small vector, new capacity = request capacity * 2 and will not reset this.
+ // The new capacity of small vector = request capacity * 2 and will not be reset vector.appendBytes(100, 0) assert(vector.getCapacity == 200) vector.reset() assert(vector.getCapacity == 200) - // For huge vector, new capacity = old capacity * 1.1 for huge vector and will be reset + // The new capacity of huge vector = old capacity * 1.1 for huge vector and will be reset vector.appendBytes(300, 0) assert(vector.getCapacity == 360) vector.reset() From e47eb8a68520f995a6345b458d08eabfa15da678 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Tue, 1 Aug 2023 14:52:07 +0800 Subject: [PATCH 10/21] Update the formula of the reserved memory --- .../spark/sql/execution/vectorized/VectorReservePolicy.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java index f838e9cb67de8..e7af7845c5866 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java @@ -63,7 +63,8 @@ public int nextCapacity(int requiredCapacity) { if (hugeThreshold < 0 || requiredCapacity < hugeThreshold) { currentCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); } else { - currentCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeReserveRatio); + currentCapacity = (int) Math.min(MAX_CAPACITY, + (requiredCapacity - hugeThreshold) * hugeReserveRatio + hugeThreshold * 2L); } return currentCapacity; } From 4594f34cf77c0073f804f865c284ee6092d8b22b Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Wed, 2 Aug 2023 12:49:25 +0800 Subject: [PATCH 11/21] Fix UT --- .../sql/execution/columnar/VectorReservePolicySuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala index e23efcc1c5d70..52fa85cb1c2f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala @@ -40,9 +40,10 @@ class VectorReservePolicySuite extends SparkFunSuite with SharedSparkSessionBase vector.reset() assert(vector.getCapacity == 200) - // The new capacity of huge vector = old capacity * 1.1 for huge vector and will be reset + // The new capacity of huge vector = (request capacity - HUGE_VECTOR_THRESHOLD) * 1.2 + + // HUGE_VECTOR_THRESHOLD * 2 = (300 - 300) * 1.2 + 300 * 2 and will be reset. 
vector.appendBytes(300, 0) - assert(vector.getCapacity == 360) + assert(vector.getCapacity == 600) vector.reset() assert(vector.getCapacity == 80) } finally { From 5d03e7926ddab1ea41b4e78db3a978e6bb848d4c Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Tue, 15 Aug 2023 18:46:48 +0800 Subject: [PATCH 12/21] Inline write vector reserve policy --- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../vectorized/OffHeapColumnVector.java | 5 +- .../vectorized/OnHeapColumnVector.java | 5 +- .../vectorized/VectorReservePolicy.java | 76 ------------------- .../vectorized/WritableColumnVector.java | 37 ++++++++- ...cySuite.scala => VectorReserveSuite.scala} | 4 +- .../vectorized/ColumnarBatchSuite.scala | 2 +- 7 files changed, 44 insertions(+), 89 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java rename sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/{VectorReservePolicySuite.scala => VectorReserveSuite.scala} (93%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2497b6c1c0e76..61cc493147625 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -503,7 +503,7 @@ object SQLConf { "free this column vector before reading next batch data. -1 means disabling the " + "optimization.") .version("3.5.0") - .intConf + .bytesConf(ByteUnit.BYTE) .createWithDefault(-1) val IN_MEMORY_PARTITION_PRUNING = @@ -4649,7 +4649,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) - def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD) + def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index c8c3e80c3255b..66add7bf9e92f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -87,8 +87,9 @@ public long valuesNativeAddress() { @Override public void reset() { super.reset(); - if (!isConstant && reservePolicy.shouldCleanData()) { - capacity = reservePolicy.defaultCapacity; + if (!isConstant && !hasDefaultValue && hugeVectorThreshold > 0 && + capacity > hugeVectorThreshold) { + capacity = defaultCapacity; Platform.freeMemory(nulls); Platform.freeMemory(data); Platform.freeMemory(lengthData); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 9663717ddfd65..ec60b5237ea4c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -83,8 +83,9 @@ public OnHeapColumnVector(int capacity, DataType type) { @Override public void reset() { super.reset(); - if (!isConstant && reservePolicy.shouldCleanData()) { - capacity = reservePolicy.defaultCapacity; + if (!isConstant && 
!hasDefaultValue && hugeVectorThreshold > 0 && + capacity > hugeVectorThreshold) { + capacity = defaultCapacity; nulls = null; byteData = null; shortData = null; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java deleted file mode 100644 index e7af7845c5866..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorReservePolicy.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.vectorized; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.unsafe.array.ByteArrayMethods; - -public abstract class VectorReservePolicy { - - protected int defaultCapacity; - - /** - * Upper limit for the maximum capacity for this column. - */ - @VisibleForTesting - protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; - - /** - * True if this column has default values. Return the default values instead of NULL when the - * corresponding columns are not present in storage. We can not reset the data of column vectors - * that has default values. 
- */ - protected boolean hasDefaultValue = false; - - abstract int nextCapacity(int requiredCapacity); - - abstract boolean shouldCleanData(); -} - -class DefaultVectorReservePolicy extends VectorReservePolicy { - - private int hugeThreshold; - private double hugeReserveRatio; - private int currentCapacity; - - DefaultVectorReservePolicy(int defaultCapacity) { - this.defaultCapacity = defaultCapacity; - this.currentCapacity = defaultCapacity; - SQLConf conf = SQLConf.get(); - hugeThreshold = conf.vectorizedHugeVectorThreshold(); - hugeReserveRatio = conf.vectorizedHugeVectorReserveRatio(); - } - - @Override - public int nextCapacity(int requiredCapacity) { - if (hugeThreshold < 0 || requiredCapacity < hugeThreshold) { - currentCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); - } else { - currentCapacity = (int) Math.min(MAX_CAPACITY, - (requiredCapacity - hugeThreshold) * hugeReserveRatio + hugeThreshold * 2L); - } - return currentCapacity; - } - - @Override - public boolean shouldCleanData() { - return hugeThreshold > 0 && !hasDefaultValue && currentCapacity > hugeThreshold; - } -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index fa558bf414b53..4b6189f3df139 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.util.Optional; +import com.google.common.annotations.VisibleForTesting; + import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; import org.apache.spark.sql.catalyst.util.GenericArrayData; @@ -29,6 +31,7 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -92,7 +95,11 @@ public void reserve(int requiredCapacity) { if (requiredCapacity < 0) { throwUnsupportedException(requiredCapacity, null); } else if (requiredCapacity > capacity) { - int newCapacity = reservePolicy.nextCapacity(requiredCapacity); + int newCapacity = + hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ? + (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) : + (int) Math.min(MAX_CAPACITY, (requiredCapacity - hugeVectorThreshold) * + hugeVectorReserveRatio + hugeVectorThreshold * 2L); if (requiredCapacity <= newCapacity) { try { reserveInternal(newCapacity); @@ -851,7 +858,7 @@ public final void setHasDefaultValue() { c.setHasDefaultValue(); } } - reservePolicy.hasDefaultValue = true; + hasDefaultValue = true; } /** @@ -873,7 +880,27 @@ public final boolean isAllNull() { */ protected int capacity; - public VectorReservePolicy reservePolicy; + /** + * The default number of rows that can be stored in this column. + */ + protected int defaultCapacity; + + /** + * Upper limit for the maximum capacity for this column. + */ + @VisibleForTesting + protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + + /** + * True if this column has default values. Return the default values instead of NULL when the + * corresponding columns are not present in storage. 
We can not reset the data of column vectors + * that has default values. + */ + protected boolean hasDefaultValue = false; + + protected int hugeVectorThreshold; + + protected double hugeVectorReserveRatio; /** * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. @@ -924,7 +951,9 @@ protected boolean isArray() { protected WritableColumnVector(int capacity, DataType dataType) { super(dataType); this.capacity = capacity; - this.reservePolicy = new DefaultVectorReservePolicy(capacity); + this.defaultCapacity = capacity; + this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold(); + this.hugeVectorReserveRatio = SQLConf.get().vectorizedHugeVectorReserveRatio(); if (isArray()) { DataType childType; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala similarity index 93% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala index 52fa85cb1c2f8..494e983340ae5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReservePolicySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSessionBase import org.apache.spark.sql.types.ByteType -class VectorReservePolicySuite extends SparkFunSuite with SharedSparkSessionBase { +class VectorReserveSuite extends SparkFunSuite with SharedSparkSessionBase { - test("Test column vector reserve policy") { + test("SPARK-44239: Test column vector reserve policy") { withSQLConf( SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300", SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index d2a5161d26a80..b34af3c8c633c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1565,7 +1565,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("exceeding maximum capacity should throw an error") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => val column = allocate(1, ByteType, memMode) - column.reservePolicy.MAX_CAPACITY = 15 + column.MAX_CAPACITY = 15 column.appendBytes(5, 0.toByte) // Successfully allocate twice the requested capacity assert(column.capacity == 10) From 3e560ebfb47df257bd7b7ccb0fe7f88a458d7be3 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 18 Aug 2023 14:23:24 +0800 Subject: [PATCH 13/21] update code --- .../vectorized/WritableColumnVector.java | 3 +- .../columnar/VectorReserveSuite.scala | 55 ------------------- .../vectorized/ColumnVectorSuite.scala | 32 ++++++++++- 3 files changed, 32 insertions(+), 58 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 
4b6189f3df139..383f1bdd1e095 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -98,8 +98,7 @@ public void reserve(int requiredCapacity) {
       int newCapacity =
         hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ?
           (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) :
-          (int) Math.min(MAX_CAPACITY, (requiredCapacity - hugeVectorThreshold) *
-            hugeVectorReserveRatio + hugeVectorThreshold * 2L);
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeVectorReserveRatio);
       if (requiredCapacity <= newCapacity) {
         try {
           reserveInternal(newCapacity);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala
deleted file mode 100644
index 494e983340ae5..0000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/VectorReserveSuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.columnar
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSessionBase
-import org.apache.spark.sql.types.ByteType
-
-class VectorReserveSuite extends SparkFunSuite with SharedSparkSessionBase {
-
-  test("SPARK-44239: Test column vector reserve policy") {
-    withSQLConf(
-      SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
-      SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
-      val dataType = ByteType
-
-      Array(new OnHeapColumnVector(80, dataType),
-        new OffHeapColumnVector(80, dataType)).foreach { vector =>
-        try {
-          // The new capacity of small vector = request capacity * 2 and will not be reset
-          vector.appendBytes(100, 0)
-          assert(vector.getCapacity == 200)
-          vector.reset()
-          assert(vector.getCapacity == 200)
-
-          // The new capacity of huge vector = (request capacity - HUGE_VECTOR_THRESHOLD) * 1.2 +
-          // HUGE_VECTOR_THRESHOLD * 2 = (300 - 300) * 1.2 + 300 * 2 and will be reset.
-          vector.appendBytes(300, 0)
-          assert(vector.getCapacity == 600)
-          vector.reset()
-          assert(vector.getCapacity == 80)
-        } finally {
-          vector.close()
-        }
-      }
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 5e06eb729ea3e..a489b05038f26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.columnar.ColumnAccessor
 import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarArray
 import org.apache.spark.unsafe.types.UTF8String

-class ColumnVectorSuite extends SparkFunSuite {
+class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
   private def withVector(
       vector: WritableColumnVector)(
       block: WritableColumnVector => Unit): Unit = {
@@ -588,6 +590,34 @@ class ColumnVectorSuite extends SparkFunSuite {
     }
   }

+  test("SPARK-44239: Test column vector reserve policy") {
+    withSQLConf(
+      SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
+      SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
+      val dataType = ByteType
+
+      Array(new OnHeapColumnVector(80, dataType),
+        new OffHeapColumnVector(80, dataType)).foreach { vector =>
+        try {
+          // The new capacity of small vector = request capacity * 2 and will not be reset
+          vector.appendBytes(100, 0)
+          assert(vector.getCapacity == 200)
+          vector.reset()
+          assert(vector.getCapacity == 200)
+
+          // The new capacity of huge vector = request capacity * 1.2
+          // = 300 * 1.2 = 360 and will be reset.
+ vector.appendBytes(300, 0) + assert(vector.getCapacity == 360) + vector.reset() + assert(vector.getCapacity == 80) + } finally { + vector.close() + } + } + } + } + DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt => val structType = new StructType().add(dt.typeName, dt) testVectors("ColumnarRow " + dt.typeName, 10, structType) { v => From d17ccc6cf06fa7f944e3fd6d9c06fcd15d106d10 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 18 Aug 2023 14:42:13 +0800 Subject: [PATCH 14/21] Remove hasDefault field --- .../datasources/parquet/ParquetColumnVector.java | 2 -- .../vectorized/OffHeapColumnVector.java | 2 +- .../execution/vectorized/OnHeapColumnVector.java | 2 +- .../vectorized/WritableColumnVector.java | 16 ---------------- 4 files changed, 2 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java index 5f02050c36a8f..e8ae6f290bc4d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java @@ -84,8 +84,6 @@ final class ParquetColumnVector { if (defaultValue == null) { vector.setAllNull(); return; - } else { - vector.setHasDefaultValue(); } // For Parquet tables whose columns have associated DEFAULT values, this reader must return // those values instead of NULL when the corresponding columns are not present in storage. diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 66add7bf9e92f..49bf0815da812 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -87,7 +87,7 @@ public long valuesNativeAddress() { @Override public void reset() { super.reset(); - if (!isConstant && !hasDefaultValue && hugeVectorThreshold > 0 && + if (!isConstant && hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) { capacity = defaultCapacity; Platform.freeMemory(nulls); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index ec60b5237ea4c..2f98de2556b25 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -83,7 +83,7 @@ public OnHeapColumnVector(int capacity, DataType type) { @Override public void reset() { super.reset(); - if (!isConstant && !hasDefaultValue && hugeVectorThreshold > 0 && + if (!isConstant && hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) { capacity = defaultCapacity; nulls = null; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 383f1bdd1e095..6d91c08c428ef 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -851,15 +851,6 @@ public final void 
addElementsAppended(int num) { */ public final void setIsConstant() { isConstant = true; } - public final void setHasDefaultValue() { - if (childColumns != null) { - for (WritableColumnVector c : childColumns) { - c.setHasDefaultValue(); - } - } - hasDefaultValue = true; - } - /** * Marks this column only contains null values. */ @@ -890,13 +881,6 @@ public final boolean isAllNull() { @VisibleForTesting protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; - /** - * True if this column has default values. Return the default values instead of NULL when the - * corresponding columns are not present in storage. We can not reset the data of column vectors - * that has default values. - */ - protected boolean hasDefaultValue = false; - protected int hugeVectorThreshold; protected double hugeVectorReserveRatio; From 0f5f97788cb104e3d3c79a0da44246d0bb244105 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sat, 19 Aug 2023 11:32:33 +0800 Subject: [PATCH 15/21] Test hugeVectorThreshold=1 --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../vectorized/OffHeapColumnVector.java | 29 ++++++++--------- .../vectorized/OnHeapColumnVector.java | 32 ++++++++----------- .../vectorized/WritableColumnVector.java | 2 +- 4 files changed, 29 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 61cc493147625..6cc205175c09c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -504,7 +504,7 @@ object SQLConf { "optimization.") .version("3.5.0") .bytesConf(ByteUnit.BYTE) - .createWithDefault(-1) + .createWithDefault(1) val IN_MEMORY_PARTITION_PRUNING = buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning") diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 49bf0815da812..fa8468fd566c3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -84,20 +84,24 @@ public long valuesNativeAddress() { return data; } + private void releaseMemory() { + Platform.freeMemory(nulls); + Platform.freeMemory(data); + Platform.freeMemory(lengthData); + Platform.freeMemory(offsetData); + nulls = 0; + data = 0; + lengthData = 0; + offsetData = 0; + } + @Override public void reset() { super.reset(); if (!isConstant && hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) { capacity = defaultCapacity; - Platform.freeMemory(nulls); - Platform.freeMemory(data); - Platform.freeMemory(lengthData); - Platform.freeMemory(offsetData); - nulls = 0; - data = 0; - lengthData = 0; - offsetData = 0; + releaseMemory(); reserveInternal(capacity); } } @@ -105,14 +109,7 @@ public void reset() { @Override public void close() { super.close(); - Platform.freeMemory(nulls); - Platform.freeMemory(data); - Platform.freeMemory(lengthData); - Platform.freeMemory(offsetData); - nulls = 0; - data = 0; - lengthData = 0; - offsetData = 0; + releaseMemory(); } // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 2f98de2556b25..12fbf6271aaca 100644 
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -80,21 +80,25 @@ public OnHeapColumnVector(int capacity, DataType type) {
     reset();
   }
 
+  private void releaseMemory() {
+    nulls = null;
+    byteData = null;
+    shortData = null;
+    intData = null;
+    longData = null;
+    floatData = null;
+    doubleData = null;
+    arrayLengths = null;
+    arrayOffsets = null;
+  }
+
   @Override
   public void reset() {
     super.reset();
     if (!isConstant && hugeVectorThreshold > 0 &&
         capacity > hugeVectorThreshold) {
       capacity = defaultCapacity;
-      nulls = null;
-      byteData = null;
-      shortData = null;
-      intData = null;
-      longData = null;
-      floatData = null;
-      doubleData = null;
-      arrayLengths = null;
-      arrayOffsets = null;
+      releaseMemory();
       reserveInternal(capacity);
     }
   }
@@ -102,15 +106,7 @@ public void reset() {
   @Override
   public void close() {
     super.close();
-    nulls = null;
-    byteData = null;
-    shortData = null;
-    intData = null;
-    longData = null;
-    floatData = null;
-    doubleData = null;
-    arrayLengths = null;
-    arrayOffsets = null;
+    releaseMemory();
   }
 
   //

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 6d91c08c428ef..7b9ead8d613d8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -873,7 +873,7 @@ public final boolean isAllNull() {
   /**
    * The default number of rows that can be stored in this column.
    */
-  protected int defaultCapacity;
+  protected final int defaultCapacity;
 
   /**
    * Upper limit for the maximum capacity for this column.

From bfef94e4a00c5abb3d264d02f8c55d0f47980515 Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Sun, 20 Aug 2023 21:45:44 +0800
Subject: [PATCH 16/21] The children column vectors should also be constant if
 the parent column vector is constant

---
 .../sql/execution/vectorized/WritableColumnVector.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 7b9ead8d613d8..ea6faad2f78ed 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -849,7 +849,14 @@ public final void addElementsAppended(int num) {
   /**
    * Marks this column as being constant.
    */
-  public final void setIsConstant() { isConstant = true; }
+  public final void setIsConstant() {
+    if (childColumns != null) {
+      for (WritableColumnVector c : childColumns) {
+        c.setIsConstant();
+      }
+    }
+    isConstant = true;
+  }
 
   /**
    * Marks this column only contains null values.
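Why PATCH 16 recurses into the children: WritableColumnVector.reset() is a no-op for
constant vectors, and with the huge-vector policy above a reset() may now release a
vector's buffers. Before this change only the top-level vector carried the isConstant
flag, so the children of a constant struct vector were still treated as resettable.
A minimal sketch of the behavior this patch guarantees, written in the style of
ColumnVectorSuite (same package, so the protected isConstant field is visible; the
struct layout and values here are purely illustrative):

  import org.apache.spark.sql.types._

  val vector = new OnHeapColumnVector(4, new StructType().add("i", IntegerType))
  vector.getChild(0).putInt(0, 42)
  vector.setIsConstant()                      // now also flags the child column
  assert(vector.getChild(0).isConstant)
  vector.getChild(0).reset()                  // no-op: the child is constant too
  assert(vector.getChild(0).getInt(0) == 42)  // the constant value survives reset
  vector.close()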
From 88822b375eea672838b98598ba993b2e14ef64f1 Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Mon, 21 Aug 2023 09:59:17 +0800
Subject: [PATCH 17/21] Update config description

---
 .../org/apache/spark/sql/internal/SQLConf.scala | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6cc205175c09c..c293aeadeedd9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -489,22 +489,23 @@ object SQLConf {
 
   val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO =
     buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio")
-      .doc("spark will reserve requiredCapacity * this ratio memory next time. This is only " +
-        "effective when spark.sql.inMemoryColumnarStorage.hugeVectorThreshold > 0 and required " +
-        "memory larger than that threshold.")
+      .doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or the required " +
+        "memory is smaller than spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, Spark " +
+        "reserves twice the required memory; otherwise, Spark reserves " +
+        "required memory * this ratio, and releases this column vector's memory before " +
+        "reading the next batch of rows.")
       .version("3.5.0")
       .doubleConf
       .createWithDefault(1.2)
 
   val VECTORIZED_HUGE_VECTOR_THRESHOLD =
     buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold")
-      .doc("When the in memory column vector is larger than this, spark will reserve " +
-        s"requiredCapacity * ${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and " +
-        "free this column vector before reading next batch data. -1 means disabling the " +
-        "optimization.")
+      .doc("When the required memory is larger than this, Spark reserves required memory * " +
+        s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and releases this column " +
+        s"vector's memory before reading the next batch of rows. -1 means disabling the optimization.")
       .version("3.5.0")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(1)
+      .createWithDefault(-1)
 
   val IN_MEMORY_PARTITION_PRUNING =
     buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")

From ccb02c0925deb972c00c2ca79bdf36ac6bbdffc5 Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Mon, 21 Aug 2023 11:53:11 +0800
Subject: [PATCH 18/21] Move reset and close methods into the parent class

---
 .../vectorized/OffHeapColumnVector.java  | 19 +------------------
 .../vectorized/OnHeapColumnVector.java   | 19 +------------------
 .../vectorized/WritableColumnVector.java |  9 +++++++++
 3 files changed, 11 insertions(+), 36 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index fa8468fd566c3..28ad425afa910 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -84,7 +84,7 @@ public long valuesNativeAddress() {
     return data;
   }
 
-  private void releaseMemory() {
+  protected void releaseMemory() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
@@ -95,23 +95,6 @@ private void releaseMemory() {
     offsetData = 0;
   }
 
-  @Override
-  public void reset() {
-    super.reset();
-    if (!isConstant && hugeVectorThreshold > 0 &&
-        capacity > hugeVectorThreshold) {
-      capacity = defaultCapacity;
-      releaseMemory();
-      reserveInternal(capacity);
-    }
-  }
-
-  @Override
-  public void close() {
-    super.close();
-    releaseMemory();
-  }
-
   //
   // APIs dealing with nulls
   //

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 12fbf6271aaca..6f67ca9ab9e0c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -80,7 +80,7 @@ public OnHeapColumnVector(int capacity, DataType type) {
     reset();
   }
 
-  private void releaseMemory() {
+  protected void releaseMemory() {
     nulls = null;
     byteData = null;
     shortData = null;
@@ -92,23 +92,6 @@ private void releaseMemory() {
     arrayOffsets = null;
  }
 
-  @Override
-  public void reset() {
-    super.reset();
-    if (!isConstant && hugeVectorThreshold > 0 &&
-        capacity > hugeVectorThreshold) {
-      capacity = defaultCapacity;
-      releaseMemory();
-      reserveInternal(capacity);
-    }
-  }
-
-  @Override
-  public void close() {
-    super.close();
-    releaseMemory();
-  }
-
   //
   // APIs dealing with nulls
   //

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index ea6faad2f78ed..10577496a351b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -53,6 +53,8 @@ public abstract class WritableColumnVector extends ColumnVector {
 
   private final byte[] byte8 = new byte[8];
 
+  protected abstract void releaseMemory();
+
   /**
    * Resets this column for writing. The currently stored values are no longer accessible.
   */
@@ -69,6 +71,12 @@ public void reset() {
       putNotNulls(0, capacity);
       numNulls = 0;
     }
+
+    if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
+      capacity = defaultCapacity;
+      releaseMemory();
+      reserveInternal(capacity);
+    }
   }
 
   @Override
@@ -85,6 +93,7 @@ public void close() {
       dictionaryIds = null;
     }
     dictionary = null;
+    releaseMemory();
   }
 
   public void reserveAdditional(int additionalCapacity) {

From c5100243c282cdd24d471cff28f564522239f1f6 Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Mon, 21 Aug 2023 14:45:08 +0800
Subject: [PATCH 19/21] Remove int getCapacity() method

---
 .../sql/execution/vectorized/WritableColumnVector.java | 4 ----
 .../sql/execution/vectorized/ColumnVectorSuite.scala   | 8 ++++----
 2 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 10577496a351b..ac8da471f0033 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -986,8 +986,4 @@ protected WritableColumnVector(int capacity, DataType dataType) {
       this.childColumns = null;
     }
   }
-
-  public int getCapacity() {
-    return capacity;
-  }
 }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index a489b05038f26..b2b2729e90e16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -601,16 +601,16 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
     try {
       // The new capacity of small vector = request capacity * 2 and will not be reset
       vector.appendBytes(100, 0)
-      assert(vector.getCapacity == 200)
+      assert(vector.capacity == 200)
       vector.reset()
-      assert(vector.getCapacity == 200)
+      assert(vector.capacity == 200)
 
       // The new capacity of huge vector = request capacity * 1.2 = 300 * 1.2 = 360,
       // and the vector will be reset back to the default capacity afterwards.
       vector.appendBytes(300, 0)
-      assert(vector.getCapacity == 360)
+      assert(vector.capacity == 360)
       vector.reset()
-      assert(vector.getCapacity == 80)
+      assert(vector.capacity == 80)
     } finally {
       vector.close()
     }

From 27078034b267764b67a05a1bca379c1a87e2dc65 Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Mon, 21 Aug 2023 23:09:57 +0800
Subject: [PATCH 20/21] Fix MiMa error

---
 .../spark/sql/execution/vectorized/OffHeapColumnVector.java | 5 +++++
 .../spark/sql/execution/vectorized/OnHeapColumnVector.java  | 5 +++++
 2 files changed, 10 insertions(+)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 28ad425afa910..bc2636caefd08 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -95,6 +95,11 @@ protected void releaseMemory() {
     offsetData = 0;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 6f67ca9ab9e0c..56a96907f0f08 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -92,6 +92,11 @@ protected void releaseMemory() {
     arrayOffsets = null;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //

From d429aa0a01cf0877d6959032e417d55710e3ccfe Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Fri, 25 Aug 2023 17:19:29 +0800
Subject: [PATCH 21/21] Update config version

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c293aeadeedd9..0c5077a362c6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -494,7 +494,7 @@ object SQLConf {
         "reserves twice the required memory; otherwise, Spark reserves " +
         "required memory * this ratio, and releases this column vector's memory before " +
         "reading the next batch of rows.")
-      .version("3.5.0")
+      .version("4.0.0")
       .doubleConf
       .createWithDefault(1.2)
 
@@ -503,7 +503,7 @@ object SQLConf {
       .doc("When the required memory is larger than this, Spark reserves required memory * " +
         s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and releases this column " +
         s"vector's memory before reading the next batch of rows. -1 means disabling the optimization.")
-      .version("3.5.0")
+      .version("4.0.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(-1)
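Taken together, the growth rule that the series ends up with can be summarized as a
small pure function. This is a sketch reconstructed from the config descriptions and
the ColumnVectorSuite assertions above, not the actual reserve code in
WritableColumnVector; the threshold of 300 bytes is an assumed test setting inferred
from the 100 -> 200 and 300 -> 360 expectations, and clamping to MAX_CAPACITY is
omitted:

  // Expected new capacity when a vector must grow to hold `required` bytes.
  // A non-positive threshold disables the huge-vector path entirely.
  def newCapacity(required: Int, hugeVectorThreshold: Int, reserveRatio: Double): Int =
    if (hugeVectorThreshold <= 0 || required < hugeVectorThreshold) {
      required * 2                     // small vectors: double, keep buffers across batches
    } else {
      (required * reserveRatio).toInt  // huge vectors: modest over-allocation, freed on reset()
    }

  assert(newCapacity(100, 300, 1.2) == 200)  // small vector: doubled, survives reset()
  assert(newCapacity(300, 300, 1.2) == 360)  // huge vector: 300 * 1.2, released on reset()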