From 3fbcd6b7ff471d5324b46d34af6bd6b891807d27 Mon Sep 17 00:00:00 2001
From: Chenhao Li
Date: Sat, 17 May 2025 00:23:32 +0200
Subject: [PATCH 1/2] [SPARK-52181] Increase variant size limit to 128MiB

### What changes were proposed in this pull request?

Increase the variant size limit from 16MiB to 128MiB. It is difficult to
control the size limit with a flag, because the limit is accessed in many
places where the SQL config is not available (e.g., `VariantVal.toString`).
Allowing larger variants may increase memory pressure in the future, but this
change won't break any existing workload.

### Why are the changes needed?

It enhances the ability of the variant data type to process larger data.

### Does this PR introduce _any_ user-facing change?

Yes, as stated above.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50913 from chenhao-db/variant_large_size_limit.

Lead-authored-by: Chenhao Li
Co-authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../spark/types/variant/VariantBuilder.java        |  7 ++++---
 .../apache/spark/types/variant/VariantUtil.java    |  4 ++--
 .../variant/VariantExpressionEvalUtilsSuite.scala  |  5 ++---
 .../variant/VariantExpressionSuite.scala           | 12 ++++++------
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 17 +++++++++++++++++
 .../apache/spark/sql/VariantEndToEndSuite.scala    |  3 ++-
 6 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java b/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
index 2f575c55ea362..9c4487dbd3f75 100644
--- a/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
+++ b/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
@@ -542,12 +542,13 @@ private void buildJson(JsonParser parser) throws IOException {
   }
 
   // Choose the smallest unsigned integer type that can store `value`. It must be within
-  // `[0, U24_MAX]`.
+  // `[0, SIZE_LIMIT]`.
   private int getIntegerSize(int value) {
-    assert value >= 0 && value <= U24_MAX;
+    assert value >= 0 && value <= SIZE_LIMIT;
     if (value <= U8_MAX) return 1;
     if (value <= U16_MAX) return 2;
-    return U24_SIZE;
+    if (value <= U24_MAX) return 3;
+    return 4;
   }
 
   private void parseFloatingPoint(JsonParser parser) throws IOException {
diff --git a/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java b/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java
index 4205b76a530aa..63937f9dd217c 100644
--- a/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java
+++ b/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java
@@ -137,8 +137,8 @@ public class VariantUtil {
   public static final int U24_SIZE = 3;
   public static final int U32_SIZE = 4;
 
-  // Both variant value and variant metadata need to be no longer than 16MiB.
-  public static final int SIZE_LIMIT = U24_MAX + 1;
+  // Both variant value and variant metadata need to be no longer than 128MiB.
+  public static final int SIZE_LIMIT = 128 * 1024 * 1024;
 
   public static final int MAX_DECIMAL4_PRECISION = 9;
   public static final int MAX_DECIMAL8_PRECISION = 18;
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
index f599fead45015..8f8256ff1f6ed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
@@ -133,10 +133,9 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite {
       checkException(json, "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
         Map("badRecord" -> json, "failFastMode" -> "FAILFAST"))
     }
-    for (json <- Seq("\"" + "a" * (16 * 1024 * 1024) + "\"",
-      (0 to 4 * 1024 * 1024).mkString("[", ",", "]"))) {
+    for (json <- Seq((0 to 32 * 1024 * 1024).mkString("[", ",", "]"))) {
       checkException(json, "VARIANT_SIZE_LIMIT",
-        Map("sizeLimit" -> "16.0 MiB", "functionName" -> "`parse_json`"))
+        Map("sizeLimit" -> "128.0 MiB", "functionName" -> "`parse_json`"))
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala
index 1cd8cc6228efc..5d61d09cfc4b7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala
@@ -93,21 +93,21 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
     check(Array(primitiveHeader(INT1), 0), Array[Byte](3, 0, 0))
     check(Array(primitiveHeader(INT1), 0), Array[Byte](2, 0, 0))
 
-    // Construct binary values that are over 1 << 24 bytes, but otherwise valid.
+    // Construct binary values that are over SIZE_LIMIT bytes, but otherwise valid.
     val bigVersion = Array[Byte]((VERSION | (3 << 6)).toByte)
-    val a = Array.fill(1 << 24)('a'.toByte)
+    val a = Array.fill(SIZE_LIMIT)('a'.toByte)
     val hugeMetadata = bigVersion ++ Array[Byte](2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1) ++
       a ++ Array[Byte]('b')
     check(Array(primitiveHeader(TRUE)), hugeMetadata, "VARIANT_CONSTRUCTOR_SIZE_LIMIT")
 
     // The keys are 'aaa....' and 'b'. Values are "yyy..." and 'true'.
-    val y = Array.fill(1 << 24)('y'.toByte)
+    val y = Array.fill(SIZE_LIMIT)('y'.toByte)
     val hugeObject = Array[Byte](objectHeader(true, 4, 4)) ++
       /* size */ padded(Array(2), 4) ++
       /* id list */ padded(Array(0, 1), 4) ++
-      // Second value starts at offset 5 + (1 << 24), which is `5001` little-endian. The last value
-      // is 1 byte, so the one-past-the-end value is `6001`
-      /* offset list */ Array[Byte](0, 0, 0, 0, 5, 0, 0, 1, 6, 0, 0, 1) ++
+      // Second value starts at offset 5 + (SIZE_LIMIT), which is `5008` little-endian. The last
+      // value is 1 byte, so the one-past-the-end value is `6008`
+      /* offset list */ Array[Byte](0, 0, 0, 0, 5, 0, 0, 8, 6, 0, 0, 8) ++
       /* field data */ Array[Byte](primitiveHeader(LONG_STR), 0, 0, 0, 1) ++ y ++ Array[Byte](
         primitiveHeader(TRUE)
       )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 970ed5843b3c5..c1d1f2a4d0966 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -764,9 +764,26 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       exception = intercept[SparkUnsupportedOperationException] {
         df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect()
       },
+<<<<<<< HEAD
       condition = "UNSUPPORTED_DATATYPE",
       parameters = Map("typeName" -> "\"VARIANT\"")
     )
+=======
+      condition = "INVALID_SINGLE_VARIANT_COLUMN",
+      parameters = Map("schema" -> "\"STRUCT\""))
+
+    // In singleVariantColumn mode, from_csv normally treats all inputs as valid. The only exception
+    // case is the input exceeds the variant size limit (128MiB).
+    val largeInput = "a" * (128 * 1024 * 1024)
+    checkAnswer(
+      Seq(largeInput).toDF("value").select(
+        from_csv(
+          $"value",
+          StructType.fromDDL("v variant, _corrupt_record string"),
+          Map("singleVariantColumn" -> "v")
+        ).cast("string")),
+      Seq(Row(s"""{null, $largeInput}""")))
+>>>>>>> af6499fd199 ([SPARK-52181] Increase variant size limit to 128MiB)
   }
 
   test("SPARK-47497: the input of to_csv must be StructType") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
index cf83ae30a6798..620d00b816510 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
@@ -135,7 +135,8 @@ class VariantEndToEndSuite extends QueryTest with SharedSparkSession {
     check("{1:2}", null)
     check("{\"a\":1", null)
     check("{\"a\":[a,b,c]}", null)
-    check("\"" + "a" * (16 * 1024 * 1024) + "\"", null)
+    check("\"" + "a" * (16 * 1024 * 1024) + "\"")
+    check("\"" + "a" * (128 * 1024 * 1024) + "\"", null)
   }
 
   test("to_json with nested variant") {

From b8b9235d77c705b8d108e8ab8fba9a2a83613cc7 Mon Sep 17 00:00:00 2001
From: Chenhao Li
Date: Fri, 16 May 2025 15:28:20 -0700
Subject: [PATCH 2/2] fix conflict

---
 .../apache/spark/sql/CsvFunctionsSuite.scala | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index c1d1f2a4d0966..970ed5843b3c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -764,26 +764,9 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       exception = intercept[SparkUnsupportedOperationException] {
         df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect()
       },
-<<<<<<< HEAD
       condition = "UNSUPPORTED_DATATYPE",
       parameters = Map("typeName" -> "\"VARIANT\"")
     )
-=======
-      condition = "INVALID_SINGLE_VARIANT_COLUMN",
-      parameters = Map("schema" -> "\"STRUCT\""))
-
-    // In singleVariantColumn mode, from_csv normally treats all inputs as valid. The only exception
-    // case is the input exceeds the variant size limit (128MiB).
-    val largeInput = "a" * (128 * 1024 * 1024)
-    checkAnswer(
-      Seq(largeInput).toDF("value").select(
-        from_csv(
-          $"value",
-          StructType.fromDDL("v variant, _corrupt_record string"),
-          Map("singleVariantColumn" -> "v")
-        ).cast("string")),
-      Seq(Row(s"""{null, $largeInput}""")))
->>>>>>> af6499fd199 ([SPARK-52181] Increase variant size limit to 128MiB)
   }
 
   test("SPARK-47497: the input of to_csv must be StructType") {
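
A minimal usage sketch of the new limit, not part of the patches above. It assumes a Spark 4.x build that already contains this change, the variant data type, and the Scala `parse_json` function in `org.apache.spark.sql.functions`; the object name, app name, and printed expression are placeholders for illustration. A string value whose encoded size is just over the old 16MiB limit previously failed with VARIANT_SIZE_LIMIT and should now parse, mirroring the VariantEndToEndSuite change in PATCH 1/2.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, parse_json}

object LargeVariantSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("variant-128mib").getOrCreate()
    import spark.implicits._

    // 16MiB of 'a' inside quotes encodes to slightly more than the old 16MiB limit
    // (1-byte long-string header + 4-byte length + data), so this used to fail.
    val bigJson = "\"" + "a" * (16 * 1024 * 1024) + "\""
    val parsed = Seq(bigJson).toDF("s").select(parse_json(col("s")).as("v"))

    // Succeeds with this patch; inputs over 128MiB still raise VARIANT_SIZE_LIMIT.
    println(parsed.selectExpr("length(cast(v as string))").head())

    spark.stop()
  }
}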
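
The offset arithmetic behind the `5, 0, 0, 8` and `6, 0, 0, 8` bytes in the VariantExpressionSuite hunk can be checked with plain Scala, no Spark required; `sizeLimit` below is simply the new 128MiB constant and the object name is a placeholder.

import java.nio.{ByteBuffer, ByteOrder}

object OffsetBytesSketch {
  def main(args: Array[String]): Unit = {
    val sizeLimit = 128 * 1024 * 1024  // new SIZE_LIMIT, i.e. 0x08000000

    // 4-byte little-endian encoding of an offset, as stored in the variant offset list.
    def le(v: Int): Seq[Byte] =
      ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array().toSeq

    // First field value: 1-byte LONG_STR header + 4-byte length + sizeLimit bytes of 'y',
    // so the second value starts at 5 + sizeLimit; it is 1 byte (TRUE), so it ends 1 later.
    println(le(5 + sizeLimit))  // bytes 5, 0, 0, 8
    println(le(6 + sizeLimit))  // bytes 6, 0, 0, 8
  }
}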