From 2a67fb9256cbd3191014df2a2e8b37a7c6a3655e Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Tue, 16 Jul 2024 11:36:06 -0700 Subject: [PATCH 01/25] support in main function --- .../spark/sql/avro/SchemaConverters.scala | 188 ++++++++++++------ 1 file changed, 129 insertions(+), 59 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index b2285aa966dd..5154ca152205 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -27,6 +27,7 @@ import org.apache.avro.LogicalTypes.{Date, Decimal, LocalTimestampMicros, LocalT import org.apache.avro.Schema.Type._ import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal.minBytesForPrecision @@ -36,7 +37,7 @@ import org.apache.spark.sql.types.Decimal.minBytesForPrecision * versa. */ @DeveloperApi -object SchemaConverters { +object SchemaConverters extends Logging { private lazy val nullSchema = Schema.create(Schema.Type.NULL) /** @@ -54,8 +55,10 @@ object SchemaConverters { def toSqlType( avroSchema: Schema, useStableIdForUnionType: Boolean, - stableIdPrefixForUnionType: String): SchemaType = { - toSqlTypeHelper(avroSchema, Set.empty, useStableIdForUnionType, stableIdPrefixForUnionType) + stableIdPrefixForUnionType: String, + recursiveFieldMaxDepth: Int = -1): SchemaType = { + toSqlTypeHelper(avroSchema, Map.empty, useStableIdForUnionType, stableIdPrefixForUnionType, + recursiveFieldMaxDepth) } /** * Converts an Avro schema to a corresponding Spark SQL schema. 
@@ -63,7 +66,7 @@ object SchemaConverters { * @since 2.4.0 */ def toSqlType(avroSchema: Schema): SchemaType = { - toSqlType(avroSchema, false, "") + toSqlType(avroSchema, false, "", -1) } @deprecated("using toSqlType(..., useStableIdForUnionType: Boolean) instead", "4.0.0") @@ -71,9 +74,10 @@ object SchemaConverters { val avroOptions = AvroOptions(options) toSqlTypeHelper( avroSchema, - Set.empty, + Map.empty, avroOptions.useStableIdForUnionType, - avroOptions.stableIdPrefixForUnionType) + avroOptions.stableIdPrefixForUnionType, + avroOptions.recursiveFieldMaxDepth) } // The property specifies Catalyst type of the given field @@ -81,9 +85,10 @@ object SchemaConverters { private def toSqlTypeHelper( avroSchema: Schema, - existingRecordNames: Set[String], + existingRecordNames: Map[String, Int], useStableIdForUnionType: Boolean, - stableIdPrefixForUnionType: String): SchemaType = { + stableIdPrefixForUnionType: String, + recursiveFieldMaxDepth: Int): SchemaType = { avroSchema.getType match { case INT => avroSchema.getLogicalType match { case _: Date => SchemaType(DateType, nullable = false) @@ -128,62 +133,112 @@ object SchemaConverters { case NULL => SchemaType(NullType, nullable = true) case RECORD => - if (existingRecordNames.contains(avroSchema.getFullName)) { + val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0) + if (recursiveDepth > 0 && recursiveFieldMaxDepth < 0) { throw new IncompatibleSchemaException(s""" |Found recursive reference in Avro schema, which can not be processed by Spark: |${avroSchema.toString(true)} """.stripMargin) - } - val newRecordNames = existingRecordNames + avroSchema.getFullName - val fields = avroSchema.getFields.asScala.map { f => - val schemaType = toSqlTypeHelper( - f.schema(), - newRecordNames, - useStableIdForUnionType, - stableIdPrefixForUnionType) - StructField(f.name, schemaType.dataType, schemaType.nullable) - } + } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { + println( + s"The field ${avroSchema.getFullName} of type ${avroSchema.getType.getName} is " + + s"dropped at recursive depth $recursiveDepth." + ) + null + } else { + val newRecordNames = + existingRecordNames + (avroSchema.getFullName -> (recursiveDepth + 1)) + val fields = avroSchema.getFields.asScala.map { f => + val schemaType = toSqlTypeHelper( + f.schema(), + newRecordNames, + useStableIdForUnionType, + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + if (schemaType == null) { + null + } + else { + StructField(f.name, schemaType.dataType, schemaType.nullable) + } + }.filter(_ != null).toSeq + fields match { + case Nil => + convertEmptyAvroToStructWithDummyField(avroSchema.getFullName) - SchemaType(StructType(fields.toArray), nullable = false) + case fds => SchemaType(StructType(fds), nullable = false) + } + } case ARRAY => val schemaType = toSqlTypeHelper( avroSchema.getElementType, existingRecordNames, useStableIdForUnionType, - stableIdPrefixForUnionType) - SchemaType( - ArrayType(schemaType.dataType, containsNull = schemaType.nullable), - nullable = false) + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + if (schemaType == null) { + println( + s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + + "does not have any fields left likely due to recursive depth limit." 
+ ) + null + } else { + SchemaType( + ArrayType(schemaType.dataType, containsNull = schemaType.nullable), + nullable = false) + } case MAP => val schemaType = toSqlTypeHelper(avroSchema.getValueType, - existingRecordNames, useStableIdForUnionType, stableIdPrefixForUnionType) - SchemaType( - MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), - nullable = false) + existingRecordNames, useStableIdForUnionType, stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + if (schemaType == null) { + println( + s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + + "does not have any fields left likely due to recursive depth limit." + ) + null + } else { + SchemaType( + MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), + nullable = false) + } case UNION => if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { // In case of a union with null, eliminate it and make a recursive call val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema) - if (remainingUnionTypes.size == 1) { - toSqlTypeHelper( - remainingUnionTypes.head, - existingRecordNames, - useStableIdForUnionType, - stableIdPrefixForUnionType).copy(nullable = true) + val schemaType = + if (remainingUnionTypes.size == 1) { + toSqlTypeHelper( + remainingUnionTypes.head, + existingRecordNames, + useStableIdForUnionType, + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + } else { + toSqlTypeHelper( + Schema.createUnion(remainingUnionTypes.asJava), + existingRecordNames, + useStableIdForUnionType, + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + } + if (schemaType == null) { + println( + s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + + "does not have any fields left likely due to recursive depth limit." + ) + null } else { - toSqlTypeHelper( - Schema.createUnion(remainingUnionTypes.asJava), - existingRecordNames, - useStableIdForUnionType, - stableIdPrefixForUnionType).copy(nullable = true) + schemaType.copy(nullable = true) } } else avroSchema.getTypes.asScala.map(_.getType).toSeq match { case Seq(t1) => toSqlTypeHelper(avroSchema.getTypes.get(0), - existingRecordNames, useStableIdForUnionType, stableIdPrefixForUnionType) + existingRecordNames, useStableIdForUnionType, stableIdPrefixForUnionType, + recursiveFieldMaxDepth) case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => SchemaType(LongType, nullable = false) case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => @@ -201,29 +256,38 @@ object SchemaConverters { s, existingRecordNames, useStableIdForUnionType, - stableIdPrefixForUnionType) - - val fieldName = if (useStableIdForUnionType) { - // Avro's field name may be case sensitive, so field names for two named type - // could be "a" and "A" and we need to distinguish them. In this case, we throw - // an exception. - // Stable id prefix can be empty so the name of the field can be just the type. 
- val tempFieldName = s"${stableIdPrefixForUnionType}${s.getName}" - if (!fieldNameSet.add(tempFieldName.toLowerCase(Locale.ROOT))) { - throw new IncompatibleSchemaException( - "Cannot generate stable identifier for Avro union type due to name " + - s"conflict of type name ${s.getName}") - } - tempFieldName + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + if (schemaType == null) { + null } else { - s"member$i" + val fieldName = if (useStableIdForUnionType) { + // Avro's field name may be case sensitive, so field names for two named type + // could be "a" and "A" and we need to distinguish them. In this case, we throw + // an exception. + // Stable id prefix can be empty so the name of the field can be just the type. + val tempFieldName = s"${stableIdPrefixForUnionType}${s.getName}" + if (!fieldNameSet.add(tempFieldName.toLowerCase(Locale.ROOT))) { + throw new IncompatibleSchemaException( + "Cannot generate stable identifier for Avro union type due to name " + + s"conflict of type name ${s.getName}") + } + tempFieldName + } else { + s"member$i" + } + + // All fields are nullable because only one of them is set at a time + StructField(fieldName, schemaType.dataType, nullable = true) } + }.filter(_ != null).toSeq - // All fields are nullable because only one of them is set at a time - StructField(fieldName, schemaType.dataType, nullable = true) - } + fields match { + case Nil => + convertEmptyAvroToStructWithDummyField(avroSchema.getFullName) - SchemaType(StructType(fields.toArray), nullable = false) + case fds => SchemaType(StructType(fds), nullable = false) + } } case other => throw new IncompatibleSchemaException(s"Unsupported type $other") @@ -303,6 +367,12 @@ object SchemaConverters { schema } } + + private def convertEmptyAvroToStructWithDummyField(fieldName: String): SchemaType = { + log.info(s"Keep $fieldName which is empty struct by inserting a dummy field.") + SchemaType( + StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil), nullable = true) + } } private[avro] class IncompatibleSchemaException( From 7dc41e29e273d1059de7ba8392c178ffd7a458e9 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Tue, 16 Jul 2024 13:45:55 -0700 Subject: [PATCH 02/25] Add recursiveFieldMaxDepth option --- .../spark/sql/avro/AvroDataToCatalyst.scala | 6 +- .../spark/sql/avro/AvroDeserializer.scala | 12 ++-- .../spark/sql/avro/AvroFileFormat.scala | 3 +- .../apache/spark/sql/avro/AvroOptions.scala | 5 ++ .../org/apache/spark/sql/avro/AvroUtils.scala | 3 +- .../spark/sql/avro/SchemaConverters.scala | 31 ++++++---- .../v2/avro/AvroPartitionReaderFactory.scala | 3 +- .../AvroCatalystDataConversionSuite.scala | 3 +- .../spark/sql/avro/AvroRowReaderSuite.scala | 3 +- .../spark/sql/avro/AvroSerdeSuite.scala | 3 +- .../org/apache/spark/sql/avro/AvroSuite.scala | 59 +++++++++++++++++-- 11 files changed, 102 insertions(+), 29 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 7d80998d96eb..0b85b208242c 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -42,7 +42,8 @@ private[sql] case class AvroDataToCatalyst( val dt = SchemaConverters.toSqlType( expectedSchema, avroOptions.useStableIdForUnionType, - avroOptions.stableIdPrefixForUnionType).dataType + avroOptions.stableIdPrefixForUnionType, + 
avroOptions.recursiveFieldMaxDepth).dataType parseMode match { // With PermissiveMode, the output Catalyst row might contain columns of null values for // corrupt records, even if some of the columns are not nullable in the user-provided schema. @@ -69,7 +70,8 @@ private[sql] case class AvroDataToCatalyst( dataType, avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType, - avroOptions.stableIdPrefixForUnionType) + avroOptions.stableIdPrefixForUnionType, + avroOptions.recursiveFieldMaxDepth) @transient private var decoder: BinaryDecoder = _ diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 139c45adb442..ea06767a391b 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -51,14 +51,16 @@ private[sql] class AvroDeserializer( datetimeRebaseSpec: RebaseSpec, filters: StructFilters, useStableIdForUnionType: Boolean, - stableIdPrefixForUnionType: String) { + stableIdPrefixForUnionType: String, + recursiveFieldMaxDepth: Int) { def this( rootAvroType: Schema, rootCatalystType: DataType, datetimeRebaseMode: String, useStableIdForUnionType: Boolean, - stableIdPrefixForUnionType: String) = { + stableIdPrefixForUnionType: String, + recursiveFieldMaxDepth: Int) = { this( rootAvroType, rootCatalystType, @@ -66,7 +68,8 @@ private[sql] class AvroDeserializer( RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)), new NoopFilters, useStableIdForUnionType, - stableIdPrefixForUnionType) + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) } private lazy val decimalConversions = new DecimalConversion() @@ -128,7 +131,8 @@ private[sql] class AvroDeserializer( s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})" val realDataType = SchemaConverters.toSqlType( - avroType, useStableIdForUnionType, stableIdPrefixForUnionType).dataType + avroType, useStableIdForUnionType, stableIdPrefixForUnionType, + recursiveFieldMaxDepth).dataType (avroType.getType, catalystType) match { case (NULL, NullType) => (updater, ordinal, _) => diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 372f24b54f5c..264c3a1f48ab 100755 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -145,7 +145,8 @@ private[sql] class AvroFileFormat extends FileFormat datetimeRebaseMode, avroFilters, parsedOptions.useStableIdForUnionType, - parsedOptions.stableIdPrefixForUnionType) + parsedOptions.stableIdPrefixForUnionType, + parsedOptions.recursiveFieldMaxDepth) override val stopPosition = file.start + file.length override def hasNext: Boolean = hasNextRow diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 4332904339f1..55afa748ed79 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -136,6 +136,9 @@ private[sql] class AvroOptions( val stableIdPrefixForUnionType: String = parameters .getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_") + + val recursiveFieldMaxDepth: Int = + 
parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1) } private[sql] object AvroOptions extends DataSourceOptions { @@ -170,4 +173,6 @@ private[sql] object AvroOptions extends DataSourceOptions { // When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields // of Avro Union type. val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType") + + val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth") } diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 7cbc30f1fb3d..594ebb4716c4 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -65,7 +65,8 @@ private[sql] object AvroUtils extends Logging { SchemaConverters.toSqlType( avroSchema, parsedOptions.useStableIdForUnionType, - parsedOptions.stableIdPrefixForUnionType).dataType match { + parsedOptions.stableIdPrefixForUnionType, + parsedOptions.recursiveFieldMaxDepth).dataType match { case t: StructType => Some(t) case _ => throw new RuntimeException( s"""Avro schema cannot be converted to a Spark SQL StructType: diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 5154ca152205..69d0116e0cb7 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -140,7 +140,7 @@ object SchemaConverters extends Logging { |${avroSchema.toString(true)} """.stripMargin) } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { - println( + log.info( s"The field ${avroSchema.getFullName} of type ${avroSchema.getType.getName} is " + s"dropped at recursive depth $recursiveDepth." ) @@ -164,8 +164,11 @@ object SchemaConverters extends Logging { }.filter(_ != null).toSeq fields match { case Nil => - convertEmptyAvroToStructWithDummyField(avroSchema.getFullName) - + log.info( + s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + + "does not have any fields left likely due to recursive depth limit." + ) + null case fds => SchemaType(StructType(fds), nullable = false) } } @@ -178,7 +181,7 @@ object SchemaConverters extends Logging { stableIdPrefixForUnionType, recursiveFieldMaxDepth) if (schemaType == null) { - println( + log.info( s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + "does not have any fields left likely due to recursive depth limit." ) @@ -194,7 +197,7 @@ object SchemaConverters extends Logging { existingRecordNames, useStableIdForUnionType, stableIdPrefixForUnionType, recursiveFieldMaxDepth) if (schemaType == null) { - println( + log.info( s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + "does not have any fields left likely due to recursive depth limit." ) @@ -226,7 +229,7 @@ object SchemaConverters extends Logging { recursiveFieldMaxDepth) } if (schemaType == null) { - println( + log.info( s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + "does not have any fields left likely due to recursive depth limit." 
) @@ -284,7 +287,11 @@ object SchemaConverters extends Logging { fields match { case Nil => - convertEmptyAvroToStructWithDummyField(avroSchema.getFullName) + log.info( + s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as " + + "it does not have any fields left likely due to recursive depth limit." + ) + null case fds => SchemaType(StructType(fds), nullable = false) } @@ -368,11 +375,11 @@ object SchemaConverters extends Logging { } } - private def convertEmptyAvroToStructWithDummyField(fieldName: String): SchemaType = { - log.info(s"Keep $fieldName which is empty struct by inserting a dummy field.") - SchemaType( - StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil), nullable = true) - } +// private def convertEmptyAvroToStructWithDummyField(fieldName: String): SchemaType = { +// log.info(s"Keep $fieldName which is empty struct by inserting a dummy field.") +// SchemaType( +// StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil), nullable = true) +// } } private[avro] class IncompatibleSchemaException( diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 1083c9916072..a13faf3b5156 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -105,7 +105,8 @@ case class AvroPartitionReaderFactory( datetimeRebaseMode, avroFilters, options.useStableIdForUnionType, - options.stableIdPrefixForUnionType) + options.stableIdPrefixForUnionType, + options.recursiveFieldMaxDepth) override val stopPosition = partitionedFile.start + partitionedFile.length override def next(): Boolean = hasNextRow diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 388347537a4d..311eda3a1b6a 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -291,7 +291,8 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite RebaseSpec(LegacyBehaviorPolicy.CORRECTED), filters, false, - "") + "", + -1) val deserialized = deserializer.deserialize(data) expected match { case None => assert(deserialized == None) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala index 9b3bb929a700..c1ab96a63eb2 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala @@ -77,7 +77,8 @@ class AvroRowReaderSuite RebaseSpec(CORRECTED), new NoopFilters, false, - "") + "", + -1) override val stopPosition = fileSize override def hasNext: Boolean = hasNextRow diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala index cbcbc2e7e76a..3643a95abe19 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala @@ -228,7 
+228,8 @@ object AvroSerdeSuite { RebaseSpec(CORRECTED), new NoopFilters, false, - "") + "", + -1) } /** diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 42c13f5e2087..d6a0ede34329 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2187,7 +2187,7 @@ abstract class AvroSuite } } - private def checkSchemaWithRecursiveLoop(avroSchema: String): Unit = { + private def checkSchemaWithRecursiveLoopError(avroSchema: String): Unit = { val message = intercept[IncompatibleSchemaException] { SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema), false, "") }.getMessage @@ -2196,7 +2196,7 @@ abstract class AvroSuite } test("Detect recursive loop") { - checkSchemaWithRecursiveLoop(""" + checkSchemaWithRecursiveLoopError(""" |{ | "type": "record", | "name": "LongList", @@ -2207,7 +2207,7 @@ abstract class AvroSuite |} """.stripMargin) - checkSchemaWithRecursiveLoop(""" + checkSchemaWithRecursiveLoopError(""" |{ | "type": "record", | "name": "LongList", @@ -2229,7 +2229,7 @@ abstract class AvroSuite |} """.stripMargin) - checkSchemaWithRecursiveLoop(""" + checkSchemaWithRecursiveLoopError(""" |{ | "type": "record", | "name": "LongList", @@ -2240,7 +2240,7 @@ abstract class AvroSuite |} """.stripMargin) - checkSchemaWithRecursiveLoop(""" + checkSchemaWithRecursiveLoopError(""" |{ | "type": "record", | "name": "LongList", @@ -2252,6 +2252,55 @@ abstract class AvroSuite """.stripMargin) } + private def checkSchemaWithRecursiveLoop( + avroSchema: String, expectedSchema: StructType, recursiveFieldMaxDepth: Int): Unit = { + val sparkSchema = + SchemaConverters.toSqlType( + new Schema.Parser().parse(avroSchema), false, "", recursiveFieldMaxDepth).dataType + + assert(sparkSchema === expectedSchema) + } + + + + test("dev test") { + val catalystSchema = + StructType(Seq( + StructField("Id", IntegerType), + StructField("Name", + StructType( + Seq( + StructField("Id", IntegerType), + StructField("Name", StructType(Seq(StructField("Id", IntegerType))))))))) + + val avroSchema = s""" + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "Id", "type": "int"}, + | {"name": "Name", "type": ["null", "test_schema"]} + | ] + |} + """.stripMargin + + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(2, Row(3, null)), Row(1, null))), + catalystSchema) + + withTempPath { tempDir => + df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath) + checkAnswer( + spark.read + .format("avro") + .option("avroSchema", avroSchema) + .option("recursiveFieldMaxDepth", 10) + .load(tempDir.getPath), + df) + } + + } + test("log a warning of ignoreExtension deprecation") { val logAppender = new LogAppender("deprecated Avro option 'ignoreExtension'") withTempPath { dir => From 3d8d99e8d8a91a678ee876e2a28555403876bc04 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 17 Jul 2024 11:04:49 -0700 Subject: [PATCH 03/25] add unit tests and integration test --- .../spark/sql/avro/SchemaConverters.scala | 11 +- .../org/apache/spark/sql/avro/AvroSuite.scala | 112 ++++++++++++++++-- 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 69d0116e0cb7..4e699994f3f0 
100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -57,8 +57,12 @@ object SchemaConverters extends Logging { useStableIdForUnionType: Boolean, stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int = -1): SchemaType = { - toSqlTypeHelper(avroSchema, Map.empty, useStableIdForUnionType, stableIdPrefixForUnionType, - recursiveFieldMaxDepth) + val schema = toSqlTypeHelper(avroSchema, Map.empty, useStableIdForUnionType, + stableIdPrefixForUnionType, recursiveFieldMaxDepth) + if (schema == null) { + return SchemaType(StructType(Nil), nullable = true) + } + schema } /** * Converts an Avro schema to a corresponding Spark SQL schema. @@ -72,9 +76,8 @@ object SchemaConverters extends Logging { @deprecated("using toSqlType(..., useStableIdForUnionType: Boolean) instead", "4.0.0") def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType = { val avroOptions = AvroOptions(options) - toSqlTypeHelper( + toSqlType( avroSchema, - Map.empty, avroOptions.useStableIdForUnionType, avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index d6a0ede34329..2f21efb21e67 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -459,8 +459,10 @@ abstract class AvroSuite val e = intercept[Exception] { checkUnionStableId( List( - Schema.createRecord("myRecord", "", null, false, List[Schema.Field]().asJava), - Schema.createRecord("myrecord", "", null, false, List[Schema.Field]().asJava)), + Schema.createRecord("myRecord", "", null, false, List[Schema.Field]( + new Schema.Field("F", Schema.create(Type.FLOAT))).asJava), + Schema.createRecord("myrecord", "", null, false, List[Schema.Field]( + new Schema.Field("F", Schema.create(Type.FLOAT))).asJava)), "", Seq()) } @@ -2187,7 +2189,7 @@ abstract class AvroSuite } } - private def checkSchemaWithRecursiveLoopError(avroSchema: String): Unit = { + private def checkSchemaWithRecursiveLoop(avroSchema: String): Unit = { val message = intercept[IncompatibleSchemaException] { SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema), false, "") }.getMessage @@ -2196,7 +2198,7 @@ abstract class AvroSuite } test("Detect recursive loop") { - checkSchemaWithRecursiveLoopError(""" + checkSchemaWithRecursiveLoop(""" |{ | "type": "record", | "name": "LongList", @@ -2207,7 +2209,7 @@ abstract class AvroSuite |} """.stripMargin) - checkSchemaWithRecursiveLoopError(""" + checkSchemaWithRecursiveLoop(""" |{ | "type": "record", | "name": "LongList", @@ -2229,7 +2231,7 @@ abstract class AvroSuite |} """.stripMargin) - checkSchemaWithRecursiveLoopError(""" + checkSchemaWithRecursiveLoop(""" |{ | "type": "record", | "name": "LongList", @@ -2240,7 +2242,7 @@ abstract class AvroSuite |} """.stripMargin) - checkSchemaWithRecursiveLoopError(""" + checkSchemaWithRecursiveLoop(""" |{ | "type": "record", | "name": "LongList", @@ -2252,7 +2254,7 @@ abstract class AvroSuite """.stripMargin) } - private def checkSchemaWithRecursiveLoop( + private def checkSparkSchemaEquals( avroSchema: String, expectedSchema: StructType, recursiveFieldMaxDepth: Int): Unit = { val sparkSchema = SchemaConverters.toSqlType( @@ -2261,7 +2263,96 @@ abstract class AvroSuite 
assert(sparkSchema === expectedSchema) } - + test("Recursive schema resolve to be empty struct") { + val avroSchema = """ + |{ + | "type": "record", + | "name": "LongList", + | "fields": [ + | { + | "name": "value", + | "type": { + | "type": "record", + | "name": "foo", + | "fields": [ + | { + | "name": "parent", + | "type": "LongList" + | } + | ] + | } + | } + | ] + |} + """.stripMargin + val expectedSchema = StructType(Seq()) + for (i <- 0 to 5) { + checkSparkSchemaEquals(avroSchema, expectedSchema, i) + } + } + + test("Translate recursive schema - 1") { + val avroSchema = """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, // each element has a long + | {"name": "next", "type": ["null", "LongList"]} // optional next element + | ] + |} + """.stripMargin + val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) + var expectedSchema = nonRecursiveFields + checkSparkSchemaEquals(avroSchema, expectedSchema, 0) + for (i <- 1 to 5) { + checkSparkSchemaEquals(avroSchema, expectedSchema, i) + expectedSchema = nonRecursiveFields.add("next", expectedSchema) + } + } + + test("Translate recursive schema - 2") { + val avroSchema = """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, + | {"name": "array", "type": {"type": "array", "items": "LongList"}} + | ] + |} + """.stripMargin + val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) + var expectedSchema = nonRecursiveFields + checkSparkSchemaEquals(avroSchema, expectedSchema, 0) + for (i <- 1 to 5) { + checkSparkSchemaEquals(avroSchema, expectedSchema, i) + expectedSchema = + nonRecursiveFields.add("array", new ArrayType(expectedSchema, false), nullable = false) + } + } + + test("Translate recursive schema - 3") { + val avroSchema = """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, + | {"name": "map", "type": {"type": "map", "values": "LongList"}} + | ] + |} + """.stripMargin + val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) + var expectedSchema = nonRecursiveFields + checkSparkSchemaEquals(avroSchema, expectedSchema, 0) + for (i <- 1 to 5) { + checkSparkSchemaEquals(avroSchema, expectedSchema, i) + expectedSchema = + nonRecursiveFields.add("map", + new MapType(StringType, expectedSchema, false), nullable = false) + } + } test("dev test") { val catalystSchema = @@ -2793,7 +2884,7 @@ abstract class AvroSuite } test("SPARK-40667: validate Avro Options") { - assert(AvroOptions.getAllOptions.size == 11) + assert(AvroOptions.getAllOptions.size == 12) // Please add validation on any new Avro options here assert(AvroOptions.isValidOption("ignoreExtension")) assert(AvroOptions.isValidOption("mode")) @@ -2806,6 +2897,7 @@ abstract class AvroSuite assert(AvroOptions.isValidOption("datetimeRebaseMode")) assert(AvroOptions.isValidOption("enableStableIdentifiersForUnionType")) assert(AvroOptions.isValidOption("stableIdentifierPrefixForUnionType")) + assert(AvroOptions.isValidOption("recursiveFieldMaxDepth")) } test("SPARK-46633: read file with empty blocks") { From 997ff7e0f6a767f31f1583077e14c2f231f347e0 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 17 Jul 2024 15:56:49 -0700 Subject: [PATCH 04/25] add integration tests --- .../spark/sql/avro/SchemaConverters.scala | 27 ++---- .../org/apache/spark/sql/avro/AvroSuite.scala | 88 +++++++++++-------- 2 files changed, 55 insertions(+), 60 
deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 4e699994f3f0..5c81c852e0b3 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -59,9 +59,8 @@ object SchemaConverters extends Logging { recursiveFieldMaxDepth: Int = -1): SchemaType = { val schema = toSqlTypeHelper(avroSchema, Map.empty, useStableIdForUnionType, stableIdPrefixForUnionType, recursiveFieldMaxDepth) - if (schema == null) { - return SchemaType(StructType(Nil), nullable = true) - } + // the top level record should never return null + assert(schema != null) schema } /** @@ -165,15 +164,8 @@ object SchemaConverters extends Logging { StructField(f.name, schemaType.dataType, schemaType.nullable) } }.filter(_ != null).toSeq - fields match { - case Nil => - log.info( - s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + - "does not have any fields left likely due to recursive depth limit." - ) - null - case fds => SchemaType(StructType(fds), nullable = false) - } + + SchemaType(StructType(fields), nullable = false) } case ARRAY => @@ -288,16 +280,7 @@ object SchemaConverters extends Logging { } }.filter(_ != null).toSeq - fields match { - case Nil => - log.info( - s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as " + - "it does not have any fields left likely due to recursive depth limit." - ) - null - - case fds => SchemaType(StructType(fds), nullable = false) - } + SchemaType(StructType(fields), nullable = false) } case other => throw new IncompatibleSchemaException(s"Unsupported type $other") diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 33579c5d9f09..b509030ce3a7 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2263,7 +2263,27 @@ abstract class AvroSuite assert(sparkSchema === expectedSchema) } - test("Recursive schema resolve to be empty struct") { + test("Translate recursive schema - 1") { + val avroSchema = """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, // each element has a long + | {"name": "next", "type": ["null", "LongList"]} // optional next element + | ] + |} + """.stripMargin + val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) + var expectedSchema = nonRecursiveFields + checkSparkSchemaEquals(avroSchema, expectedSchema, 0) + for (i <- 1 to 5) { + checkSparkSchemaEquals(avroSchema, expectedSchema, i) + expectedSchema = nonRecursiveFields.add("next", expectedSchema) + } + } + + test("Translate recursive schema - 2") { val avroSchema = """ |{ | "type": "record", @@ -2285,33 +2305,17 @@ abstract class AvroSuite | ] |} """.stripMargin - val expectedSchema = StructType(Seq()) - for (i <- 0 to 5) { - checkSparkSchemaEquals(avroSchema, expectedSchema, i) - } - } - - test("Translate recursive schema - 1") { - val avroSchema = """ - |{ - | "type": "record", - | "name": "LongList", - | "fields" : [ - | {"name": "value", "type": "long"}, // each element has a long - | {"name": "next", "type": ["null", "LongList"]} // optional next element - | ] - |} - """.stripMargin - val 
nonRecursiveFields = new StructType().add("value", LongType, nullable = false) + val nonRecursiveFields = new StructType().add("value", StructType(Seq()), nullable = false) var expectedSchema = nonRecursiveFields checkSparkSchemaEquals(avroSchema, expectedSchema, 0) for (i <- 1 to 5) { checkSparkSchemaEquals(avroSchema, expectedSchema, i) - expectedSchema = nonRecursiveFields.add("next", expectedSchema) + expectedSchema = new StructType().add("value", + new StructType().add("parent", expectedSchema, nullable = false), nullable = false) } } - test("Translate recursive schema - 2") { + test("Translate recursive schema - 3") { val avroSchema = """ |{ | "type": "record", @@ -2332,7 +2336,7 @@ abstract class AvroSuite } } - test("Translate recursive schema - 3") { + test("Translate recursive schema - 4") { val avroSchema = """ |{ | "type": "record", @@ -2354,33 +2358,33 @@ abstract class AvroSuite } } - test("dev test") { + test("recursive schema integration test") { val catalystSchema = StructType(Seq( StructField("Id", IntegerType), - StructField("Name", - StructType( - Seq( - StructField("Id", IntegerType), - StructField("Name", StructType(Seq(StructField("Id", IntegerType))))))))) + StructField("Name", StructType(Seq( + StructField("Id", IntegerType), + StructField("Name", StructType(Seq( + StructField("Id", IntegerType), + StructField("Name", NullType))))))))) val avroSchema = s""" - |{ - | "type" : "record", - | "name" : "test_schema", - | "fields" : [ - | {"name": "Id", "type": "int"}, - | {"name": "Name", "type": ["null", "test_schema"]} - | ] - |} + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "Id", "type": "int"}, + | {"name": "Name", "type": ["null", "test_schema"]} + | ] + |} """.stripMargin val df = spark.createDataFrame( - spark.sparkContext.parallelize(Seq(Row(2, Row(3, null)), Row(1, null))), + spark.sparkContext.parallelize(Seq(Row(2, Row(3, Row(4, null))), Row(1, null))), catalystSchema) withTempPath { tempDir => - df.write.format("avro").option("avroSchema", avroSchema).save(tempDir.getPath) + df.write.format("avro").save(tempDir.getPath) checkAnswer( spark.read .format("avro") @@ -2388,6 +2392,14 @@ abstract class AvroSuite .option("recursiveFieldMaxDepth", 10) .load(tempDir.getPath), df) + + checkAnswer( + spark.read + .format("avro") + .option("avroSchema", avroSchema) + .option("recursiveFieldMaxDepth", 1) + .load(tempDir.getPath), + df.select("Id")) } } From a64a92768a8eb08cbcf70fa3dc4dd5fdcdff020a Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 10:10:35 -0700 Subject: [PATCH 05/25] revert change to existing tests --- .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index b509030ce3a7..522598e56f97 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -459,10 +459,8 @@ abstract class AvroSuite val e = intercept[Exception] { checkUnionStableId( List( - Schema.createRecord("myRecord", "", null, false, List[Schema.Field]( - new Schema.Field("F", Schema.create(Type.FLOAT))).asJava), - Schema.createRecord("myrecord", "", null, false, List[Schema.Field]( - new Schema.Field("F", Schema.create(Type.FLOAT))).asJava)), + Schema.createRecord("myRecord", "", null, false, 
List[Schema.Field]().asJava), + Schema.createRecord("myrecord", "", null, false, List[Schema.Field]().asJava)), "", Seq()) } From bfcec5ea8ff7ffe2036e8dbbaeb02c4740fa67b4 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 10:35:28 -0700 Subject: [PATCH 06/25] change the handling of max depth = 0 to align with ProtoBuf --- .../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 2 +- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 5c81c852e0b3..e2b3112dbcd3 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -136,7 +136,7 @@ object SchemaConverters extends Logging { case RECORD => val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0) - if (recursiveDepth > 0 && recursiveFieldMaxDepth < 0) { + if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) { throw new IncompatibleSchemaException(s""" |Found recursive reference in Avro schema, which can not be processed by Spark: |${avroSchema.toString(true)} diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 522598e56f97..78cdd25ef352 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2274,7 +2274,6 @@ abstract class AvroSuite """.stripMargin val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) var expectedSchema = nonRecursiveFields - checkSparkSchemaEquals(avroSchema, expectedSchema, 0) for (i <- 1 to 5) { checkSparkSchemaEquals(avroSchema, expectedSchema, i) expectedSchema = nonRecursiveFields.add("next", expectedSchema) @@ -2305,7 +2304,6 @@ abstract class AvroSuite """.stripMargin val nonRecursiveFields = new StructType().add("value", StructType(Seq()), nullable = false) var expectedSchema = nonRecursiveFields - checkSparkSchemaEquals(avroSchema, expectedSchema, 0) for (i <- 1 to 5) { checkSparkSchemaEquals(avroSchema, expectedSchema, i) expectedSchema = new StructType().add("value", @@ -2326,7 +2324,6 @@ abstract class AvroSuite """.stripMargin val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) var expectedSchema = nonRecursiveFields - checkSparkSchemaEquals(avroSchema, expectedSchema, 0) for (i <- 1 to 5) { checkSparkSchemaEquals(avroSchema, expectedSchema, i) expectedSchema = @@ -2347,7 +2344,6 @@ abstract class AvroSuite """.stripMargin val nonRecursiveFields = new StructType().add("value", LongType, nullable = false) var expectedSchema = nonRecursiveFields - checkSparkSchemaEquals(avroSchema, expectedSchema, 0) for (i <- 1 to 5) { checkSparkSchemaEquals(avroSchema, expectedSchema, i) expectedSchema = From e1a2051e72a3a71054f376c44ac8588164c8c973 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 12:12:42 -0700 Subject: [PATCH 07/25] add one test for from_avro --- .../spark/sql/avro/AvroFunctionsSuite.scala | 32 ++++++++++++++++++- .../org/apache/spark/sql/avro/AvroSuite.scala | 1 - 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index c807685db0f0..7102c00b53a9 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions.{col, lit, struct} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class AvroFunctionsSuite extends QueryTest with SharedSparkSession { import testImplicits._ @@ -367,4 +367,34 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { stop = 138))) } } + + test("roundtrip in to_avro and from_avro - recursive schema") { + val catalystSchema = + StructType(Seq( + StructField("Id", IntegerType), + StructField("Name", StructType(Seq( + StructField("Id", IntegerType), + StructField("Name", StructType(Seq( + StructField("Id", IntegerType))))))))) + + val avroSchema = s""" + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "Id", "type": "int"}, + | {"name": "Name", "type": ["null", "test_schema"]} + | ] + |} + """.stripMargin + + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(2, Row(3, Row(4))), Row(1, null))), + catalystSchema).select(struct("Id", "Name").as("struct")) + + val avroStructDF = df.select(functions.to_avro($"struct", avroSchema).as("avro")) + checkAnswer(avroStructDF.select( + functions.from_avro($"avro", avroSchema, Map( + "recursiveFieldMaxDepth" -> "3").asJava)), df) + } } diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 78cdd25ef352..af45b850a9ca 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2395,7 +2395,6 @@ abstract class AvroSuite .load(tempDir.getPath), df.select("Id")) } - } test("log a warning of ignoreExtension deprecation") { From 29bb2ce258a43ea09f0dd6cb4d884a963931697d Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 15:18:25 -0700 Subject: [PATCH 08/25] add doc --- .../org/apache/spark/sql/avro/AvroOptions.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 55afa748ed79..8e89eac2b85d 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -137,6 +137,22 @@ private[sql] class AvroOptions( val stableIdPrefixForUnionType: String = parameters .getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_") + /** + * Adds support for recursive fields. If this option is not specified or is set to 1, recursive + * fields are not permitted. Setting it to 1 allows recursive field to be recursed once, + * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not + * allowed in order avoid inadvertently creating very large schemas. If a avro message has depth + * beyond this limit, the Spark struct returned is truncated after the recursion limit. + * + * Examples. 
Consider a Avro with a recursive field: + * {"type" : "record", "name" : "node", "fields" : [{"name": "Id", "type": "int"}, + * {"name": "Next", "type": ["null", "node"]}]} + * The following lists the schema with different values for this setting. + * 1: `struct` + * 2: `struct>` + * 3: `struct>>` + * and so on. + */ val recursiveFieldMaxDepth: Int = parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1) } From 1fd5a976688915ca02ccb759fd25e35264e0f04b Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 15:27:27 -0700 Subject: [PATCH 09/25] give a upper bound to maxDepth as Protobuf --- .../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index e2b3112dbcd3..92a6e867b3d3 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -136,10 +136,11 @@ object SchemaConverters extends Logging { case RECORD => val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0) - if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) { + if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 10)) { throw new IncompatibleSchemaException(s""" - |Found recursive reference in Avro schema, which can not be processed by Spark: - |${avroSchema.toString(true)} + |Found recursive reference in Avro schema, which can not be processed by Spark by + | default: ${avroSchema.toString(true)}. Try setting the option `recursiveFieldMaxDepth` + | to 0 - 10. Going beyond 10 levels of recursion is not allowed. """.stripMargin) } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { log.info( From eb0376bbd54ca105e40d5ce5da53c7e58c8abce6 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 15:30:02 -0700 Subject: [PATCH 10/25] minor --- .../main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 8e89eac2b85d..12978ee83b9f 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -144,10 +144,10 @@ private[sql] class AvroOptions( * allowed in order avoid inadvertently creating very large schemas. If a avro message has depth * beyond this limit, the Spark struct returned is truncated after the recursion limit. * - * Examples. Consider a Avro with a recursive field: + * Examples: Consider an Avro schema with a recursive field: * {"type" : "record", "name" : "node", "fields" : [{"name": "Id", "type": "int"}, * {"name": "Next", "type": ["null", "node"]}]} - * The following lists the schema with different values for this setting. + * The following lists the parsed schema with different values for this setting. 
* 1: `struct` * 2: `struct>` * 3: `struct>>` From 9fbdd5551a69416190f29139ff6252b4c90a2b0e Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 18 Jul 2024 15:36:01 -0700 Subject: [PATCH 11/25] minor --- .../src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 12978ee83b9f..be90d52fd292 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -150,7 +150,7 @@ private[sql] class AvroOptions( * The following lists the parsed schema with different values for this setting. * 1: `struct` * 2: `struct>` - * 3: `struct>>` + * 3: `struct>>` * and so on. */ val recursiveFieldMaxDepth: Int = From 52192135490a2a4a025202a44c64ca71c8e4326e Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Mon, 22 Jul 2024 10:58:24 -0700 Subject: [PATCH 12/25] add doc for the option --- docs/sql-data-sources-avro.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index 3721f92d9326..69b1ca30e1d7 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -353,6 +353,13 @@ Data source options of Avro can be set via: read 4.0.0 + + recursiveFieldMaxDepth + -1 + If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not allowed in order avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. + read + 4.0.0 + ## Configuration From 7a6c17d0bfadc1143a40131b1c185783c2bb7144 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Mon, 22 Jul 2024 10:58:45 -0700 Subject: [PATCH 13/25] update the comment for the doc --- .../apache/spark/sql/avro/AvroOptions.scala | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index be90d52fd292..16c43d9c6dd2 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -137,22 +137,6 @@ private[sql] class AvroOptions( val stableIdPrefixForUnionType: String = parameters .getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_") - /** - * Adds support for recursive fields. If this option is not specified or is set to 1, recursive - * fields are not permitted. Setting it to 1 allows recursive field to be recursed once, - * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not - * allowed in order avoid inadvertently creating very large schemas. If a avro message has depth - * beyond this limit, the Spark struct returned is truncated after the recursion limit. - * - * Examples: Consider an Avro schema with a recursive field: - * {"type" : "record", "name" : "node", "fields" : [{"name": "Id", "type": "int"}, - * {"name": "Next", "type": ["null", "node"]}]} - * The following lists the parsed schema with different values for this setting. 
- * 1: `struct` - * 2: `struct>` - * 3: `struct>>` - * and so on. - */ val recursiveFieldMaxDepth: Int = parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1) } @@ -190,5 +174,22 @@ private[sql] object AvroOptions extends DataSourceOptions { // of Avro Union type. val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType") + /** + * Adds support for recursive fields. If this option is not specified or is set to 0, recursive + * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive + * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 10. + * Values larger than 10 are not allowed in order avoid inadvertently creating very large schemas. + * If an avro message has depth beyond this limit, the Spark struct returned is truncated after + * the recursion limit. + * + * Examples: Consider an Avro schema with a recursive field: + * {"type" : "record", "name" : "Node", "fields" : [{"name": "Id", "type": "int"}, + * {"name": "Next", "type": ["null", "Node"]}]} + * The following lists the parsed schema with different values for this setting. + * 1: `struct` + * 2: `struct>` + * 3: `struct>>` + * and so on. + */ val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth") } From 026fd4aab2c1412805228455dcfdeee6cd7af762 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Mon, 22 Jul 2024 15:55:17 -0700 Subject: [PATCH 14/25] change the upper limit to 12 --- .../main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 4 ++-- .../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 4 ++-- docs/sql-data-sources-avro.md | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 16c43d9c6dd2..7fc08add698d 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -177,8 +177,8 @@ private[sql] object AvroOptions extends DataSourceOptions { /** * Adds support for recursive fields. If this option is not specified or is set to 0, recursive * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive - * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 10. - * Values larger than 10 are not allowed in order avoid inadvertently creating very large schemas. + * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 12. + * Values larger than 12 are not allowed in order avoid inadvertently creating very large schemas. * If an avro message has depth beyond this limit, the Spark struct returned is truncated after * the recursion limit. 
* diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 92a6e867b3d3..a03156192250 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -136,11 +136,11 @@ object SchemaConverters extends Logging { case RECORD => val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0) - if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 10)) { + if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 12)) { throw new IncompatibleSchemaException(s""" |Found recursive reference in Avro schema, which can not be processed by Spark by | default: ${avroSchema.toString(true)}. Try setting the option `recursiveFieldMaxDepth` - | to 0 - 10. Going beyond 10 levels of recursion is not allowed. + | to 1 - 12. Going beyond 12 levels of recursion is not allowed. """.stripMargin) } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { log.info( diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index 69b1ca30e1d7..d096c6b0ff60 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -356,7 +356,7 @@ Data source options of Avro can be set via: recursiveFieldMaxDepth -1 - If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not allowed in order avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. + If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 12. Values larger than 12 are not allowed in order avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. read 4.0.0 From ae13fd0eaa5d44cf79f7aa7bdbae22187307a4eb Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Mon, 22 Jul 2024 15:58:42 -0700 Subject: [PATCH 15/25] increase to 15 --- .../main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 4 ++-- .../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 4 ++-- docs/sql-data-sources-avro.md | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 7fc08add698d..faee37f857e8 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -177,8 +177,8 @@ private[sql] object AvroOptions extends DataSourceOptions { /** * Adds support for recursive fields. If this option is not specified or is set to 0, recursive * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive - * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 12. 
- * Values larger than 12 are not allowed in order avoid inadvertently creating very large schemas. + * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. + * Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas. * If an avro message has depth beyond this limit, the Spark struct returned is truncated after * the recursion limit. * diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index a03156192250..dfb8e2540ae7 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -136,11 +136,11 @@ object SchemaConverters extends Logging { case RECORD => val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0) - if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 12)) { + if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 15)) { throw new IncompatibleSchemaException(s""" |Found recursive reference in Avro schema, which can not be processed by Spark by | default: ${avroSchema.toString(true)}. Try setting the option `recursiveFieldMaxDepth` - | to 1 - 12. Going beyond 12 levels of recursion is not allowed. + | to 1 - 15. Going beyond 15 levels of recursion is not allowed. """.stripMargin) } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { log.info( diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index d096c6b0ff60..91278abaa30b 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -356,7 +356,7 @@ Data source options of Avro can be set via: recursiveFieldMaxDepth -1 - If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 12. Values larger than 12 are not allowed in order avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. + If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. 
read 4.0.0 From ac2fe30f3eabeb7efb2c14b674cffd9e56edf627 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Tue, 23 Jul 2024 10:16:45 -0700 Subject: [PATCH 16/25] delete dead code --- .../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index dfb8e2540ae7..a8c0a2a2d756 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -361,12 +361,6 @@ object SchemaConverters extends Logging { schema } } - -// private def convertEmptyAvroToStructWithDummyField(fieldName: String): SchemaType = { -// log.info(s"Keep $fieldName which is empty struct by inserting a dummy field.") -// SchemaType( -// StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil), nullable = true) -// } } private[avro] class IncompatibleSchemaException( From f6859ce3f5672303a8d557c6faf094f4d1082ecd Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 09:36:44 -0700 Subject: [PATCH 17/25] naming changes --- .../scala/org/apache/spark/sql/avro/AvroOptions.scala | 6 +++--- .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index faee37f857e8..1abdbe637578 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -178,9 +178,9 @@ private[sql] object AvroOptions extends DataSourceOptions { * Adds support for recursive fields. If this option is not specified or is set to 0, recursive * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. - * Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas. - * If an avro message has depth beyond this limit, the Spark struct returned is truncated after - * the recursion limit. + * Values larger than 15 are not allowed in order to avoid inadvertently creating very large + * schemas. If an avro message has depth beyond this limit, the Spark struct returned is + * truncated after the recursion limit. 
* * Examples: Consider an Avro schema with a recursive field: * {"type" : "record", "name" : "Node", "fields" : [{"name": "Id", "type": "int"}, diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index af45b850a9ca..17174030a585 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2261,7 +2261,7 @@ abstract class AvroSuite assert(sparkSchema === expectedSchema) } - test("Translate recursive schema - 1") { + test("Translate recursive schema - union") { val avroSchema = """ |{ | "type": "record", @@ -2280,7 +2280,7 @@ abstract class AvroSuite } } - test("Translate recursive schema - 2") { + test("Translate recursive schema - record") { val avroSchema = """ |{ | "type": "record", @@ -2311,7 +2311,7 @@ abstract class AvroSuite } } - test("Translate recursive schema - 3") { + test("Translate recursive schema - array") { val avroSchema = """ |{ | "type": "record", @@ -2331,7 +2331,7 @@ abstract class AvroSuite } } - test("Translate recursive schema - 4") { + test("Translate recursive schema - map") { val avroSchema = """ |{ | "type": "record", From 5c902a5cf6a1f9b6d3ff6244ccf038bdc74b68ac Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 09:45:20 -0700 Subject: [PATCH 18/25] Move input check to the options --- .../main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 5 +++++ .../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 1abdbe637578..ce14c95905d4 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -22,6 +22,7 @@ import java.net.URI import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -139,6 +140,10 @@ private[sql] class AvroOptions( val recursiveFieldMaxDepth: Int = parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1) + + if (recursiveFieldMaxDepth > 15) { + throw new IllegalArgumentException(s"Valid range of $RECURSIVE_FIELD_MAX_DEPTH is 0 - 15.") + } } private[sql] object AvroOptions extends DataSourceOptions { diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index a8c0a2a2d756..b2e465fdeb9d 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -136,7 +136,7 @@ object SchemaConverters extends Logging { case RECORD => val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0) - if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 15)) { + if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) { throw new IncompatibleSchemaException(s""" |Found recursive reference in Avro schema, which can not be processed by Spark by | default: ${avroSchema.toString(true)}. 
Try setting the option `recursiveFieldMaxDepth` From ff767253a425581f8ac7d0d4fc16e5339ac47703 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 10:25:29 -0700 Subject: [PATCH 19/25] Use the new logging framework --- .../org/apache/spark/internal/LogKey.scala | 2 ++ .../apache/spark/sql/avro/AvroOptions.scala | 1 - .../spark/sql/avro/SchemaConverters.scala | 30 +++++++++++-------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 51ef112a677d..dde6926e9440 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -266,6 +266,7 @@ private[spark] object LogKeys { case object FEATURE_NAME extends LogKey case object FETCH_SIZE extends LogKey case object FIELD_NAME extends LogKey + case object FIELD_TYPE extends LogKey case object FILES extends LogKey case object FILE_ABSOLUTE_PATH extends LogKey case object FILE_END_OFFSET extends LogKey @@ -648,6 +649,7 @@ private[spark] object LogKeys { case object RECEIVER_ID extends LogKey case object RECEIVER_IDS extends LogKey case object RECORDS extends LogKey + case object RECURSIVE_DEPTH extends LogKey case object RECOVERY_STATE extends LogKey case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index ce14c95905d4..d93082bb2e11 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -22,7 +22,6 @@ import java.net.URI import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index b2e465fdeb9d..2b2b35fedbf4 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -28,6 +28,8 @@ import org.apache.avro.Schema.Type._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKeys.{FIELD_NAME, FIELD_TYPE, RECURSIVE_DEPTH} +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal.minBytesForPrecision @@ -143,9 +145,10 @@ object SchemaConverters extends Logging { | to 1 - 15. Going beyond 15 levels of recursion is not allowed. """.stripMargin) } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { - log.info( - s"The field ${avroSchema.getFullName} of type ${avroSchema.getType.getName} is " + - s"dropped at recursive depth $recursiveDepth." + logInfo( + log"The field ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " + + log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} is dropped at recursive depth " + + log"${MDC(RECURSIVE_DEPTH, recursiveDepth)}." 
) null } else { @@ -177,9 +180,10 @@ object SchemaConverters extends Logging { stableIdPrefixForUnionType, recursiveFieldMaxDepth) if (schemaType == null) { - log.info( - s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + - "does not have any fields left likely due to recursive depth limit." + logInfo( + log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " + + log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " + + log"fields left likely due to recursive depth limit." ) null } else { @@ -193,9 +197,10 @@ object SchemaConverters extends Logging { existingRecordNames, useStableIdForUnionType, stableIdPrefixForUnionType, recursiveFieldMaxDepth) if (schemaType == null) { - log.info( - s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + - "does not have any fields left likely due to recursive depth limit." + logInfo( + log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " + + log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " + + log"fields left likely due to recursive depth limit." ) null } else { @@ -225,9 +230,10 @@ object SchemaConverters extends Logging { recursiveFieldMaxDepth) } if (schemaType == null) { - log.info( - s"Dropping ${avroSchema.getFullName} of type ${avroSchema.getType.getName} as it " + - "does not have any fields left likely due to recursive depth limit." + logInfo( + log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " + + log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} as it does not have any " + + log"fields left likely due to recursive depth limit." ) null } else { From 34d37badad152973186f9cf89f7035c5c78d7470 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 10:43:47 -0700 Subject: [PATCH 20/25] minor --- docs/sql-data-sources-avro.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index 91278abaa30b..be3bd87f7b10 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -356,7 +356,7 @@ Data source options of Avro can be set via: recursiveFieldMaxDepth -1 - If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. + If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. Values larger than 15 are not allowed in order to avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. 
read 4.0.0 From 979dc658b927169c6f30df4bae55208c6f4a866e Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 14:14:07 -0700 Subject: [PATCH 21/25] Use the new error framework & add test for wrong input --- .../apache/spark/sql/avro/AvroOptions.scala | 25 +++- .../org/apache/spark/sql/avro/AvroSuite.scala | 121 ++++++++++-------- 2 files changed, 93 insertions(+), 53 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index d93082bb2e11..619128e42a54 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -23,6 +23,7 @@ import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkRuntimeException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} @@ -141,7 +142,8 @@ private[sql] class AvroOptions( parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1) if (recursiveFieldMaxDepth > 15) { - throw new IllegalArgumentException(s"Valid range of $RECURSIVE_FIELD_MAX_DEPTH is 0 - 15.") + throw AvroOptionsError.avroInvalidOptionValue( + RECURSIVE_FIELD_MAX_DEPTH, "Should not be greater than 15.") } } @@ -197,3 +199,24 @@ private[sql] object AvroOptions extends DataSourceOptions { */ val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth") } + +abstract class AvroOptionsException( + errorClass: String, + messageParameters: Map[String, String], + cause: Throwable) + extends SparkRuntimeException( + errorClass, + messageParameters, + cause) + +object AvroOptionsError { + def avroInvalidOptionValue(optionName: String, message: String): AvroInvalidOptionValue = { + new AvroInvalidOptionValue(optionName, message) + } +} + +class AvroInvalidOptionValue(optionName: String, message: String) + extends AvroOptionsException( + "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE", + Map("optionName" -> optionName, "message" -> message), + cause = null) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 17174030a585..2389a37975c7 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2187,7 +2187,8 @@ abstract class AvroSuite } } - private def checkSchemaWithRecursiveLoop(avroSchema: String): Unit = { + private def checkSchemaWithRecursiveLoop(avroSchema: String, recursiveFieldMaxDepth: Int): + Unit = { val message = intercept[IncompatibleSchemaException] { SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema), false, "") }.getMessage @@ -2196,60 +2197,66 @@ abstract class AvroSuite } test("Detect recursive loop") { - checkSchemaWithRecursiveLoop(""" - |{ - | "type": "record", - | "name": "LongList", - | "fields" : [ - | {"name": "value", "type": "long"}, // each element has a long - | {"name": "next", "type": ["null", "LongList"]} // optional next element - | ] - |} - """.stripMargin) + for (recursiveFieldMaxDepth <- Seq(-1, 0)) { + checkSchemaWithRecursiveLoop( + """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, // each element has a long + | {"name": "next", "type": 
["null", "LongList"]} // optional next element + | ] + |} + """.stripMargin, recursiveFieldMaxDepth) - checkSchemaWithRecursiveLoop(""" - |{ - | "type": "record", - | "name": "LongList", - | "fields": [ - | { - | "name": "value", - | "type": { - | "type": "record", - | "name": "foo", - | "fields": [ - | { - | "name": "parent", - | "type": "LongList" - | } - | ] - | } - | } - | ] - |} - """.stripMargin) + checkSchemaWithRecursiveLoop( + """ + |{ + | "type": "record", + | "name": "LongList", + | "fields": [ + | { + | "name": "value", + | "type": { + | "type": "record", + | "name": "foo", + | "fields": [ + | { + | "name": "parent", + | "type": "LongList" + | } + | ] + | } + | } + | ] + |} + """.stripMargin, recursiveFieldMaxDepth) - checkSchemaWithRecursiveLoop(""" - |{ - | "type": "record", - | "name": "LongList", - | "fields" : [ - | {"name": "value", "type": "long"}, - | {"name": "array", "type": {"type": "array", "items": "LongList"}} - | ] - |} - """.stripMargin) + checkSchemaWithRecursiveLoop( + """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, + | {"name": "array", "type": {"type": "array", "items": "LongList"}} + | ] + |} + """.stripMargin, recursiveFieldMaxDepth) - checkSchemaWithRecursiveLoop(""" - |{ - | "type": "record", - | "name": "LongList", - | "fields" : [ - | {"name": "value", "type": "long"}, - | {"name": "map", "type": {"type": "map", "values": "LongList"}} - | ] - |} - """.stripMargin) + checkSchemaWithRecursiveLoop( + """ + |{ + | "type": "record", + | "name": "LongList", + | "fields" : [ + | {"name": "value", "type": "long"}, + | {"name": "map", "type": {"type": "map", "values": "LongList"}} + | ] + |} + """.stripMargin, recursiveFieldMaxDepth) + } } private def checkSparkSchemaEquals( @@ -2379,6 +2386,16 @@ abstract class AvroSuite withTempPath { tempDir => df.write.format("avro").save(tempDir.getPath) + + val exc = intercept[AvroInvalidOptionValue] { + spark.read + .format("avro") + .option("avroSchema", avroSchema) + .option("recursiveFieldMaxDepth", 16) + .load(tempDir.getPath) + } + assert(exc.getMessage.contains("Should not be greater than 15.")) + checkAnswer( spark.read .format("avro") From 1c7ee68b2ea8907dbb653d7fa90748c7e1811ea6 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 14:14:43 -0700 Subject: [PATCH 22/25] sort LogKey --- .../utils/src/main/scala/org/apache/spark/internal/LogKey.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index dde6926e9440..cc4849971730 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -649,8 +649,8 @@ private[spark] object LogKeys { case object RECEIVER_ID extends LogKey case object RECEIVER_IDS extends LogKey case object RECORDS extends LogKey - case object RECURSIVE_DEPTH extends LogKey case object RECOVERY_STATE extends LogKey + case object RECURSIVE_DEPTH extends LogKey case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey case object REGEX extends LogKey From fbc7d1201afad395c2820d47f3f4344c759983b0 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 14:29:05 -0700 Subject: [PATCH 23/25] create a constant for max limit --- .../main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 7 +++++-- 
.../scala/org/apache/spark/sql/avro/SchemaConverters.scala | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 619128e42a54..d944d2138260 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -141,9 +141,10 @@ private[sql] class AvroOptions( val recursiveFieldMaxDepth: Int = parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1) - if (recursiveFieldMaxDepth > 15) { + if (recursiveFieldMaxDepth > RECURSIVE_FIELD_MAX_DEPTH_LIMIT) { throw AvroOptionsError.avroInvalidOptionValue( - RECURSIVE_FIELD_MAX_DEPTH, "Should not be greater than 15.") + RECURSIVE_FIELD_MAX_DEPTH, + s"Should not be greater than $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.") } } @@ -198,6 +199,8 @@ private[sql] object AvroOptions extends DataSourceOptions { * and so on. */ val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth") + + val RECURSIVE_FIELD_MAX_DEPTH_LIMIT: Int = 15 } abstract class AvroOptionsException( diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 2b2b35fedbf4..b88445d6f316 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -30,6 +30,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.{FIELD_NAME, FIELD_TYPE, RECURSIVE_DEPTH} import org.apache.spark.internal.MDC +import org.apache.spark.sql.avro.AvroOptions.RECURSIVE_FIELD_MAX_DEPTH_LIMIT import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal.minBytesForPrecision @@ -142,7 +143,7 @@ object SchemaConverters extends Logging { throw new IncompatibleSchemaException(s""" |Found recursive reference in Avro schema, which can not be processed by Spark by | default: ${avroSchema.toString(true)}. Try setting the option `recursiveFieldMaxDepth` - | to 1 - 15. Going beyond 15 levels of recursion is not allowed. + | to 1 - $RECURSIVE_FIELD_MAX_DEPTH_LIMIT. 
""".stripMargin) } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) { logInfo( From 5569961db8ff17821076a7c46d137457d47b2ba6 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 14:32:58 -0700 Subject: [PATCH 24/25] minor --- .../spark/sql/avro/SchemaConverters.scala | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index b88445d6f316..01a2a52eb861 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -214,22 +214,19 @@ object SchemaConverters extends Logging { if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { // In case of a union with null, eliminate it and make a recursive call val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema) - val schemaType = + val remainingSchema = if (remainingUnionTypes.size == 1) { - toSqlTypeHelper( - remainingUnionTypes.head, - existingRecordNames, - useStableIdForUnionType, - stableIdPrefixForUnionType, - recursiveFieldMaxDepth) + remainingUnionTypes.head } else { - toSqlTypeHelper( - Schema.createUnion(remainingUnionTypes.asJava), - existingRecordNames, - useStableIdForUnionType, - stableIdPrefixForUnionType, - recursiveFieldMaxDepth) + Schema.createUnion(remainingUnionTypes.asJava) } + val schemaType = toSqlTypeHelper( + remainingSchema, + existingRecordNames, + useStableIdForUnionType, + stableIdPrefixForUnionType, + recursiveFieldMaxDepth) + if (schemaType == null) { logInfo( log"Dropping ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " + From ac5a940b46b65c826041a2cd8c5a8e08a4c8ec96 Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 24 Jul 2024 17:38:54 -0700 Subject: [PATCH 25/25] Add a section of example in documentation --- docs/sql-data-sources-avro.md | 40 ++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index be3bd87f7b10..86aa8d0c88a0 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -356,7 +356,7 @@ Data source options of Avro can be set via: recursiveFieldMaxDepth -1 - If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. Values larger than 15 are not allowed in order to avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. + If this option is specified to negative or is set to 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15. Values larger than 15 are not allowed in order to avoid inadvertently creating very large schemas. If an avro message has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. 
An example of usage can be found in section Handling circular references of Avro fields read 4.0.0 @@ -635,3 +635,41 @@ You can also specify the whole output Avro schema with the option `avroSchema`, decimal + +## Handling circular references of Avro fields +In Avro, a circular reference occurs when the type of a field is defined in one of the parent records. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior. +To read Avro data with schema that has circular reference, users can use the `recursiveFieldMaxDepth` option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, `spark-avro` will not permit recursive fields by setting `recursiveFieldMaxDepth` to -1. However, you can set this option to 1 to 15 if needed. + +Setting `recursiveFieldMaxDepth` to 1 drops all recursive fields, setting it to 2 allows it to be recursed once, and setting it to 3 allows it to be recursed twice. A `recursiveFieldMaxDepth` value greater than 15 is not allowed, as it can lead to performance issues and even stack overflows. + +SQL Schema for the below Avro message will vary based on the value of `recursiveFieldMaxDepth`. + +
+<div>
+This div is only used to make markdown editor/viewer happy and does not display on web
+
+```avro
+</div>
+
+{% highlight avro %}
+{
+  "type": "record",
+  "name": "Node",
+  "fields": [
+    {"name": "Id", "type": "int"},
+    {"name": "Next", "type": ["null", "Node"]}
+  ]
+}
+
+// The Avro schema defined above, would be converted into a Spark SQL columns with the following
+// structure based on `recursiveFieldMaxDepth` value.
+
+1: struct<Id: int>
+2: struct<Id: int, Next: struct<Id: int>>
+3: struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>
+
+{% endhighlight %}
+<div>
+```
+</div>
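
For reference, below is a minimal end-to-end sketch of how the new `recursiveFieldMaxDepth` option is expected to be used from the DataFrame reader API. It is not part of the patches above: the application setup and input path are placeholders, and the schema literal simply mirrors the `Node` example from the documentation section.

```scala
import org.apache.spark.sql.SparkSession

object RecursiveAvroReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("recursive-avro-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // The recursive "Node" schema used in the documentation example.
    val nodeAvroSchema =
      """{
        |  "type": "record",
        |  "name": "Node",
        |  "fields": [
        |    {"name": "Id", "type": "int"},
        |    {"name": "Next", "type": ["null", "Node"]}
        |  ]
        |}""".stripMargin

    // recursiveFieldMaxDepth = 3 keeps two nested occurrences of "Next" before truncating,
    // i.e. roughly struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>.
    // Leaving the option at its default (-1) makes the read fail on the recursive reference,
    // and values above 15 are rejected when AvroOptions is constructed.
    val df = spark.read
      .format("avro")
      .option("avroSchema", nodeAvroSchema)
      .option("recursiveFieldMaxDepth", "3")
      .load("/path/to/node-records.avro") // placeholder input path

    df.printSchema()
    spark.stop()
  }
}
```

As the data source option table notes, the option only applies on the read path: it controls how a recursive Avro schema is unrolled into a Spark SQL schema and has no effect when writing Avro data.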