-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-12042] Python API for mllib.stat.test.StreamingTest #11374
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc: @feynmanliang |
|
Test build #51989 has finished for PR 11374 at commit
|
|
test it please |
|
Test build #52523 has finished for PR 11374 at commit
|
|
Test build #58598 has finished for PR 11374 at commit
|
|
Any updates to this PR? |
|
Test build #67696 has finished for PR 11374 at commit
|
|
I'll review this tonight |
|
Apologies for the delay, I am traveling but I'll get this done this weekend. |
feynmanliang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a first pass.
It's been awhile since I've looked at PySpark so I may be a bit rusty on some things.
|
|
||
| """ | ||
| Create a DStream that contains several RDDs to show the StreamingTest of PySpark. | ||
| """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like other examples are including a from __future__ import print_function here
| sc = SparkContext(appName="PythonStreamingTestExample") | ||
| ssc = StreamingContext(sc, 1) | ||
|
|
||
| checkpoint_path = tempfile.mkdtemp() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary?
| from pyspark.mllib.stat.test import BinarySample, StreamingTest | ||
|
|
||
| if __name__ == "__main__": | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: don't include newline here
|
|
||
| from pyspark import SparkContext | ||
| from pyspark.streaming import StreamingContext | ||
| from pyspark.mllib.stat.test import BinarySample, StreamingTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$example on$ and $example off appear to be used in other examples, though I'm not sure why myself
| ssc.checkpoint(checkpoint_path) | ||
|
|
||
| # Create the queue through which RDDs can be pushed to a QueueInputDStream. | ||
| rdd_queue = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use camelCase to be consistent with other examples
| """ | ||
|
|
||
| checkpoint_path = tempfile.mkdtemp() | ||
| self.ssc.checkpoint(checkpoint_path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary?
| input_stream = self.ssc.queueStream(rdd_queue) | ||
|
|
||
| model = StreamingTest() | ||
| model.setPeacePeriod(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we break this into another test just for model params like
spark/python/pyspark/mllib/tests.py
Line 1165 in 39e2bad
| def test_model_params(self): |
| } | ||
| } | ||
|
|
||
| private[python] class StreamingTestResultPickler extends BasePickler[StreamingTestResult] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to test these in PythonMLLibAPISuite?
| streamingTest.setTestMethod(self._testMethod) | ||
|
|
||
| javaDStream = sc._jvm.SerDe.pythonToJava(data._jdstream, True) | ||
| testResult = streamingTest.registerStream(javaDStream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need pythonToJava and javaToPython; its not used for streaming K means
spark/python/pyspark/mllib/clustering.py
Line 773 in 39e2bad
| updatedModel = callMLlibFunc( |
| */ | ||
| @Since("1.6.0") | ||
| private[stat] class StreamingTestResult @Since("1.6.0") ( | ||
| class StreamingTestResult @Since("1.6.0") ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be public? Java API doesn't seem to require it
|
Hi @yinxusen, are you able to proceed this further? If not, it seems it might be better closed for now. |
What changes were proposed in this pull request?
The patch adds python API for mllib.stat.test.StreamingTest under JIRA https://issues.apache.org/jira/browse/SPARK-12042.
Note that for
StreamingTestResult, unlike other test results in Python, I define it as a normal Python class which doesn't extend fromTestResultwith a_java_objin it.How was this patch tested?
The patch is tested with Python unit test.