Skip to content

Commit af6499f

Browse files
chenhao-db and cloud-fan
committed
[SPARK-52181] Increase variant size limit to 128MiB
### What changes were proposed in this pull request?

Increase variant size limit from 16MiB to 128MiB. It is difficult to control the size limit with a flag, because the limit is accessed in many places where the SQL config is not available (e.g., `VariantVal.toString`). Future memory instability is possible, but this change won't break any existing workload.

### Why are the changes needed?

It enhances the ability of the variant data type to process larger data.

### Does this PR introduce _any_ user-facing change?

Yes, as stated above.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50913 from chenhao-db/variant_large_size_limit.

Lead-authored-by: Chenhao Li <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent c8327c5 commit af6499f

File tree

6 files changed

+18
-17
lines changed

6 files changed

+18
-17
lines changed

common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -542,12 +542,13 @@ private void buildJson(JsonParser parser) throws IOException {
542542
}
543543

544544
// Choose the smallest unsigned integer type that can store `value`. It must be within
545-
// `[0, U24_MAX]`.
545+
// `[0, SIZE_LIMIT]`.
546546
private int getIntegerSize(int value) {
547-
assert value >= 0 && value <= U24_MAX;
547+
assert value >= 0 && value <= SIZE_LIMIT;
548548
if (value <= U8_MAX) return 1;
549549
if (value <= U16_MAX) return 2;
550-
return U24_SIZE;
550+
if (value <= U24_MAX) return 3;
551+
return 4;
551552
}
552553

553554
private void parseFloatingPoint(JsonParser parser) throws IOException {

common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ public class VariantUtil {
137137
public static final int U24_SIZE = 3;
138138
public static final int U32_SIZE = 4;
139139

140-
// Both variant value and variant metadata need to be no longer than 16MiB.
141-
public static final int SIZE_LIMIT = U24_MAX + 1;
140+
// Both variant value and variant metadata need to be no longer than 128MiB.
141+
public static final int SIZE_LIMIT = 128 * 1024 * 1024;
142142

143143
public static final int MAX_DECIMAL4_PRECISION = 9;
144144
public static final int MAX_DECIMAL8_PRECISION = 18;

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,9 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite {
133133
checkException(json, "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
134134
Map("badRecord" -> json, "failFastMode" -> "FAILFAST"))
135135
}
136-
for (json <- Seq("\"" + "a" * (16 * 1024 * 1024) + "\"",
137-
(0 to 4 * 1024 * 1024).mkString("[", ",", "]"))) {
136+
for (json <- Seq((0 to 32 * 1024 * 1024).mkString("[", ",", "]"))) {
138137
checkException(json, "VARIANT_SIZE_LIMIT",
139-
Map("sizeLimit" -> "16.0 MiB", "functionName" -> "`parse_json`"))
138+
Map("sizeLimit" -> "128.0 MiB", "functionName" -> "`parse_json`"))
140139
}
141140
}
142141

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,21 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
9393
check(Array(primitiveHeader(INT1), 0), Array[Byte](3, 0, 0))
9494
check(Array(primitiveHeader(INT1), 0), Array[Byte](2, 0, 0))
9595

96-
// Construct binary values that are over 1 << 24 bytes, but otherwise valid.
96+
// Construct binary values that are over SIZE_LIMIT bytes, but otherwise valid.
9797
val bigVersion = Array[Byte]((VERSION | (3 << 6)).toByte)
98-
val a = Array.fill(1 << 24)('a'.toByte)
98+
val a = Array.fill(SIZE_LIMIT)('a'.toByte)
9999
val hugeMetadata = bigVersion ++ Array[Byte](2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1) ++
100100
a ++ Array[Byte]('b')
101101
check(Array(primitiveHeader(TRUE)), hugeMetadata, "VARIANT_CONSTRUCTOR_SIZE_LIMIT")
102102

103103
// The keys are 'aaa....' and 'b'. Values are "yyy..." and 'true'.
104-
val y = Array.fill(1 << 24)('y'.toByte)
104+
val y = Array.fill(SIZE_LIMIT)('y'.toByte)
105105
val hugeObject = Array[Byte](objectHeader(true, 4, 4)) ++
106106
/* size */ padded(Array(2), 4) ++
107107
/* id list */ padded(Array(0, 1), 4) ++
108-
// Second value starts at offset 5 + (1 << 24), which is `5001` little-endian. The last value
109-
// is 1 byte, so the one-past-the-end value is `6001`
110-
/* offset list */ Array[Byte](0, 0, 0, 0, 5, 0, 0, 1, 6, 0, 0, 1) ++
108+
// Second value starts at offset 5 + (SIZE_LIMIT), which is `5008` little-endian. The last
109+
// value is 1 byte, so the one-past-the-end value is `6008`
110+
/* offset list */ Array[Byte](0, 0, 0, 0, 5, 0, 0, 8, 6, 0, 0, 8) ++
111111
/* field data */ Array[Byte](primitiveHeader(LONG_STR), 0, 0, 0, 1) ++ y ++ Array[Byte](
112112
primitiveHeader(TRUE)
113113
)

sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,8 +824,8 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
824824
parameters = Map("schema" -> "\"STRUCT<a: VARIANT, b: VARIANT>\""))
825825

826826
// In singleVariantColumn mode, from_csv normally treats all inputs as valid. The only exception
827-
// case is the input exceeds the variant size limit (16MiB).
828-
val largeInput = "a" * (16 * 1024 * 1024)
827+
// case is the input exceeds the variant size limit (128MiB).
828+
val largeInput = "a" * (128 * 1024 * 1024)
829829
checkAnswer(
830830
Seq(largeInput).toDF("value").select(
831831
from_csv(

sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ class VariantEndToEndSuite extends QueryTest with SharedSparkSession {
135135
check("{1:2}", null)
136136
check("{\"a\":1", null)
137137
check("{\"a\":[a,b,c]}", null)
138-
check("\"" + "a" * (16 * 1024 * 1024) + "\"", null)
138+
check("\"" + "a" * (16 * 1024 * 1024) + "\"")
139+
check("\"" + "a" * (128 * 1024 * 1024) + "\"", null)
139140
}
140141

141142
test("to_json with nested variant") {

0 commit comments

Comments (0)