diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index beaf61f071232..8c8b33921e321 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -487,6 +487,26 @@ object SQLConf {
     .intConf
     .createWithDefault(10000)
 
+  val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio")
+      .doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or the required " +
+        "memory is smaller than spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, Spark " +
+        "reserves required memory * 2; otherwise, Spark reserves " +
+        "required memory * this ratio and releases this column vector's memory before " +
+        "reading the next batch of rows.")
+      .version("4.0.0")
+      .doubleConf
+      .createWithDefault(1.2)
+
+  val VECTORIZED_HUGE_VECTOR_THRESHOLD =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold")
+      .doc("When the required memory is larger than this, Spark reserves required memory * " +
+        s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and releases this column " +
+        s"vector's memory before reading the next batch of rows. -1 disables this optimization.")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(-1)
+
   val IN_MEMORY_PARTITION_PRUNING =
     buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
       .internal()
@@ -4662,6 +4682,10 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt
+
+  def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO)
+
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
   def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
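The two settings above act as a pair: the threshold decides when a vector counts as "huge", and the ratio replaces the usual doubling growth factor for such vectors, with the memory handed back between batches. As a usage sketch (the values below are illustrative, not tuned recommendations), they could be enabled on a session like this:

    import org.apache.spark.sql.SparkSession

    // Illustrative thresholds only: treat vectors needing more than 16 MiB as
    // huge, over-reserve them by 20% instead of 100%, and let reset() release
    // the memory before the next batch is read.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold", "16m")
      .config("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio", "1.2")
      .getOrCreate()
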
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 5674a091f6d6c..bc2636caefd08 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -84,9 +84,7 @@ public long valuesNativeAddress() {
     return data;
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
@@ -97,6 +95,11 @@ public void close() {
     offsetData = 0;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 6e4a9c643e89c..56a96907f0f08 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -80,9 +80,7 @@ public OnHeapColumnVector(int capacity, DataType type) {
     reset();
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     nulls = null;
     byteData = null;
     shortData = null;
@@ -94,6 +92,11 @@ public void close() {
     arrayOffsets = null;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index a8e4aad60c222..ac8da471f0033 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -53,6 +53,8 @@ public abstract class WritableColumnVector extends ColumnVector {
 
   private final byte[] byte8 = new byte[8];
 
+  protected abstract void releaseMemory();
+
   /**
    * Resets this column for writing. The currently stored values are no longer accessible.
    */
@@ -69,6 +71,12 @@ public void reset() {
       putNotNulls(0, capacity);
       numNulls = 0;
     }
+
+    if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
+      capacity = defaultCapacity;
+      releaseMemory();
+      reserveInternal(capacity);
+    }
   }
 
   @Override
@@ -85,6 +93,7 @@ public void close() {
       dictionaryIds = null;
     }
     dictionary = null;
+    releaseMemory();
   }
 
   public void reserveAdditional(int additionalCapacity) {
@@ -95,7 +104,10 @@ public void reserve(int requiredCapacity) {
     if (requiredCapacity < 0) {
       throwUnsupportedException(requiredCapacity, null);
     } else if (requiredCapacity > capacity) {
-      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      int newCapacity =
+        hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ?
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) :
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeVectorReserveRatio);
       if (requiredCapacity <= newCapacity) {
         try {
           reserveInternal(newCapacity);
@@ -846,7 +858,14 @@ public final void addElementsAppended(int num) {
   /**
    * Marks this column as being constant.
    */
-  public final void setIsConstant() { isConstant = true; }
+  public final void setIsConstant() {
+    if (childColumns != null) {
+      for (WritableColumnVector c : childColumns) {
+        c.setIsConstant();
+      }
+    }
+    isConstant = true;
+  }
 
   /**
    * Marks this column only contains null values.
@@ -867,12 +886,21 @@ public final boolean isAllNull() {
    */
   protected int capacity;
 
+  /**
+   * The default number of rows that can be stored in this column.
+   */
+  protected final int defaultCapacity;
+
   /**
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
   protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
+  protected int hugeVectorThreshold;
+
+  protected double hugeVectorReserveRatio;
+
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
    */
@@ -922,6 +950,9 @@ protected boolean isArray() {
   protected WritableColumnVector(int capacity, DataType dataType) {
     super(dataType);
     this.capacity = capacity;
+    this.defaultCapacity = capacity;
+    this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold();
+    this.hugeVectorReserveRatio = SQLConf.get().vectorizedHugeVectorReserveRatio();
 
     if (isArray()) {
       DataType childType;
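The ternary added to reserve() above carries the whole policy. Restated as a standalone rule (a Scala sketch; newCapacity, threshold, reserveRatio, and maxCapacity are stand-ins for the reserve() logic and the MAX_CAPACITY, hugeVectorThreshold, and hugeVectorReserveRatio fields):

    // Sketch of the capacity-growth rule in reserve(): threshold < 0 disables
    // the huge-vector path, matching the -1 default of the new config.
    def newCapacity(required: Int, threshold: Int, reserveRatio: Double, maxCapacity: Int): Int =
      if (threshold < 0 || required < threshold) {
        // Small vectors keep the pre-existing behavior: reserve 2x the requirement.
        math.min(maxCapacity.toLong, required * 2L).toInt
      } else {
        // Huge vectors over-reserve by the ratio (20% by default) instead of 100%;
        // reset() then releases the buffers once the batch has been consumed.
        math.min(maxCapacity.toLong, (required * reserveRatio).toLong).toInt
      }

    assert(newCapacity(100, 300, 1.2, Int.MaxValue) == 200) // below the threshold: 2x
    assert(newCapacity(300, 300, 1.2, Int.MaxValue) == 360) // at or above it: 1.2x
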
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 5e06eb729ea3e..b2b2729e90e16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.columnar.ColumnAccessor
 import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarArray
 import org.apache.spark.unsafe.types.UTF8String
 
-class ColumnVectorSuite extends SparkFunSuite {
+class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
   private def withVector(
       vector: WritableColumnVector)(
       block: WritableColumnVector => Unit): Unit = {
@@ -588,6 +590,34 @@ class ColumnVectorSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-44239: Test column vector reserve policy") {
+    withSQLConf(
+      SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
+      SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
+      val dataType = ByteType
+
+      Array(new OnHeapColumnVector(80, dataType),
+        new OffHeapColumnVector(80, dataType)).foreach { vector =>
+        try {
+          // A small vector grows to request capacity * 2 and keeps that capacity across reset().
+          vector.appendBytes(100, 0)
+          assert(vector.capacity == 200)
+          vector.reset()
+          assert(vector.capacity == 200)
+
+          // A huge vector grows to request capacity * 1.2 = 300 * 1.2 = 360, and
+          // shrinks back to the default capacity on reset().
+          vector.appendBytes(300, 0)
+          assert(vector.capacity == 360)
+          vector.reset()
+          assert(vector.capacity == 80)
+        } finally {
+          vector.close()
+        }
+      }
+    }
+  }
+
   DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
     val structType = new StructType().add(dt.typeName, dt)
     testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>
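A note on the setIsConstant() change in WritableColumnVector above: reset() returns early for constant vectors, and with this patch reset() may also release and reallocate buffers. Marking the whole tree constant keeps a child of a constant struct vector from being shrunk or cleared when it is reset directly. A sketch of the resulting behavior (assumes the code sits in the org.apache.spark.sql.execution.vectorized package, like the suite, so the protected members are reachable):

    import org.apache.spark.sql.types._

    // After the fix, marking a struct vector constant also protects its children.
    val structType = new StructType().add("i", IntegerType)
    val vector = new OnHeapColumnVector(10, structType)
    try {
      vector.getChild(0).putInt(0, 42)
      vector.setIsConstant()     // now recursively marks the child constant
      vector.getChild(0).reset() // a no-op on a constant vector
      assert(vector.getChild(0).getInt(0) == 42)
    } finally {
      vector.close()
    }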