[SPARK-33567][SQL] DSv2: Use callback instead of passing Spark session and v2 relation for refreshing cache #30491
Conversation
Force-pushed from de468e5 to 7a9b72d.
Test build #131696 has started for PR 30491.
...e/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
rdblue left a comment:
Looks good to me. I'll merge this in a few days, unless there are objections from others.
```diff
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    afterWrite: () => Unit = () => ()) extends V2TableWriteExec with BatchWriteHelper {
```
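A minimal sketch of the pattern this diff adopts (hypothetical class and field names, not Spark's actual ones): instead of the exec node holding a Spark session and a relation just so it can refresh caches after a write, it takes an `afterWrite` callback with a no-op default, and the planner supplies a closure capturing whatever state it needs.

```scala
// Simplified stand-in for a V2 write exec node; not Spark code.
case class AppendExec(
    rows: Seq[Int],
    afterWrite: () => Unit = () => ()) { // no-op default, as in the diff

  private var table = Vector.empty[Int]

  def run(): Unit = {
    table = table ++ rows // perform the "write"
    afterWrite()          // planner-supplied closure refreshes caches
  }

  def contents: Seq[Int] = table
}

// The caller (the planner, in Spark) captures what it needs in the closure.
var refreshed = false
val exec = AppendExec(Seq(1, 2, 3), afterWrite = () => refreshed = true)
exec.run()
```

The exec node stays decoupled from the session: it only knows "something should happen after the write", not what that something is.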
I prefer to not have the default parameter value when unnecessary.
+1 for @cloud-fan's comment.
```diff
     r.table.asWritable match {
       case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-        AppendDataExecV1(v1, writeOptions.asOptions, query, r) :: Nil
+        AppendDataExecV1(v1, writeOptions.asOptions, query, afterWrite = refreshCache) :: Nil
```
Shall we omit `afterWrite =`?
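For context on the question above, a hedged illustration (hypothetical function, not Spark code): a function-typed argument can be passed positionally or with a named argument; the two calls are equivalent, and the named form only documents intent at the call site.

```scala
// Hypothetical helper mirroring the call-site question in the review.
def appendData(table: String, afterWrite: () => Unit): String = {
  afterWrite()
  s"appended to $table"
}

val positional = appendData("t1", () => ())              // argument name omitted
val named      = appendData("t1", afterWrite = () => ()) // argument name spelled out
```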
Test build #131817 has finished for PR 30491.
```diff
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    v2Relation: DataSourceV2Relation) extends V1FallbackWriters {
+    afterWrite: () => Unit) extends V1FallbackWriters {
```
Shall we use a consistent name, `refreshCache`?
```diff
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    v2Relation: DataSourceV2Relation) extends V1FallbackWriters {
+    afterWrite: () => Unit) extends V1FallbackWriters {
```
ditto
```diff
-        session, r.table.asWritable, r, writeOptions.asOptions, planLater(query)) :: Nil
+        r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil

       case DeleteFromTable(relation, condition) =>
```
For `DeleteFromTable`, do we need to invalidate the cache too? Doesn't this command also update table data?
Yes, good catch! I think we should. I'll work on this in a separate PR.
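The follow-up the reviewers agree on could be sketched like this (hypothetical names, not Spark's API): a delete exec node takes the same cache-refresh callback as the write nodes, so cached results that reference the table are dropped after rows are deleted.

```scala
import scala.collection.mutable

// Toy catalog with a per-table cache; a stand-in for Spark's cache manager.
final class Catalog {
  private val cached = mutable.Map("t1" -> Seq(1, 2, 3))
  def isCached(name: String): Boolean = cached.contains(name)
  def invalidate(name: String): Unit = cached -= name
}

// Hypothetical delete node wired with the same callback as the write nodes.
case class DeleteExec(table: String, refreshCache: () => Unit) {
  def run(): Unit = {
    // ... delete matching rows from `table` ...
    refreshCache() // stale cached results must be dropped too
  }
}

val catalog = new Catalog
DeleteExec("t1", () => catalog.invalidate("t1")).run()
```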
Test build #131911 has finished for PR 30491.
retest this please
```scala
catalog.invalidateTable(ident)

// invalidate all caches referencing the given table
// TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
```
@sunchao let's also fix this TODO in a separate PR.
Sure. Will do that soon.
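The TODO discussed above could look roughly like this (hypothetical cache manager, not Spark's `CacheManager`): after dropping cache entries that reference the table, re-cache the table itself with freshly loaded data so subsequent reads stay fast.

```scala
import scala.collection.mutable

// Toy table cache standing in for Spark's cache manager.
final class TableCache {
  private val entries = mutable.Map.empty[String, Seq[Int]]
  def put(name: String, data: Seq[Int]): Unit = entries(name) = data
  def drop(name: String): Unit = entries -= name
  def get(name: String): Option[Seq[Int]] = entries.get(name)
}

def refreshAndRecache(cache: TableCache, name: String, load: () => Seq[Int]): Unit = {
  cache.drop(name)        // invalidate all caches referencing the table
  cache.put(name, load()) // the TODO: re-cache the table itself
}

val cache = new TableCache
cache.put("t1", Seq(1))                            // stale cached data
refreshAndRecache(cache, "t1", () => Seq(1, 2))    // reload after the write
```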
GA passed, merging to master!
Test build #131951 has finished for PR 30491.
What changes were proposed in this pull request?

This replaces the Spark session and `DataSourceV2Relation` in V2 write plans with a callback `afterWrite`.

Why are the changes needed?

Per the discussion in #30429, it's better not to pass the Spark session and `DataSourceV2Relation` through Spark plans. Instead, we can use a callback, which makes the interface cleaner.

Does this PR introduce any user-facing change?

No

How was this patch tested?

N/A