[SPARK-29503][SQL] Remove conversion CreateNamedStruct to CreateNamedStructUnsafe #26173
Conversation
Credit to @aaronlewism for reporting this issue and providing a simple reproducer. Please let me know if we should apply the type check recursively and call copy() on all unsafe data structs.
Test build #112311 has finished for PR 26173 at commit
retest this, please
Test build #112313 has finished for PR 26173 at commit

Test build #112314 has finished for PR 26173 at commit
```scala
val runInsideLoop = expressions.exists {
  case _: LambdaVariable => true
  case _ => false
}
val extractValueCode = if (runInsideLoop) {
  s"$rowWriter.getRow().copy()"
} else {
  s"$rowWriter.getRow()"
}
```
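The snippet above keys the decision to copy on whether the extraction runs inside a loop, where the row writer's buffer is reused across iterations. As a self-contained sketch (toy classes, not Spark's actual UnsafeRow or row-writer machinery), this is the aliasing hazard the copy() guards against:

```scala
// Toy stand-in for a mutable row whose backing buffer is reused per iteration.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object CopyDemo {
  // Collect without copying: every element aliases the single reused buffer,
  // so after the loop all "rows" observe the last written value.
  def collectNoCopy(values: Seq[Int]): Seq[MutableRow] = {
    val buffer = new MutableRow(0)
    values.map { v => buffer.value = v; buffer }
  }

  // Collect with copy(): each element snapshots the buffer's current state.
  def collectWithCopy(values: Seq[Int]): Seq[MutableRow] = {
    val buffer = new MutableRow(0)
    values.map { v => buffer.value = v; buffer.copy() }
  }

  def main(args: Array[String]): Unit = {
    println(collectNoCopy(Seq(1, 2, 3)).map(_.value))   // last write wins for every element
    println(collectWithCopy(Seq(1, 2, 3)).map(_.value)) // values preserved per element
  }
}
```

The same reasoning explains why missing a copy only corrupts results when the lambda runs inside a loop over reused buffers.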
Nice solution! Correct me if I'm wrong, but I believe this means it should be safe to get rid of the top-level copy from applying makeCopyIfInstanceOf in MapObjects.
I'm not an expert on the SQL area (limited knowledge), so I'm not sure this is the only path - I'd feel safer if experts in the SQL area jumped in and reviewed the approach. If we are uncertain and don't feel safe fixing only the discovered case, I may need to make more changes.
Agree that we should fix CodegenContext.generateExpressionAndCopyIfUnsafe. To simplify codegen, maybe we can add copyUnsafeData to InternalRow/ArrayData/MapData.
Just to follow up on the review comment clearly: there's no CodegenContext.generateExpressionAndCopyIfUnsafe. Maybe you referred to makeCopyIfInstanceOf inside MapObjects.doGenCode, or are you suggesting we create a new method CodegenContext.generateExpressionAndCopyIfUnsafe?
cc. @cloud-fan @maropu @viirya
```scala
}

test("SPARK-29503 nest unsafe struct inside safe array") {
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
```
This issue is encountered only when whole stage codegen is disabled?
At least yes for the provided reproducer. For other cases I'm not sure.
I see. For whole-stage codegen, CreateNamedStruct is not converted to CreateNamedStructUnsafe, so the nested struct is not an unsafe one.
Ah OK, got it. Thanks for the explanation.
Force-pushed 8b2be71 to 39d0a2e.
I've changed the approach to what @cloud-fan had suggested. It has to pass a data type to check nested structs and copy unsafe data, so it needs to serialize the data type to JSON and deserialize it in the method. Hope this is OK given we only do it once per generating code for MapObjects. I'd like to see whether the direction is OK - if we agree with the direction, I'll try to refine the code as well as add some UTs for the new methods, and change the title and description of the PR. In case of rolling back, the previous commit head is here:
Test build #112391 has finished for PR 26173 at commit
Force-pushed 39d0a2e to 07514a1.
Test build #112395 has finished for PR 26173 at commit
Looks like escaping JSON doesn't work consistently when compiling the generated code. So double escaping seems to be the only working solution for the new UT, but it consistently fails on ALSSuite -
Using the catalog string instead of JSON seems to work for both tests. Let me see the build result.
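As a rough, self-contained illustration of why a catalog string embeds into generated code more cleanly than JSON (toLiteral here is a hypothetical helper, not Spark's actual escaping code): JSON is full of double quotes, each of which must be escaped inside the generated Java string literal, and the escaping must survive a second round of compilation.

```scala
object EscapingDemo {
  // Render a runtime string as a double-quoted source-code literal,
  // escaping backslashes first and then double quotes.
  def toLiteral(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  def main(args: Array[String]): Unit = {
    val catalog = "struct<items:array<int>>"        // no quotes inside: embeds as-is
    val json = """{"type":"struct","fields":[]}"""  // quotes everywhere: needs escaping
    println(toLiteral(catalog))
    println(toLiteral(json))
  }
}
```

The catalog string contains no characters that need escaping, so it round-trips through codegen without the double-escaping problem described above.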
```scala
s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"

// Make a copy of the unsafe data if the result contains any
def makeCopyUnsafeData(dataType: DataType, value: String) = {
  s"""${value}.copyUnsafeData("${dataType.catalogString}")"""
```
Is this safe for codegen? For a complex type, will we bump into a very long string here?
I guess it depends on how long we would classify as a very long string. JSON is much more likely to create a very long string; the catalog string would create a shorter one. I agree the approach is questionable, but even getting a value from a field requires the schema for InternalRow, so I'm not sure there's an alternative approach for this.
We may be able to let CodeGenerator go through the schema and generate code based on it, but I'd suspect the overall amount of generated code could be larger, as the generated code cannot leverage schema information and we would have to generate all the code for accessing and replacing fields recursively.
It also looks like string literals have a 64k limit, which doesn't seem short - it's the limit on method length as well. We may even be able to apply a trick here: define this in a static field or a field of the class, and assign it to one String instance by splitting it into chunks (each less than 64k) and concatenating them, to get over the limitation.
So the approach depends on how long a catalog string we expect for a very complex type.
- If we expect it to be of negligible size, it would work as-is. We might refine it a bit by defining a local variable and assigning from there, but that may require additional conditional logic.
- If we expect it to be less than 64k but of considerable size, and we don't want it to consume the method-length limit, we may need to define it as a static field.
- If we expect more than 64k, we should make it a static field, break the catalog string into chunks, and concatenate them into one String instance.
Do we have a huge and complicated but realistic (production) schema to test this?
I feel the current approach is overkill. I also suspect a possible performance downgrade. Can we avoid converting CreateNamedStruct to CreateNamedStructUnsafe in this special case?
Would it work if we replace  I'm not sure we aren't missing anything, as it seems to only deal with struct type, but what we've discovered so far is struct type, so the approach makes sense to me for resolving SPARK-29503. Just a view as a non-expert in SQL: the current approach makes me feel safer, though, in the sense of defensive programming. I would wait to hear more voices before making the change so we don't go back and forth. Thanks for the suggestion!
Test build #112413 has finished for PR 26173 at commit
Force-pushed 4c38ffc to 6187d99.
Test build #112426 has finished for PR 26173 at commit
It's safer to make the  How many places do we create mixed safe and unsafe data? It looks to me that there is only one:  Furthermore, I don't quite understand why we need
Force-pushed 6187d99 to d8d3900.
I quickly checked the usage of  So I'm seeing the same: CreateNamedStructUnsafe doesn't seem to be needed at all. As a side effect  I've just removed them and rebased the branch to contain only this change. The new test passes with the new change, so let's see the build result. Old commit hash to revert: 6187d99
OFFTOPIC: I've indicated
Looks much simpler. If there's no other concern, this approach looks good.
Test build #112485 has finished for PR 26173 at commit
Sorry, I'm late. The current one looks ok to me, too. I left super minor comments.
```scala
assert(result.size === 1)
assert(getValueInsideDepth(result.head, 0) === 1)
assert(getValueInsideDepth(result.head, 1) === 2)
assert(getValueInsideDepth(result.head, 2) === 3)
```
nit: just check assert(result === Row(Seq(Seq(Row(1)), Seq(Row(2)), Seq(Row(3)))) :: Nil)?
+1
```scala
test("SPARK-29503 nest unsafe struct inside safe array") {
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    val exampleDS = spark.sparkContext.parallelize(Seq(Seq(1, 2, 3))).toDF("items")
```
super nit: exampleDF?
or simply df
Ur, for better commit logs, can you describe the change in generated code for MapObjects in the PR description?
Thanks for reviewing. Just addressed review comments.
```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.DefinedByConstructorParams
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericRowWithSchema}
```
nit: plz remove GenericRowWithSchema
```scala
package org.apache.spark.sql

import scala.collection.mutable
```
nit: plz remove this import.
Test build #112508 has finished for PR 26173 at commit

Test build #112509 has finished for PR 26173 at commit
retest this please
Test build #112520 has finished for PR 26173 at commit
thanks, merging to master!
Thanks all for reviewing and merging!
```scala
// items: Seq[Int] => items.map { item => Seq(Struct(item)) }
val result = df.select(
  new Column(MapObjects(
    (item: Expression) => array(struct(new Column(item))).expr,
```
Hm, while the fix seems fine to me too, was this the only reproducer? MapObjects is supposed to be for internal purposes - it's under the catalyst package.
I haven't spent more time trying it (as it seems to be a clean and simple reproducer). I'm not sure it becomes an invalid reproducer just because it pulls in the catalyst package. Catalyst could analyze the query and inject it if necessary in any way.
I indicated you'd like to revisit #25745 - that was WIP and it didn't have any performance-gain numbers. I'd rather choose "safeness" over "speed", even though we haven't figured out whether there's an outstanding difference between the two. This was the only case where MapObjects could have an unsafe struct; by allowing it, safe and unsafe data are possibly mixed up, leading to corner cases.
Yeah, I am not against this change. In that sense, I think this fix is fine, but I wanted to know if it actually affects any user-facing surface.
I was also wondering if we can benefit from #25745, since some investigation already looks to have been made there to completely use the unsafe one instead.
My guess about #25745 is that it was based on the assumption that it's safe to replace CreateNamedStruct with CreateNamedStructUnsafe, as we already had one path doing this - and this observation broke the assumption. IMHO, once we find it's not safe to do this, the improvement has to prove safety before we take its benefits into account.
Hi, All.
Is this an internal bug? (referring to the @HyukjinKwon suggestion: https://github.com/apache/spark/pull/26173/files#r339915780)
Do you mean that this PR doesn't fix anything on the user side in the final results?
If users don't use internal objects (
@maropu Please see the example in the JIRA.
Ah, I see. Probably I don't clearly understand which component
No~ Thank you for the feedback, @maropu. The reason why I'm asking around (or pushing more) is that there was no consensus on it. Since the RC1 failure, I'm collecting opinions, including yours.
Ah, I see, OK. Thanks for the kind explanation! Yeah, it's important to discuss this more.
I also agree that catalyst has been treated as a non-public API, but it seems there's no harm in porting this back to the 2.4 branch. (And even though we treat catalyst as a non-public API, it looks like end users don't recognize that fact and use it.) No strong opinion though - OK either way. I can follow up if we decide to port it back.
What changes were proposed in this pull request?
There's a case where MapObjects has a lambda function which creates a nested struct - unsafe data inside a safe data struct. In this case, MapObjects doesn't copy the row returned from the lambda function (as the outermost data type is a safe data struct), which misses copying the nested unsafe data.
The culprit is that UnsafeProjection.toUnsafeExprs converts CreateNamedStruct to CreateNamedStructUnsafe (this is the only place where CreateNamedStructUnsafe is used), which incurs safe and unsafe data being mixed up temporarily. This may not be needed at all, at least logically, as it will finally assemble these evaluations into an UnsafeRow.

Why are the changes needed?
This patch fixes the bug described above.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
A UT was added which fails on the master branch and passes with this PR.