Conversation

@wankunde (Contributor):

What changes were proposed in this pull request?

Add a memory reserve policy for WritableColumnVector:

  • If the vector capacity is < VECTORIZED_HUGE_VECTOR_THRESHOLD, reserve requested capacity * 2 memory.
  • If the vector capacity is >= VECTORIZED_HUGE_VECTOR_THRESHOLD, reserve requested capacity * VECTORIZED_HUGE_VECTOR_RESERVE_RATIO memory.
  • Free the WritableColumnVector memory if the vector capacity is >= VECTORIZED_HUGE_VECTOR_THRESHOLD.

This reuses the allocated array objects for small column vectors and frees the memory of huge column vectors (see the sketch after this list).
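Below is a minimal sketch of this reserve policy, illustrative only and not the exact merged code; MAX_CAPACITY stands in for WritableColumnVector's array-size cap, and the method name newCapacity is chosen here just for the example:

// Illustrative sketch; the real logic lives in WritableColumnVector#reserve.
static final int MAX_CAPACITY = Integer.MAX_VALUE - 15; // assumed cap

static int newCapacity(long requiredCapacity, long hugeVectorThreshold,
                       double hugeVectorReserveRatio) {
  if (hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold) {
    // Small vectors: reserve 2x the requested capacity so the arrays can be reused.
    return (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
  }
  // Huge vectors: reserve only ratio * required (e.g. 1.2x); the memory is
  // released when the vector is reset instead of being kept for reuse.
  return (int) Math.min(MAX_CAPACITY, (long) (requiredCapacity * hugeVectorReserveRatio));
}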

Why are the changes needed?

When Spark reads a data file into a WritableColumnVector, the memory allocated by the WritableColumnVectors is not freed until the VectorizedColumnReader completes.

Reusing the allocated array objects saves memory allocation time, but it also holds on to too much unused memory after the current large vector batch has been read.

This PR adds a memory reserve policy for this scenario, which reuses the allocated array objects for small column vectors and frees the memory of huge column vectors.

![image](https://github.com/apache/spark/assets/3626747/a7a487bd-f184-4b24-bea0-75e530702887)
![image](https://github.com/apache/spark/assets/3626747/01d0268f-68e7-416f-b9b3-6c9d60919596)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added UT

@github-actions github-actions bot added the SQL label Jun 29, 2023
@wankunde wankunde changed the title [SPARK-44239][SQL] Free memory allocated by huge column vector [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset Jun 29, 2023
@wankunde wankunde changed the title [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset [WIP][SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset Jun 30, 2023
@wankunde wankunde changed the title [WIP][SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset Jul 3, 2023
@wankunde (Contributor Author) commented Jul 5, 2023:

Hi, @cloud-fan @sunchao @viirya Could you help to review this PR?

Contributor:

I think it's worth mentioning here that this ratio only takes effect when VECTORIZED_HUGE_VECTOR_THRESHOLD is enabled and the required memory is larger than the threshold.

Contributor Author:

Done

Contributor:

Maybe make this abstract as well? Also for defaultCapacity; it does seem to be necessary in the base abstract class.

Contributor Author:

Done

Contributor:

This may lead to counter-intuitive behavior: if the threshold is 1000, then requiring 999 might look more memory-hungry than requiring 1001. How about we make it

(requiredCapacity - hugeThreshold) * hugeReserveRatio + hugeThreshold * 2L

to be more consistent?

Contributor Author:

Thanks @liuzqt for your review.

(requiredCapacity - hugeThreshold) * hugeReserveRatio + hugeThreshold * 2L is equal to requiredCapacity * hugeReserveRatio + hugeThreshold * (2L - hugeReserveRatio). If hugeThreshold = 100M and requiredCapacity = 1000, before this change Spark would allocate only 2000 bytes, while after this change Spark would allocate 1000 * 1.2 + 100M * 0.8 ≈ 80M bytes. Wouldn't small column vectors then take up too much memory?

Contributor:

Oh sorry for the confusion, the above formula only applies to the else case (i.e., requiredCapacity >= hugeThreshold); for requiredCapacity < hugeThreshold it's still requiredCapacity * 2L as in your code.

if (hugeThreshold < 0 || requiredCapacity < hugeThreshold) {
    currentCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
} else {
    // we only change this branch
    currentCapacity = (int) Math.min(MAX_CAPACITY,
        (requiredCapacity - hugeThreshold) * hugeReserveRatio + hugeThreshold * 2L);
}

The idea is that for requiredCapacity >= hugeThreshold, only the part exceeding the threshold is multiplied by hugeReserveRatio.
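For illustration only: with hugeThreshold = 1000 and hugeReserveRatio = 1.2, requiring 999 reserves 999 * 2 = 1998, while requiring 1001 reserves (1001 - 1000) * 1.2 + 1000 * 2 ≈ 2001, so the reserved size stays continuous across the threshold instead of dropping from 2x to 1.2x.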

Contributor Author:

Thanks for your idea, I have updated the formula.

@liuzqt (Contributor) commented Aug 2, 2023:

LGTM, @cloud-fan mind taking another look?

@wankunde (Contributor Author):

Hi, @liuzqt @cloud-fan any thoughts about this PR?

Contributor:

We can use .bytesConf(ByteUnit.BYTE) so that people can set it to 1g, which is more convenient.

Contributor Author:

OK

Contributor:

is it an existing bug?

Contributor Author:

Before this PR there was no bug; after this PR, for Parquet tables whose columns have associated DEFAULT values, the result may be incorrect.
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java#L84C15-L100

So I added a flag protected boolean hasDefaultValue = false; to indicate whether the current column vector has a default value, and we skip cleaning the data when it's true.

Contributor:

This looks a bit over-designed as we only have one policy; can we inline the logic in WritableColumnVector? And I feel it's not well designed to track the previously allocated memory size in a policy.

Contributor Author:

OK, I'll inline the policy logic.

And do you think we should track the previously allocated memory size?

@cloud-fan (Contributor), Aug 15, 2023:

Why can't we reset vectors with default values? Can't we reset the default value as well?

Contributor Author:

The default value was set into the column vector before reading data.

The default value lives in ParquetColumnVector, and its internal vector (final WritableColumnVector vector) does not know about the default value.

Contributor:

this does not match the config doc. It should be requiredCapacity * hugeVectorReserveRatio

Contributor Author:

Done

Contributor:

I'm not convinced about it. We can reset it and set the default value again, can't we?

@wankunde (Contributor Author), Aug 20, 2023:

I'm sorry, it seems to be another issue.

If a column vector has default values, we will always set isConstant to true, so we don't need the hasDefaultValue field. But we should also set isConstant to true for all its child vectors.
Change: 128c8ff

Contributor:

shall we just put it in ColumnVectorSuite?

Contributor Author:

Done

Contributor:

shall we add a new method releaseMemory to share the code between reset and close?
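A rough sketch of that refactor, using the hugeVectorThreshold, capacity, and defaultCapacity fields discussed in this thread (illustrative only, not the merged code; reserveInternal re-allocates the buffers at the given capacity):

public abstract class WritableColumnVector extends ColumnVector {
  // Subclasses (on-heap / off-heap) free their own buffers here.
  protected abstract void releaseMemory();

  @Override
  public void close() {
    releaseMemory();
  }

  public void reset() {
    // ... reset element counts, null counters, etc. ...
    if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
      // Huge vectors give their memory back instead of keeping it for reuse.
      capacity = defaultCapacity;
      releaseMemory();
      reserveInternal(capacity);
    }
  }
}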

Contributor:

let's mark these variables as final if the value won't change

@cloud-fan (Contributor), Aug 18, 2023:

Can we set this to 1 and see if there are any test failures? If not, we can change it back to -1 and merge it.

Contributor Author:

[screenshot: test results]
When VECTORIZED_HUGE_VECTOR_THRESHOLD = 1, there are two UT failures, as expected.

Member:

Suggested change:
- .doc("spark will reserve requiredCapacity * this ratio memory next time. This is only " +
+ .doc("Spark reserves requiredCapacity * this ratio memory next time. This is only " +

Member:

Can we avoid using variable names such as requiredCapacity? Also, it's a bit difficult to understand from the description here. I think you should explain this more explicitly, including the default behaviour of reserving 2x.

@wankunde wankunde force-pushed the vector branch 2 times, most recently from cc1c1b2 to 5012869 Compare August 21, 2023 02:07
Contributor:

nit: if we can move the declaration of this function to the parent class, then reset and close can both be implemented in the parent class. The child classes only need to implement releaseMemory.

Contributor:

This change makes sense. Do you know why there was no problem before?

Contributor Author:

For example, for alter table t add column s array<int> default array(1, 2), Spark will create a vector for column s and a vector for the items of this column.
Before this PR, neither of those two vectors would be reset.
After this PR, the second vector would be reset without this change.


Contributor:

do we still need it?

Contributor Author:

Removed this function.

@wankunde (Contributor Author):

[error] spark-sql: Failed binary compatibility check against org.apache.spark:spark-sql_2.12:3.4.0! Found 2 potential problems (filtered 417)
[error]  * abstract method close()Unit in class org.apache.spark.sql.vectorized.ColumnVector does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.close")
[error]  * abstract method close()Unit in class org.apache.spark.sql.vectorized.ColumnVector does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.close")

}

@Override
public void close() {
Contributor:

This seems like a bug in MiMa... anyway, it's fine to have this workaround for MiMa.

"reserves required memory * 2 memory; otherwise, spark reserves " +
"required memory * this ratio memory, and will release this column vector memory before " +
"reading the next batch rows.")
.version("3.5.0")
Contributor:

one last comment: 3.5.0 is already at RC2 and it's too late to merge this feature to 3.5. Can we update it to 4.0.0?

.doc("When the required memory is larger than this, spark reserves required memory * " +
s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and release this column " +
s"vector memory before reading the next batch rows. -1 means disabling the optimization.")
.version("3.5.0")
Contributor:

ditto

@wankunde wankunde requested a review from cloud-fan August 30, 2023 09:27
@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in 27f9ac2 Aug 30, 2023
@wankunde wankunde deleted the vector branch January 3, 2024 06:57
numNulls = 0;
}

if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
Member:

shouldn't this be hugeVectorThreshold > -1?

Contributor Author:

If hugeVectorThreshold == 0, or hugeVectorThreshold is a small value, the ColumnVector will always releaseMemory() and reserve new memory, which may be slower than before.

Member:

I know, but according to the doc and impl, this should be > -1, right?

Contributor Author:

Yes, the doc and the code don't match. Sorry.

Member:

Can you send a followup?

Contributor Author:

Sorry for the late reply; filed a follow-up PR: https://github.com/apache/spark/pull/47988/files

public abstract class WritableColumnVector extends ColumnVector {
private final byte[] byte8 = new byte[8];

protected abstract void releaseMemory();
Member:

@cloud-fan do we treat WritableColumnVector as a public API? If so, we should give it a default implementation instead of an abstract method; otherwise, a third-party subclass that doesn't implement this method will fail with

java.lang.AbstractMethodError: org.apache.spark.sql.execution.vectorized.WritableColumnVector.releaseMemory()V
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.close(WritableColumnVector.java:92)
	at io.glutenproject.vectorized.ArrowWritableColumnVector.close(ArrowWritableColumnVector.java:362)

Contributor:

It's not a public API. I think third-party libs should update and recompile their code when upgrading Spark versions if private APIs were used.

Member:

got it, thanks for the information.

turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…s are reset (apache#237)

Closes apache#41782 from wankunde/vector.

Lead-authored-by: Kun Wan <[email protected]>

Signed-off-by: Wenchen Fan <[email protected]>
Co-authored-by: Kun Wan <[email protected]>