SQLConf.scala
@@ -487,6 +487,26 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO =
buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio")
.doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or the required " +
"memory is smaller than spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, spark " +
"reserves required memory * 2 memory; otherwise, spark reserves " +
"required memory * this ratio memory, and will release this column vector memory before " +
"reading the next batch rows.")
.version("4.0.0")
.doubleConf
.createWithDefault(1.2)

val VECTORIZED_HUGE_VECTOR_THRESHOLD =
buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold")
.doc("When the required memory is larger than this, spark reserves required memory * " +
s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and release this column " +
s"vector memory before reading the next batch rows. -1 means disabling the optimization.")
.version("4.0.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(-1)
Contributor @cloud-fan (Aug 18, 2023):
Can we set this to 1 and see if there are any test failures? If not, we can change it back to -1 and merge it.

Contributor Author:
[screenshot of test results]
When VECTORIZED_HUGE_VECTOR_THRESHOLD = 1, there are two UT failures, as expected.
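For reference, here is a minimal sketch of how the two settings above could be combined in an application; the session setup and the 64m threshold are illustrative values, not something prescribed by this PR:

import org.apache.spark.sql.SparkSession

// Illustrative only: treat vectors that need more than 64 MB as "huge",
// growing them by the 1.2x reserve ratio instead of 2x and releasing their
// memory before the next batch is read.
val spark = SparkSession.builder()
  .appName("huge-vector-reserve-demo")
  .master("local[*]")
  .config("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold", "64m")
  .config("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio", "1.2")
  .getOrCreate()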


val IN_MEMORY_PARTITION_PRUNING =
buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
.internal()
@@ -4662,6 +4682,10 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)

def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt

def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO)

def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)

def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
OffHeapColumnVector.java
@@ -84,9 +84,7 @@ public long valuesNativeAddress() {
return data;
}

@Override
public void close() {
super.close();
protected void releaseMemory() {
Platform.freeMemory(nulls);
Platform.freeMemory(data);
Platform.freeMemory(lengthData);
@@ -97,6 +95,11 @@ public void close() {
offsetData = 0;
}

@Override
public void close() {
Contributor:
This seems like a bug in MIMA... anyway, it's fine to have this workaround for MIMA.

super.close();
}

//
// APIs dealing with nulls
//
OnHeapColumnVector.java
@@ -80,9 +80,7 @@ public OnHeapColumnVector(int capacity, DataType type) {
reset();
}

@Override
public void close() {
super.close();
protected void releaseMemory() {
nulls = null;
byteData = null;
shortData = null;
@@ -94,6 +92,11 @@ public void close() {
arrayOffsets = null;
}

@Override
public void close() {
super.close();
}

//
// APIs dealing with nulls
//
WritableColumnVector.java
@@ -53,6 +53,8 @@
public abstract class WritableColumnVector extends ColumnVector {
private final byte[] byte8 = new byte[8];

protected abstract void releaseMemory();
Member:
@cloud-fan do we treat WritableColumnVector as a public API? If so, we should give this a default implementation instead of an abstract method; otherwise, a third-party subclass that does not implement this method will fail with:

java.lang.AbstractMethodError: org.apache.spark.sql.execution.vectorized.WritableColumnVector.releaseMemory()V
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.close(WritableColumnVector.java:92)
	at io.glutenproject.vectorized.ArrowWritableColumnVector.close(ArrowWritableColumnVector.java:362)

Contributor:
It's not a public API. I think third-party libs should update and re-compile their code when upgrading Spark versions if private APIs were used.

Member:
Got it, thanks for the information.


/**
* Resets this column for writing. The currently stored values are no longer accessible.
*/
@@ -69,6 +71,12 @@ public void reset() {
putNotNulls(0, capacity);
numNulls = 0;
}

if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
Member:
Shouldn't this be hugeVectorThreshold > -1?

Contributor Author:
If hugeVectorThreshold == 0 or hugeVectorThreshold is a small value, the ColumnVector will always releaseMemory() and reserve new memory, which may be slower than before.

Member:
I know, but according to the doc and the implementation, this should be > -1, right?

Contributor Author:
Yes, the doc and the code don't match, sorry.

Member:
Can you send a follow-up?

Contributor Author:
Sorry for the late reply, filed a follow-up PR: https://github.com/apache/spark/pull/47988/files

capacity = defaultCapacity;
releaseMemory();
reserveInternal(capacity);
}
}

@Override
@@ -85,6 +93,7 @@ public void close() {
dictionaryIds = null;
}
dictionary = null;
releaseMemory();
}

public void reserveAdditional(int additionalCapacity) {
@@ -95,7 +104,10 @@ public void reserve(int requiredCapacity) {
if (requiredCapacity < 0) {
throwUnsupportedException(requiredCapacity, null);
} else if (requiredCapacity > capacity) {
int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
int newCapacity =
hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ?
(int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) :
(int) Math.min(MAX_CAPACITY, requiredCapacity * hugeVectorReserveRatio);
if (requiredCapacity <= newCapacity) {
try {
reserveInternal(newCapacity);
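To make the policy concrete, here is a minimal standalone sketch in Scala (the object and method names are illustrative, not the actual WritableColumnVector code) that mirrors the reserve() growth rule above and the reset() shrink rule added earlier; it also shows why the hugeVectorThreshold > 0 check in reset() makes a threshold of 0 behave as if the feature were disabled:

object ReservePolicySketch {
  // Stand-in for ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.
  val MaxCapacity: Long = Int.MaxValue - 15

  // Mirrors reserve(): small requests are doubled, huge ones grow by the reserve ratio.
  def newCapacity(required: Int, threshold: Long, ratio: Double): Int =
    if (threshold < 0 || required < threshold) {
      math.min(MaxCapacity, required * 2L).toInt
    } else {
      math.min(MaxCapacity, (required * ratio).toLong).toInt
    }

  // Mirrors reset(): a vector that grew past the threshold is released and shrunk back.
  def capacityAfterReset(capacity: Int, defaultCapacity: Int, threshold: Long): Int =
    if (threshold > 0 && capacity > threshold) defaultCapacity else capacity

  def main(args: Array[String]): Unit = {
    println(newCapacity(100, 300, 1.2))       // 200: below the threshold, grow by 2x
    println(newCapacity(300, 300, 1.2))       // 360: at or above the threshold, grow by 1.2x
    println(capacityAfterReset(360, 80, 300)) // 80: huge vector is shrunk on reset
    println(capacityAfterReset(200, 80, 300)) // 200: small vector keeps its capacity
    println(capacityAfterReset(400, 80, 0))   // 400: threshold 0 never triggers the shrink
  }
}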
@@ -846,7 +858,14 @@ public final void addElementsAppended(int num) {
/**
* Marks this column as being constant.
*/
public final void setIsConstant() { isConstant = true; }
public final void setIsConstant() {
if (childColumns != null) {
for (WritableColumnVector c : childColumns) {
c.setIsConstant();
Contributor:
This change makes sense. Do you know why there was no problem before?

Contributor Author:
For example, with alter table t add column s array<int> default array(1, 2), Spark creates a vector for column s and a child vector for the items of that column. Before this PR, neither of those two vectors was reset. After this PR, the child vector would be reset without this change.
[screenshot]

}
}
isConstant = true;
}
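The scenario described in the thread above can be reproduced with something like the following sketch; the table name, data source, and session setup are illustrative, and the ALTER statement assumes a data source that supports column DEFAULT values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("constant-child-vector-demo")
  .master("local[*]")
  .getOrCreate()

// A column with an array default value gives Spark a parent vector for s plus a
// child vector for the array elements; both need to be marked constant so that
// reset() does not wipe the default values.
spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("INSERT INTO t VALUES (1), (2)")
spark.sql("ALTER TABLE t ADD COLUMN s array<int> DEFAULT array(1, 2)")
spark.sql("SELECT id, s FROM t").show()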

/**
* Marks this column only contains null values.
@@ -867,12 +886,21 @@ public final boolean isAllNull() {
*/
protected int capacity;

/**
* The default number of rows that can be stored in this column.
*/
protected final int defaultCapacity;

/**
* Upper limit for the maximum capacity for this column.
*/
@VisibleForTesting
protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;

protected int hugeVectorThreshold;

protected double hugeVectorReserveRatio;

/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
*/
@@ -922,6 +950,9 @@ protected boolean isArray() {
protected WritableColumnVector(int capacity, DataType dataType) {
super(dataType);
this.capacity = capacity;
this.defaultCapacity = capacity;
this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold();
this.hugeVectorReserveRatio = SQLConf.get().vectorizedHugeVectorReserveRatio();

if (isArray()) {
DataType childType;
ColumnVectorSuite.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.execution.columnar.ColumnAccessor
import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
import org.apache.spark.unsafe.types.UTF8String

class ColumnVectorSuite extends SparkFunSuite {
class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
private def withVector(
vector: WritableColumnVector)(
block: WritableColumnVector => Unit): Unit = {
@@ -588,6 +590,34 @@ class ColumnVectorSuite extends SparkFunSuite {
}
}

test("SPARK-44239: Test column vector reserve policy") {
withSQLConf(
SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
val dataType = ByteType

Array(new OnHeapColumnVector(80, dataType),
new OffHeapColumnVector(80, dataType)).foreach { vector =>
try {
// The new capacity of a small vector = requested capacity * 2 = 200, and it is
// kept across reset().
vector.appendBytes(100, 0)
assert(vector.capacity == 200)
vector.reset()
assert(vector.capacity == 200)

// The new capacity of a huge vector (requested capacity >= threshold) =
// requested capacity * 1.2 = 300 * 1.2 = 360, and the vector is shrunk back to
// its default capacity on reset().
vector.appendBytes(300, 0)
assert(vector.capacity == 360)
vector.reset()
assert(vector.capacity == 80)
} finally {
vector.close()
}
}
}
}

DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
val structType = new StructType().add(dt.typeName, dt)
testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>