
Conversation

@eason-yuchen-liu
Contributor

What changes were proposed in this pull request?

The built-in Protobuf connector was the first to support recursive schema references: users specify the option "recursive.fields.max.depth", and at the start of execution the recursive fields are unrolled to that depth. This turns a schema that would otherwise be dynamic per row into a fixed schema that Spark can handle. Avro can adopt the same approach. This PR adds an option "recursiveFieldMaxDepth" to both the Avro data source and the from_avro function. With this option, Spark can support recursive Avro schemas up to a given depth.
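
For illustration, a minimal sketch of how the new option would be set (a sketch only: spark is an existing SparkSession, and the path, kafkaDf, and avroJsonSchema are placeholders, not names from this PR):

```
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// Avro data source: opt in to recursive schemas, unrolled up to the given depth.
val df = spark.read
  .format("avro")
  .option("recursiveFieldMaxDepth", "3")
  .load("/path/to/recursive/records")

// from_avro: the same option is passed through the options map.
val parsed = kafkaDf.select(
  from_avro(col("value"), avroJsonSchema,
    Map("recursiveFieldMaxDepth" -> "3").asJava).as("record"))
```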

Why are the changes needed?

A recursive reference is the case where a field's type is defined earlier in one of its parent nodes. A simple example is:

{
  "type": "record",
  "name": "LongList",
  "fields" : [
    {"name": "value", "type": "long"},
    {"name": "next", "type": ["null", "LongList"]}
  ]
}

This schema, written in the Avro schema DSL, represents a linked-list data structure. Spark currently throws an error on such a schema. Many users rely on schemas like this, so we should support it.
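
As a rough sketch of the intended behavior (based on the option semantics discussed later in this thread, where depth 1 drops all recursive fields and each additional level allows one more round of recursion), the LongList schema above would resolve to Spark schemas along these lines:

```
// recursiveFieldMaxDepth = 1  ->  struct<value: long>
// recursiveFieldMaxDepth = 2  ->  struct<value: long, next: struct<value: long>>
// recursiveFieldMaxDepth = 3  ->  struct<value: long,
//                                        next: struct<value: long, next: struct<value: long>>>
```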

Does this PR introduce any user-facing change?

Yes. Previously, Spark threw an error on recursive schemas like the one above. With this change, it still throws the same error by default, but when users set the option to a number greater than 0, the schema is unrolled to that depth.

How was this patch tested?

Added new unit tests and integration tests to AvroSuite and AvroFunctionSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the DOCS label Jul 22, 2024
@eason-yuchen-liu eason-yuchen-liu marked this pull request as ready for review July 23, 2024 17:15
@eason-yuchen-liu
Contributor Author

@WweiL @bogao007 PTAL. Thanks!


@WweiL WweiL left a comment


Thanks! Left some comments

* Adds support for recursive fields. If this option is not specified or is set to 0, recursive
* fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
* fields to be recursed once, and 3 allows them to be recursed twice and so on, up to 15.
* Values larger than 15 are not allowed in order to avoid inadvertently creating very large schemas.
Contributor

Does protobuf also have max depth of 15?

Contributor

I feel this should be a spark conf

Contributor Author

Protobuf has a hardcoded max depth of 10. 15 is used here because some users need depths up to 12, and 3 more are left as a buffer. I agree it would be better if users could increase the max depth at will; since Protobuf does not support that either, such a config can be added in a future PR for both Protobuf and Avro.

false,
"")
"",
-1)
Contributor

Should we just add a default value in the definition to prevent multiple API changes?

Contributor Author

Yeah, I thought about it. I did not add a default value for two reasons. First, some newly added options (stableIdPrefixForUnionType) did not specify a default value either. Second, there are two constructors for the class, one with 5 arguments and the other with 7; if we added default values to both of them, the definitions would clash, which is confusing.

throw new IncompatibleSchemaException(s"""
|Found recursive reference in Avro schema, which can not be processed by Spark:
|${avroSchema.toString(true)}
|Found recursive reference in Avro schema, which can not be processed by Spark by
Contributor

IIRC Protobuf does something similar here. But this logic looks a bit weird. If we do want to limit the max recursive depth, I feel it should be checked when the option is parsed and throw an IllegalArgumentException there.

Contributor Author

Good idea.

""".stripMargin)
}

private def checkSparkSchemaEquals(
Contributor

Can we also have some boundary checks (< 0, > 15)?

Contributor Author

The value can be negative. I will add a test for the > 15 case.

Contributor Author

I put it in the integration test to minimize code.

val recursiveFieldMaxDepth: Int =
parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1)

if (recursiveFieldMaxDepth > 15) {
Contributor

Is 15 the max depth we allow? Can we use a constant value like RECURSIVE_FIELD_DEPTH_LIMIT to represent it?
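
As an illustrative aside, a minimal sketch of that suggestion (the constant name comes from the comment above; the surrounding code paraphrases the snippet below):

```
// Sketch only: name the limit instead of hardcoding 15.
private val RECURSIVE_FIELD_DEPTH_LIMIT = 15

if (recursiveFieldMaxDepth > RECURSIVE_FIELD_DEPTH_LIMIT) {
  throw new IllegalArgumentException(
    s"Valid range of $RECURSIVE_FIELD_MAX_DEPTH is 0 - $RECURSIVE_FIELD_DEPTH_LIMIT.")
}
```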

parameters.get(RECURSIVE_FIELD_MAX_DEPTH).map(_.toInt).getOrElse(-1)

if (recursiveFieldMaxDepth > 15) {
throw new IllegalArgumentException(s"Valid range of $RECURSIVE_FIELD_MAX_DEPTH is 0 - 15.")
Contributor

Can we follow the error class strategy to classify this error? You can refer to something like this, but consider creating your own type.

Contributor Author

Sure.

@WweiL WweiL left a comment

LGTM!

@bogao007 bogao007 left a comment

LGTM overall, thanks for adding this support! Left a comment regarding the doc.

}
} else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) {
logInfo(
log"The field ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
Contributor

What's the behavior for Protobuf? Do we drop the fields or do we throw errors?

Contributor Author

Protobuf also drops the field and logs the action.

<td>read</td>
<td>4.0.0</td>
</tr>
<tr>
Contributor

Thanks for adding the doc for the new option! Apart from this, should we add a block on recursion support similar to what we do for protobuf?

Contributor Author

Good idea.

@bogao007 bogao007 left a comment

LGTM

@eason-yuchen-liu
Contributor Author

@cloud-fan Could you please review it? Thanks!

@HyukjinKwon
Member

cc @HeartSaVioR and @rangadi

@HeartSaVioR
Contributor

@gengliangwang Would you mind helping review the change, as you have been one of the main reviewers for Avro? I can give it a try, but I don't feel I'm qualified to review and sign off.

@HeartSaVioR
Contributor

Friendly reminder, @gengliangwang

@gengliangwang
Member

@HeartSaVioR Thanks for the ping. I will find time to review this one soon.

@WweiL
Contributor

WweiL commented Aug 15, 2024

Thanks for the review routing! When it's convenient, can people assign this PR to @WweiL and @hkulyc so that we can keep track of reviews? Thanks again!

messageParameters,
cause)

object AvroOptionsError {
Member

Let's move this to QueryCompilationErrors. There are some Avro errors in it.

Member

cc @hkulyc

stableIdPrefixForUnionType: String): SchemaType = {
toSqlTypeHelper(avroSchema, Set.empty, useStableIdForUnionType, stableIdPrefixForUnionType)
stableIdPrefixForUnionType: String,
recursiveFieldMaxDepth: Int = -1): SchemaType = {
Member

Let's add a code comment for each parameter and explain that -1 means recursion is not supported.
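
A sketch of the kind of parameter documentation being asked for (the wording here is illustrative, not the wording that was eventually merged):

```
/**
 * @param recursiveFieldMaxDepth Maximum depth to which recursive fields are unrolled.
 *                               -1 (the default) means recursive references are not
 *                               supported and an error is thrown when one is found.
 */
```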

else {
StructField(f.name, schemaType.dataType, schemaType.nullable)
}
}.filter(_ != null).toSeq
Member

Don't we need to keep the null fields?


We need to drop them because StructType does not take an array that contains null values. One alternative is to wrap the null value in a StructField(..., nullable = true), but we do not do that for Protobuf either; in Protobuf we just drop them directly.

} else {
s"member$i"
}
val fieldName = if (useStableIdForUnionType) {
Member

Do we have a test case for this code branch?


If you are talking about useStableIdForUnionType, the code was originally added by another PR: #44964. And I believe the test is already included in that PR.

@hkulyc hkulyc Aug 16, 2024

This PR only wraps this block with a condition checking whether the child type is null or not.

Member

I meant the recursive union schema with more than 2 non-null fields.


Recursive and non-recursive union schemas share this code branch. For non-recursive schemas we have this test that covers the branch:

test("SPARK-48545: from_avro and to_avro SQL functions") {

Member

@hkulyc yes, what about the recursive union schema?
For example

{
  "type": "record",
  "name": "TreeNode",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "value",
      "type": [
        "string", 
        "int"
      ]
    },
    {
      "name": "children",
      "type": [
        "null",
        {
          "type": "array",
          "items": "TreeNode"
        }
      ],
      "default": null
    }
  ]
}

Do we have a test case for that?

@hkulyc hkulyc Aug 26, 2024

We have a test case for a simplified version of this:

test("Translate recursive schema - union") {

Do you think it is sufficient? Thanks for pointing this out. @gengliangwang

Member

Do you think it is sufficient?

No, it doesn't cover the case when the number of non-null fields is > 1.

gengliangwang pushed a commit that referenced this pull request Sep 18, 2024
Continue the discussion from #47425 to this PR because I can't push to Yuchen's account


Co-authored-by: Wei Liu <wei.liudatabricks.com>

Closes #48043 from WweiL/yuchen-avro-recursive-schema.

Lead-authored-by: Yuchen Liu <[email protected]>
Co-authored-by: Wei Liu <[email protected]>
Co-authored-by: Yuchen Liu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
@WweiL
Copy link
Contributor

WweiL commented Sep 24, 2024

With #48043 merged, this PR can be closed. Thanks to everyone who worked on or reviewed this PR!

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
@github-actions

github-actions bot commented Jan 3, 2025

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 3, 2025
@github-actions github-actions bot closed this Jan 4, 2025
