[SPARK-40981][CONNECT][PYTHON] Support session.range in Python client #38460
Conversation
python/pyspark/sql/connect/client.py
I think we can use `step: int = 1`.
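A rough sketch of what that signature could look like on the client (a sketch only; everything besides `range` and the `step` default is an assumption, not the final code in this PR):

```python
from typing import Optional

class RemoteSparkSession:
    def range(self, start: int, end: int, step: int = 1,
              num_partitions: Optional[int] = None) -> "DataFrame":
        """Sketch: step defaults to 1 on the client; num_partitions is
        left to the server's default when None."""
        ...
```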
Furthermore, I think we can make `step` a required field in the proto.
Hmm, we are not marking `step` as required because the Scala-side implementation does not treat it as required, so it also has a default value there:
private def transformRange(rel: proto.Range): LogicalPlan = {
  val start = rel.getStart
  val end = rel.getEnd
  // Default step is 1 when the client leaves the field unset.
  val step = if (rel.hasStep) {
    rel.getStep.getStep
  } else {
    1
  }
  // Default to the session's leaf-node parallelism when unset.
  val numPartitions = if (rel.hasNumPartitions) {
    rel.getNumPartitions.getNumPartitions
  } else {
    session.leafNodeDefaultParallelism
  }
  logical.Range(start, end, step, numPartitions)
}
The same applies to `numPartitions`.
What about adding a new test case like `range(start=10, end=20)` and checking that:
1. `step` is set to 1 (if we make the default value 1);
2. `num_partitions` is not set?
I added this test case, but it only checks that `step` and `num_partitions` are not set.
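For illustration, the check could look roughly like this (a sketch only: `session` is an existing `RemoteSparkSession`, and the `_plan.to_proto()` accessor and proto field names are assumptions inferred from the Scala snippet above):

```python
# Sketch of the added test; accessor and field names are assumptions.
plan = session.range(start=10, end=20)._plan.to_proto()
rng = plan.root.range

assert rng.start == 10
assert rng.end == 20
# Optional fields are left unset by the client; the server fills in
# the defaults (step = 1, num_partitions = default parallelism).
assert not rng.HasField("step")
assert not rng.HasField("num_partitions")
```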
Right now there is a division of responsibility between the client and the server:
- The client takes care of required fields, meaning clients must make sure required fields are set.
- The server takes care of default values for optional fields.
This reduces the implementation load on both sides:
- Clients do not need to worry about default values for optional fields unless the default value is already exposed on the DataFrame API.
- The server does not check whether a required field is set (clients enforce it), but it does track the default values for optional fields. This also avoids different clients choosing different default values. The default values are documented in the proto:
// Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if
// it is set, or 2) spark default parallelism.
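To make this concrete, here is a minimal sketch of how a client could build the `Range` plan under this convention (the generated proto classes, import path, and nested field names are assumptions inferred from the Scala snippet above):

```python
from typing import Optional

import pyspark.sql.connect.proto as proto  # generated proto bindings (path assumed)

def build_range(start: int, end: int,
                step: Optional[int] = None,
                num_partitions: Optional[int] = None) -> proto.Relation:
    rel = proto.Relation()
    # Required fields: the client must always set these.
    rel.range.start = start
    rel.range.end = end
    # Optional fields: only set when the caller supplied a value, so the
    # server can apply its documented defaults otherwise.
    if step is not None:
        rel.range.step.step = step
    if num_partitions is not None:
        rel.range.num_partitions.num_partitions = num_partitions
    return rel
```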
Merged into master.
HyukjinKwon left a comment:
LGTM2
### What changes were proposed in this pull request?
This PR adds a `range` API to the Python client's `RemoteSparkSession`, with tests. This PR also updates `start`, `end`, and `step` to `int64` in the Connect proto.

### Why are the changes needed?
Improve API coverage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT

Closes apache#38460 from amaliujia/SPARK-40981.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
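For reference, a hedged usage example of the new API (assuming `session` is a connected `RemoteSparkSession`; session setup details are omitted and are assumptions):

```python
# step defaults to 1 when omitted; num_partitions falls back to the
# server-side default parallelism.
df = session.range(start=0, end=10, step=2)
print(df.collect())  # rows 0, 2, 4, 6, 8
```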