[SPARK-24822][PySpark] Python support for barrier execution mode #22011
python/pyspark/rdd.py
If we expose a package private method to get the annotated RDD with isBarrier=True in RDDBarrier, we can implement mapPartitions easily here:
jBarrierRdd = self._jrdd.rdd.barrier().barrierRdd.javaRdd
pyBarrierRdd = RDD(self._jrdd.rdd.barrier().barrierRdd.javaRdd)
pyBarrierRdd.mapPartitions(f, preservesPartitioning)
This is not necessary to implement Python support.
python/pyspark/rdd.py
I don't know why we haven't been marking the version here so far, but we really should add `.. versionadded:: 2.4.0` here, or use:
@since(2.4)
def barrier(self):
...
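To illustrate the two options in the comment above, here is a minimal sketch of a decorator that appends a `.. versionadded::` directive to a docstring. It is a simplified, hypothetical stand-in written for this example, not PySpark's actual helper; the `Example` class and its docstring text are assumptions.

```python
import re


def since(version):
    """Return a decorator that appends a versionadded directive to a docstring.

    Simplified illustration only; not the real PySpark implementation.
    """
    indent_pattern = re.compile(r"\n( +)")

    def decorator(func):
        doc = func.__doc__ or ""
        # Reuse the smallest indentation found in the docstring so the
        # directive lines up with the existing text.
        indents = indent_pattern.findall(doc)
        indent = " " * (min(len(i) for i in indents) if indents else 0)
        func.__doc__ = "%s\n\n%s.. versionadded:: %s" % (doc.rstrip(), indent, version)
        return func

    return decorator


class Example:
    # Hypothetical class standing in for RDD.
    @since("2.4.0")
    def barrier(self):
        """
        Placeholder docstring for a barrier() method.
        """
        return self


print(Example.barrier.__doc__)
```

Either way, the rendered documentation ends up carrying the same directive; the decorator only saves writing it by hand in each docstring.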
python/pyspark/rdd.py
Ditto, let's add `.. versionadded:: 2.4.0` at the end. Optionally, add it to each API exposed here as well.
python/pyspark/rdd.py
nit: RDDBarrier -> RDD barrier
python/pyspark/rdd.py
Shall we match the documentation, or is there a reason it is different?
FWIW, for a code block, just `blabla` should be good enough. It is nicer if linked properly, e.g. :class:`ClassName`.
python/pyspark/rdd.py
docstring?
retest this please
test this please
python/pyspark/rdd.py
    """
    def func(s, iterator):
        return f(iterator)
    jBarrierRdd = self._jrdd.rdd().barrier().toJavaRDD()
This will materialize the Java RDD, which means the map functions before and after the barrier will be executed by two Python workers.
We should not materialize the Java RDD here, but just set an isBarrier flag in the Python PipelinedRDD.
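The idea in this comment can be sketched with a toy model in plain Python. Nothing below is PySpark code: `ToyRDD`, `ToyPipelinedRDD`, `ToyRDDBarrier`, and their attributes are hypothetical names chosen for illustration. The sketch only shows how a barrier flag can ride along on a pipelined RDD so that functions before and after `barrier()` stay fused into one pipeline, rather than being split by an intermediate materialization.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD; partitions are plain lists."""

    def __init__(self, partitions, is_barrier=False):
        self._partitions = partitions
        self.is_barrier = is_barrier

    def compute(self):
        return [list(p) for p in self._partitions]

    def map_partitions(self, f):
        return ToyPipelinedRDD(self, f)

    def barrier(self):
        return ToyRDDBarrier(self)


class ToyPipelinedRDD(ToyRDD):
    """Fuses successive partition functions into a single function."""

    def __init__(self, prev, func, is_from_barrier=False):
        if isinstance(prev, ToyPipelinedRDD):
            # Compose with the previous function instead of materializing.
            prev_func = prev._func
            self._func = lambda it: func(prev_func(it))
            self._source = prev._source
        else:
            self._func = func
            self._source = prev
        # The flag is set here and inherited by every later pipelined step.
        self.is_barrier = prev.is_barrier or is_from_barrier

    def compute(self):
        return [list(self._func(iter(p))) for p in self._source.compute()]


class ToyRDDBarrier:
    """Wrapper returned by barrier(); only marks the next step as barrier."""

    def __init__(self, rdd):
        self._rdd = rdd

    def map_partitions(self, f):
        return ToyPipelinedRDD(self._rdd, f, is_from_barrier=True)


base = ToyRDD([[1, 2], [3, 4]])
doubled = base.map_partitions(lambda it: (x * 2 for x in it))
summed = doubled.barrier().map_partitions(lambda it: [sum(it)])
after = summed.map_partitions(lambda it: (x + 1 for x in it))

print(summed.compute(), summed.is_barrier, summed._source is base)
```

In this toy model, `summed` still reads directly from `base`, so the pre-barrier and post-barrier functions run as one fused function per partition, and the flag carries over to `after`.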
test this please
@jiangxb1987 Please mention that tests will be added in a follow-up PR that implements BarrierTaskContext.
python/pyspark/rdd.py
    """
    return RDDBarrier(self)

def isBarrier(self):
do we have this API in the JVM RDD?
In the Scala RDD there is a private[spark] isBarrier() function; we don't add this to JavaRDD.
retest this please
retest this please
LGTM, merging to master!
What changes were proposed in this pull request?
This PR adds Python support for barrier execution mode, which enables launching a job containing barrier stage(s) from PySpark.
We just forked the existing `RDDBarrier` and `RDD.barrier()` in the Python API.
How was this patch tested?
Manually tested:
Unit tests will be added in a follow-up PR that implements BarrierTaskContext on the Python side.