
Commit a6f6e07

eason-yuchen-liu and WweiL authored and committed
[SPARK-48939][AVRO] Support reading Avro with recursive schema reference
Continuing the discussion from #47425 in this PR because I can't push to Yuchen's account.

### What changes were proposed in this pull request?

The built-in Protobuf connector already supports recursive schema references. It does so by letting users specify an option, "recursive.fields.max.depth", and unrolling the recursive fields to that depth at the start of execution. This turns a problem of a dynamic per-row schema into a fixed schema, which Spark supports. Avro can adopt a similar method. This PR adds an option "recursiveFieldMaxDepth" to both the Avro data source and the from_avro function. With this option, Spark can read Avro data with recursive schemas up to a certain depth.

### Why are the changes needed?

A recursive reference denotes the case where the type of a field has already been defined in one of its parent nodes. A simple example is:

```
{
  "type": "record",
  "name": "LongList",
  "fields" : [
    {"name": "value", "type": "long"},
    {"name": "next", "type": ["null", "LongList"]}
  ]
}
```

This is written in the Avro schema DSL and represents a linked-list data structure. Spark currently throws an error on this schema. Many users use schemas like this, so we should support it.

### Does this PR introduce any user-facing change?

Yes. Previously, Spark threw an error on recursive schemas like the one above. With this change, it still throws the same error by default, but when users set the option to a number greater than 0, the schema is unrolled to that depth.

### How was this patch tested?

Added new unit tests and integration tests to AvroSuite and AvroFunctionSuite.

### Was this patch authored or co-authored using generative AI tooling?

No.

Co-authored-by: Wei Liu <wei.liu@databricks.com>

Closes #48043 from WweiL/yuchen-avro-recursive-schema.

Lead-authored-by: Yuchen Liu <[email protected]>
Co-authored-by: Wei Liu <[email protected]>
Co-authored-by: Yuchen Liu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
1 parent fbf81eb commit a6f6e07
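
Usage sketch (not part of this commit's diff): a minimal example of reading Avro files that contain the recursive LongList record above. The SparkSession `spark`, the input path, and the chosen depth value are assumptions for illustration only.

```scala
// Hedged sketch: assumes a running SparkSession `spark` and Avro files at a
// hypothetical path whose schema is the recursive LongList record shown above.
val df = spark.read
  .format("avro")
  .option("recursiveFieldMaxDepth", "3") // unroll recursion to depth 3; values above 15 are rejected
  .load("/tmp/long_list_avro")           // hypothetical input path
df.printSchema() // `next` appears as a nested struct, truncated at the configured depth
```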

File tree

15 files changed: +488 -87 lines changed


common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 2 additions & 0 deletions
@@ -266,6 +266,7 @@ private[spark] object LogKeys {
   case object FEATURE_NAME extends LogKey
   case object FETCH_SIZE extends LogKey
   case object FIELD_NAME extends LogKey
+  case object FIELD_TYPE extends LogKey
   case object FILES extends LogKey
   case object FILE_ABSOLUTE_PATH extends LogKey
   case object FILE_END_OFFSET extends LogKey
@@ -652,6 +653,7 @@ private[spark] object LogKeys {
   case object RECEIVER_IDS extends LogKey
   case object RECORDS extends LogKey
   case object RECOVERY_STATE extends LogKey
+  case object RECURSIVE_DEPTH extends LogKey
   case object REDACTED_STATEMENT extends LogKey
   case object REDUCE_ID extends LogKey
   case object REGEX extends LogKey

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala

Lines changed: 4 additions & 2 deletions
@@ -42,7 +42,8 @@ private[sql] case class AvroDataToCatalyst(
     val dt = SchemaConverters.toSqlType(
       expectedSchema,
       avroOptions.useStableIdForUnionType,
-      avroOptions.stableIdPrefixForUnionType).dataType
+      avroOptions.stableIdPrefixForUnionType,
+      avroOptions.recursiveFieldMaxDepth).dataType
     parseMode match {
       // With PermissiveMode, the output Catalyst row might contain columns of null values for
       // corrupt records, even if some of the columns are not nullable in the user-provided schema.
@@ -69,7 +70,8 @@ private[sql] case class AvroDataToCatalyst(
       dataType,
       avroOptions.datetimeRebaseModeInRead,
       avroOptions.useStableIdForUnionType,
-      avroOptions.stableIdPrefixForUnionType)
+      avroOptions.stableIdPrefixForUnionType,
+      avroOptions.recursiveFieldMaxDepth)

   @transient private var decoder: BinaryDecoder = _

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 8 additions & 4 deletions
@@ -51,22 +51,25 @@ private[sql] class AvroDeserializer(
     datetimeRebaseSpec: RebaseSpec,
     filters: StructFilters,
     useStableIdForUnionType: Boolean,
-    stableIdPrefixForUnionType: String) {
+    stableIdPrefixForUnionType: String,
+    recursiveFieldMaxDepth: Int) {

   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
       datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
-      stableIdPrefixForUnionType: String) = {
+      stableIdPrefixForUnionType: String,
+      recursiveFieldMaxDepth: Int) = {
     this(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
       RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
-      stableIdPrefixForUnionType)
+      stableIdPrefixForUnionType,
+      recursiveFieldMaxDepth)
   }

   private lazy val decimalConversions = new DecimalConversion()
@@ -128,7 +131,8 @@ private[sql] class AvroDeserializer(
       s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"

     val realDataType = SchemaConverters.toSqlType(
-      avroType, useStableIdForUnionType, stableIdPrefixForUnionType).dataType
+      avroType, useStableIdForUnionType, stableIdPrefixForUnionType,
+      recursiveFieldMaxDepth).dataType

     (avroType.getType, catalystType) match {
       case (NULL, NullType) => (updater, ordinal, _) =>

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 2 additions & 1 deletion
@@ -145,7 +145,8 @@ private[sql] class AvroFileFormat extends FileFormat
             datetimeRebaseMode,
             avroFilters,
             parsedOptions.useStableIdForUnionType,
-            parsedOptions.stableIdPrefixForUnionType)
+            parsedOptions.stableIdPrefixForUnionType,
+            parsedOptions.recursiveFieldMaxDepth)
           override val stopPosition = file.start + file.length

           override def hasNext: Boolean = hasNextRow

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala

Lines changed: 31 additions & 0 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf

 /**
@@ -136,6 +137,15 @@ private[sql] class AvroOptions(

   val stableIdPrefixForUnionType: String = parameters
     .getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_")
+
+  val recursiveFieldMaxDepth: Int =
+    parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1)
+
+  if (recursiveFieldMaxDepth > RECURSIVE_FIELD_MAX_DEPTH_LIMIT) {
+    throw QueryCompilationErrors.avroOptionsException(
+      RECURSIVE_FIELD_MAX_DEPTH,
+      s"Should not be greater than $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.")
+  }
 }

 private[sql] object AvroOptions extends DataSourceOptions {
@@ -170,4 +180,25 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields
   // of Avro Union type.
   val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType")
+
+  /**
+   * Adds support for recursive fields. If this option is not specified or is set to 0, recursive
+   * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
+   * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15.
+   * Values larger than 15 are not allowed in order to avoid inadvertently creating very large
+   * schemas. If an avro message has depth beyond this limit, the Spark struct returned is
+   * truncated after the recursion limit.
+   *
+   * Examples: Consider an Avro schema with a recursive field:
+   * {"type" : "record", "name" : "Node", "fields" : [{"name": "Id", "type": "int"},
+   * {"name": "Next", "type": ["null", "Node"]}]}
+   * The following lists the parsed schema with different values for this setting.
+   * 1: `struct<Id: int>`
+   * 2: `struct<Id: int, Next: struct<Id: int>>`
+   * 3: `struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>`
+   * and so on.
+   */
+  val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth")
+
+  val RECURSIVE_FIELD_MAX_DEPTH_LIMIT: Int = 15
 }
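
To make the doc comment above concrete, here is a hedged sketch of converting the recursive "Node" schema with the new fourth argument of SchemaConverters.toSqlType. It assumes the four-argument overload visible at the call sites in this commit is accessible from user code; the expected result is taken from the examples in the comment, not from the PR's tests.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// The recursive "Node" schema from the doc comment above.
val nodeSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "Node", "fields": [
    |  {"name": "Id", "type": "int"},
    |  {"name": "Next", "type": ["null", "Node"]}
    |]}""".stripMargin)

// Positional arguments mirror the call sites in this commit:
// (schema, useStableIdForUnionType, stableIdPrefixForUnionType, recursiveFieldMaxDepth).
// With a depth of 3, the doc comment says the result is
// struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>.
val sqlType = SchemaConverters.toSqlType(nodeSchema, false, "member_", 3).dataType
println(sqlType.simpleString)
```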

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -65,7 +65,8 @@ private[sql] object AvroUtils extends Logging {
     SchemaConverters.toSqlType(
       avroSchema,
       parsedOptions.useStableIdForUnionType,
-      parsedOptions.stableIdPrefixForUnionType).dataType match {
+      parsedOptions.stableIdPrefixForUnionType,
+      parsedOptions.recursiveFieldMaxDepth).dataType match {
       case t: StructType => Some(t)
       case _ => throw new RuntimeException(
         s"""Avro schema cannot be converted to a Spark SQL StructType:
