Commits (26)
2a67fb9
support in main function
eason-yuchen-liu Jul 16, 2024
7dc41e2
Add recursiveFieldMaxDepth option
eason-yuchen-liu Jul 16, 2024
3d8d99e
add unit tests and integration test
eason-yuchen-liu Jul 17, 2024
59a69db
Merge branch 'apache:master' into avro-recursive-schema
eason-yuchen-liu Jul 17, 2024
997ff7e
add integration tests
eason-yuchen-liu Jul 17, 2024
a64a927
revert change to existing tests
eason-yuchen-liu Jul 18, 2024
bfcec5e
change the handling of max depth = 0 to align with ProtoBuf
eason-yuchen-liu Jul 18, 2024
e1a2051
add one test for from_avro
eason-yuchen-liu Jul 18, 2024
29bb2ce
add doc
eason-yuchen-liu Jul 18, 2024
1fd5a97
give an upper bound to maxDepth as Protobuf
eason-yuchen-liu Jul 18, 2024
eb0376b
minor
eason-yuchen-liu Jul 18, 2024
9fbdd55
minor
eason-yuchen-liu Jul 18, 2024
5219213
add doc for the option
eason-yuchen-liu Jul 22, 2024
7a6c17d
update the comment for the doc
eason-yuchen-liu Jul 22, 2024
026fd4a
change the upper limit to 12
eason-yuchen-liu Jul 22, 2024
ae13fd0
increase to 15
eason-yuchen-liu Jul 22, 2024
ac2fe30
delete dead code
eason-yuchen-liu Jul 23, 2024
f6859ce
naming changes
eason-yuchen-liu Jul 24, 2024
5c902a5
Move input check to the options
eason-yuchen-liu Jul 24, 2024
ff76725
Use the new logging framework
eason-yuchen-liu Jul 24, 2024
34d37ba
minor
eason-yuchen-liu Jul 24, 2024
979dc65
Use the new error framework & add test for wrong input
eason-yuchen-liu Jul 24, 2024
1c7ee68
sort LogKey
eason-yuchen-liu Jul 24, 2024
fbc7d12
create a constant for max limit
eason-yuchen-liu Jul 24, 2024
5569961
minor
eason-yuchen-liu Jul 24, 2024
ac5a940
Add a section of example in documentation
eason-yuchen-liu Jul 25, 2024
Files changed
@@ -266,6 +266,7 @@ private[spark] object LogKeys {
case object FEATURE_NAME extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
case object FIELD_TYPE extends LogKey
case object FILES extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
case object FILE_END_OFFSET extends LogKey
@@ -649,6 +650,7 @@ private[spark] object LogKeys {
case object RECEIVER_IDS extends LogKey
case object RECORDS extends LogKey
case object RECOVERY_STATE extends LogKey
case object RECURSIVE_DEPTH extends LogKey
case object REDACTED_STATEMENT extends LogKey
case object REDUCE_ID extends LogKey
case object REGEX extends LogKey
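As a hedged illustration of where these two new keys could surface, here is a sketch of a structured-logging call site. The class, method, and message text are assumptions for illustration, not code from this PR; `LogKeys` is `private[spark]`, so such a call site has to live inside Spark itself.

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical call site (assumed names and message text) inside the Spark
// code base, since LogKeys is private[spark]: records which field type was
// truncated and at what recursion depth.
class RecursionLogging extends Logging {
  def warnTruncated(fieldType: String, depth: Int): Unit = {
    logWarning(log"Dropped recursive field of type ${MDC(LogKeys.FIELD_TYPE, fieldType)} " +
      log"beyond recursiveFieldMaxDepth ${MDC(LogKeys.RECURSIVE_DEPTH, depth)}")
  }
}
```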
@@ -42,7 +42,8 @@ private[sql] case class AvroDataToCatalyst(
val dt = SchemaConverters.toSqlType(
expectedSchema,
avroOptions.useStableIdForUnionType,
avroOptions.stableIdPrefixForUnionType).dataType
avroOptions.stableIdPrefixForUnionType,
avroOptions.recursiveFieldMaxDepth).dataType
parseMode match {
// With PermissiveMode, the output Catalyst row might contain columns of null values for
// corrupt records, even if some of the columns are not nullable in the user-provided schema.
@@ -69,7 +70,8 @@ private[sql] case class AvroDataToCatalyst(
dataType,
avroOptions.datetimeRebaseModeInRead,
avroOptions.useStableIdForUnionType,
avroOptions.stableIdPrefixForUnionType)
avroOptions.stableIdPrefixForUnionType,
avroOptions.recursiveFieldMaxDepth)

@transient private var decoder: BinaryDecoder = _

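`AvroDataToCatalyst` backs the `from_avro` function, so the new option can also be passed through `from_avro`'s options map. A minimal sketch, assuming a DataFrame `df` with a binary Avro column `value`; the schema matches the example in the option documentation below.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// Recursive schema: each Node optionally points to another Node.
val nodeSchema =
  """{"type": "record", "name": "Node", "fields": [
    |  {"name": "Id", "type": "int"},
    |  {"name": "Next", "type": ["null", "Node"]}]}""".stripMargin

// Depth 3 unrolls the recursion twice:
// struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>
val parsed = df.select(
  from_avro(col("value"), nodeSchema,
    Map("recursiveFieldMaxDepth" -> "3").asJava).as("node"))
```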
@@ -51,22 +51,25 @@ private[sql] class AvroDeserializer(
datetimeRebaseSpec: RebaseSpec,
filters: StructFilters,
useStableIdForUnionType: Boolean,
stableIdPrefixForUnionType: String) {
stableIdPrefixForUnionType: String,
recursiveFieldMaxDepth: Int) {

def this(
rootAvroType: Schema,
rootCatalystType: DataType,
datetimeRebaseMode: String,
useStableIdForUnionType: Boolean,
stableIdPrefixForUnionType: String) = {
stableIdPrefixForUnionType: String,
recursiveFieldMaxDepth: Int) = {
this(
rootAvroType,
rootCatalystType,
positionalFieldMatch = false,
RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
new NoopFilters,
useStableIdForUnionType,
stableIdPrefixForUnionType)
stableIdPrefixForUnionType,
recursiveFieldMaxDepth)
}

private lazy val decimalConversions = new DecimalConversion()
@@ -128,7 +131,8 @@ private[sql] class AvroDeserializer(
s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"

val realDataType = SchemaConverters.toSqlType(
avroType, useStableIdForUnionType, stableIdPrefixForUnionType).dataType
avroType, useStableIdForUnionType, stableIdPrefixForUnionType,
recursiveFieldMaxDepth).dataType

(avroType.getType, catalystType) match {
case (NULL, NullType) => (updater, ordinal, _) =>
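The deserializer resolves Avro types through `SchemaConverters.toSqlType`, now threading the new parameter. A sketch of that call as the PR's call sites shape it; the parameter names are assumed to match the fields on `AvroOptions`.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "Node", "fields": [
    |  {"name": "Id", "type": "int"},
    |  {"name": "Next", "type": ["null", "Node"]}]}""".stripMargin)

// recursiveFieldMaxDepth = 2 unrolls the recursion once:
// struct<Id: int, Next: struct<Id: int>>
val sqlType = SchemaConverters.toSqlType(
  avroSchema,
  useStableIdForUnionType = false,
  stableIdPrefixForUnionType = "member_",
  recursiveFieldMaxDepth = 2).dataType
```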
@@ -145,7 +145,8 @@ private[sql] class AvroFileFormat extends FileFormat
datetimeRebaseMode,
avroFilters,
parsedOptions.useStableIdForUnionType,
parsedOptions.stableIdPrefixForUnionType)
parsedOptions.stableIdPrefixForUnionType,
parsedOptions.recursiveFieldMaxDepth)
override val stopPosition = file.start + file.length

override def hasNext: Boolean = hasNextRow
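On the file-read path the option arrives through the data source options, so batch reads pick it up directly. A usage sketch; the session and path are placeholders.

```scala
// Assumes an active SparkSession `spark`; the path is a placeholder.
val df = spark.read
  .format("avro")
  .option("recursiveFieldMaxDepth", "3")
  .load("/tmp/nodes.avro")

// For the Node schema above, depth 3 yields:
// struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>
df.printSchema()
```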
@@ -23,6 +23,7 @@ import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkRuntimeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
@@ -136,6 +137,15 @@ private[sql] class AvroOptions(

val stableIdPrefixForUnionType: String = parameters
.getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_")

val recursiveFieldMaxDepth: Int =
parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1)

if (recursiveFieldMaxDepth > RECURSIVE_FIELD_MAX_DEPTH_LIMIT) {
throw AvroOptionsError.avroInvalidOptionValue(
RECURSIVE_FIELD_MAX_DEPTH,
s"Should not be greater than $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.")
}
}

private[sql] object AvroOptions extends DataSourceOptions {
@@ -170,4 +180,46 @@ private[sql] object AvroOptions extends DataSourceOptions {
// When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields
// of Avro Union type.
val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType")

/**
* Adds support for recursive fields. If this option is not specified or is set to 0, recursive
* fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
* fields to be recursed once, 3 allows them to be recursed twice, and so on, up to 15.
* Values larger than 15 are not allowed, in order to avoid inadvertently creating very
* large schemas. If an Avro message has depth beyond this limit, the returned Spark
* struct is truncated at the recursion limit.
*
* Example: Consider an Avro schema with a recursive field:
*   {"type": "record", "name": "Node", "fields": [{"name": "Id", "type": "int"},
*   {"name": "Next", "type": ["null", "Node"]}]}
* The following lists the parsed schema for different values of this setting:
* 1: `struct<Id: int>`
* 2: `struct<Id: int, Next: struct<Id: int>>`
* 3: `struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>`
* and so on.
*/
val RECURSIVE_FIELD_MAX_DEPTH = newOption("recursiveFieldMaxDepth")

val RECURSIVE_FIELD_MAX_DEPTH_LIMIT: Int = 15
}

abstract class AvroOptionsException(
errorClass: String,
messageParameters: Map[String, String],
cause: Throwable)
extends SparkRuntimeException(
errorClass,
messageParameters,
cause)

object AvroOptionsError {
[Review comment, Member]: Let's move this to QueryCompilationErrors. There are some Avro errors in it.

[Review comment, Member]: cc @hkulyc
def avroInvalidOptionValue(optionName: String, message: String): AvroInvalidOptionValue = {
new AvroInvalidOptionValue(optionName, message)
}
}

class AvroInvalidOptionValue(optionName: String, message: String)
extends AvroOptionsException(
"STDS_INVALID_OPTION_VALUE.WITH_MESSAGE",
Map("optionName" -> optionName, "message" -> message),
cause = null)
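For completeness, a sketch of what a caller sees when the cap is exceeded; the session and path are placeholders, and the exact point where the exception surfaces (schema inference here) is an assumption.

```scala
import org.apache.spark.SparkRuntimeException

// 16 exceeds RECURSIVE_FIELD_MAX_DEPTH_LIMIT (15), so AvroOptions throws
// AvroInvalidOptionValue, a SparkRuntimeException carrying the error class
// STDS_INVALID_OPTION_VALUE.WITH_MESSAGE.
try {
  spark.read
    .format("avro")
    .option("recursiveFieldMaxDepth", "16")
    .load("/tmp/nodes.avro")
} catch {
  case e: SparkRuntimeException =>
    println(e.getMessage) // names the option and the limit of 15
}
```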
@@ -65,7 +65,8 @@ private[sql] object AvroUtils extends Logging {
SchemaConverters.toSqlType(
avroSchema,
parsedOptions.useStableIdForUnionType,
parsedOptions.stableIdPrefixForUnionType).dataType match {
parsedOptions.stableIdPrefixForUnionType,
parsedOptions.recursiveFieldMaxDepth).dataType match {
case t: StructType => Some(t)
case _ => throw new RuntimeException(
s"""Avro schema cannot be converted to a Spark SQL StructType: