From 6867a8953d5b94cfa3b6520c2a52ef8f04ed72fb Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Wed, 24 Feb 2016 22:42:38 -0800
Subject: [PATCH 1/5] A draft and runnable version

---
 .../python/mllib/streaming_test_example.py   |  63 ++++++++
 .../mllib/api/python/PythonMLLibAPI.scala    |  64 +++++++-
 .../spark/mllib/stat/test/TestResult.scala   |   2 +-
 python/pyspark/mllib/common.py               |   1 +
 python/pyspark/mllib/stat/test.py            | 138 +++++++++++++++++-
 python/pyspark/mllib/tests.py                |  16 ++
 6 files changed, 276 insertions(+), 8 deletions(-)
 create mode 100644 examples/src/main/python/mllib/streaming_test_example.py

diff --git a/examples/src/main/python/mllib/streaming_test_example.py b/examples/src/main/python/mllib/streaming_test_example.py
new file mode 100644
index 000000000000..98f13174bfbc
--- /dev/null
+++ b/examples/src/main/python/mllib/streaming_test_example.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Create a queue of RDDs that will be mapped/reduced one at a time in
+ 1 second intervals.
+
+ To run this example use
+ `$ bin/spark-submit examples/src/main/python/streaming/queue_stream.py
+"""
+import sys
+import time
+import tempfile
+from shutil import rmtree
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.mllib.stat.test import BinarySample, StreamingTest
+
+if __name__ == "__main__":
+
+    sc = SparkContext(appName="PythonStreamingTestExample")
+    ssc = StreamingContext(sc, 1)
+
+    checkpointPath = tempfile.mkdtemp()
+    ssc.checkpoint(checkpointPath)
+
+    # Create the queue through which RDDs can be pushed to
+    # a QueueInputDStream
+    rddQueue = []
+    for i in range(5):
+        rddQueue += [ssc.sparkContext.parallelize(
+            [BinarySample(True, j) for j in range(1, 1001)], 10)]
+
+    # Create the QueueInputDStream and use it do some processing
+    inputStream = ssc.queueStream(rddQueue)
+
+    model = StreamingTest()
+    test_result = model.registerStream(inputStream)
+
+    test_result.pprint()
+
+    ssc.start()
+    time.sleep(12)
+    ssc.stop(stopSparkContext=True, stopGraceFully=True)
+    try:
+        rmtree(checkpointPath)
+    except OSError:
+        pass
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 93cf16e6f0c2..10fac93f6f48 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -22,13 +22,11 @@ import java.nio.{ByteBuffer, ByteOrder}
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 import scala.language.existentials
 import scala.reflect.ClassTag
 
 import net.razorvine.pickle._
 
-import org.apache.spark.SparkContext
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.mllib.classification._
@@ -42,11 +40,10 @@ import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.stat.{
-  KernelDensity, MultivariateStatisticalSummary, Statistics}
+import org.apache.spark.mllib.stat.{KernelDensity, MultivariateStatisticalSummary, Statistics}
 import org.apache.spark.mllib.stat.correlation.CorrelationNames
 import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
-import org.apache.spark.mllib.stat.test.{ChiSqTestResult, KolmogorovSmirnovTestResult}
+import org.apache.spark.mllib.stat.test._
 import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest}
 import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy, Strategy}
 import org.apache.spark.mllib.tree.impurity._
@@ -56,6 +53,7 @@ import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.JavaDStream
 import org.apache.spark.util.Utils
 
 /**
@@ -1214,7 +1212,11 @@ private[spark] object SerDe extends Serializable {
     extends IObjectPickler with IObjectConstructor {
 
     private val cls = implicitly[ClassTag[T]].runtimeClass
-    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
+
+    // drop(4) removes "org.apache.spark.mllib" and dropRight(1) removes the class's simple name.
+    private val interPath = cls.getName.split('.').drop(4).dropRight(1).mkString(".")
+    private val module = PYSPARK_PACKAGE + "." + interPath
+
     private val name = cls.getSimpleName
 
     // register this to Pickler and Unpickler
@@ -1469,6 +1471,20 @@ private[spark] object SerDe extends Serializable {
     }
   }
 
+  private[python] class BinarySamplePickler extends BasePickler[BinarySample] {
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      val binarySample = obj.asInstanceOf[BinarySample]
+      saveObjects(out, pickler, binarySample.isExperiment, binarySample.value)
+    }
+
+    def construct(args: Array[AnyRef]): AnyRef = {
+      if (args.length != 2) {
+        throw new PickleException("should be 2")
+      }
+      BinarySample(args(0).asInstanceOf[Boolean], args(1).asInstanceOf[Double])
+    }
+  }
+
   var initialized = false
   // This should be called before trying to serialize any above classes
   // In cluster mode, this should be put in the closure
@@ -1476,6 +1492,7 @@ private[spark] object SerDe extends Serializable {
     SerDeUtil.initialize()
     synchronized {
       if (!initialized) {
+        new BinarySamplePickler().register()
         new DenseVectorPickler().register()
         new DenseMatrixPickler().register()
         new SparseMatrixPickler().register()
@@ -1542,4 +1559,39 @@ private[spark] object SerDe extends Serializable {
       }
     }.toJavaRDD()
   }
+
+  /**
+   * Convert a DStream of Java objects to a DStream of serialized Python objects, so that it is
+   * usable by PySpark.
+   */
+  def javaToPython(jDStream: JavaDStream[Any]): JavaDStream[Array[Byte]] = {
+    val dStream = jDStream.dstream.mapPartitions { iter =>
+      initialize()  // make sure the picklers are registered on the executor
+      new SerDeUtil.AutoBatchedPickler(iter)
+    }
+    new JavaDStream[Array[Byte]](dStream)
+  }
+
+  /**
+   * Convert a DStream of serialized Python objects to a DStream of Java objects, so that it is
+   * usable by the JVM-side implementation.
+   */
+  def pythonToJava(pyDStream: JavaDStream[Array[Byte]], batched: Boolean): JavaDStream[Any] = {
+    val dStream = pyDStream.dstream.mapPartitions { iter =>
+      initialize()  // make sure the unpickler is initialized on the executor
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj match {
+            case list: JArrayList[_] => list.asScala
+            case arr: Array[_] => arr
+          }
+        } else {
+          Seq(obj)
+        }
+      }
+    }
+    new JavaDStream[Any](dStream)
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
index 8a29fd39a910..5588db61953f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
@@ -118,7 +118,7 @@ class KolmogorovSmirnovTestResult private[stat] (
  */
 @Experimental
 @Since("1.6.0")
-private[stat] class StreamingTestResult @Since("1.6.0") (
+class StreamingTestResult @Since("1.6.0") (
   @Since("1.6.0") override val pValue: Double,
   @Since("1.6.0") override val degreesOfFreedom: Double,
   @Since("1.6.0") override val statistic: Double,
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 9fda1b1682f5..31eb327c0caa 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -55,6 +55,7 @@ def _new_smart_decode(obj):
     'DenseMatrix',
     'Rating',
     'LabeledPoint',
+    'BinarySample',
 ]
 
 
diff --git a/python/pyspark/mllib/stat/test.py b/python/pyspark/mllib/stat/test.py
index 0abe104049ff..60399fd8a6f0 100644
--- a/python/pyspark/mllib/stat/test.py
+++ b/python/pyspark/mllib/stat/test.py
@@ -15,10 +15,15 @@
 # limitations under the License.
 #
 
+from collections import namedtuple
+
+from pyspark import SparkContext, since
 from pyspark.mllib.common import inherit_doc, JavaModelWrapper
+from pyspark.streaming.dstream import DStream
 
-__all__ = ["ChiSqTestResult", "KolmogorovSmirnovTestResult"]
+__all__ = ["ChiSqTestResult", "KolmogorovSmirnovTestResult", "BinarySample", "StreamingTest",
+           "StreamingTestResult"]
 
 
 class TestResult(JavaModelWrapper):
@@ -80,3 +85,134 @@ class KolmogorovSmirnovTestResult(TestResult):
     """
     Contains test results for the Kolmogorov-Smirnov test.
     """
+
+
+class BinarySample(namedtuple("BinarySample", ["isExperiment", "value"])):
+    """
+    Represents a (isExperiment, value) tuple.
+
+    >>> bs = BinarySample(True, 1.0)
+    >>> (bs.isExperiment, bs.value)
+    (True, 1.0)
+
+    .. versionadded:: 2.0.0
+    """
+
+    def __reduce__(self):
+        return BinarySample, (bool(self.isExperiment), float(self.value))
+
+
+@inherit_doc
+class StreamingTestResult(TestResult):
+    """
+    Contains test results for StreamingTest.
+    """
+
+    @property
+    def method(self):
+        """
+        Name of the test method
+        """
+        return self._java_model.method()
+
+
+class StreamingTest(object):
+    """
+    .. note:: Experimental
+
+    Online 2-sample significance testing for a stream of (Boolean, Double) pairs. The Boolean
+    identifies which sample each observation comes from, and the Double is the numeric value of the
+    observation.
+
+    To address novelty effects, the `peacePeriod` specifies a set number of initial RDD batches of
+    the DStream to be dropped from significance testing.
+
+    The `windowSize` sets the number of batches each significance test is to be performed over. The
+    window is sliding with a stride length of 1 batch. Setting windowSize to 0 will perform
+    cumulative processing, using all batches seen so far.
+
+    Different tests may be used for assessing statistical significance depending on assumptions
+    satisfied by the data. For more details, see StreamingTestMethod. The `testMethod` specifies
+    which test will be used.
+
+    .. versionadded:: 2.0.0
+    """
+
+    def __init__(self):
+        self._peacePeriod = 0
+        self._windowSize = 0
+        self._testMethod = "welch"
+
+    @since('2.0.0')
+    def setPeacePeriod(self, peacePeriod):
+        """
+        Update peacePeriod
+        :param peacePeriod:
+        :return:
+        """
+        self._peacePeriod = peacePeriod
+
+    @since('2.0.0')
+    def setWindowSize(self, windowSize):
+        """
+        Update windowSize
+        :param windowSize:
+        :return:
+        """
+        self._windowSize = windowSize
+
+    @since('2.0.0')
+    def setTestMethod(self, testMethod):
+        """
+        Update test method
+        :param testMethod:
+        :return:
+        """
+        self._testMethod = testMethod
+
+    @since('2.0.0')
+    def registerStream(self, data):
+        """
+        Register a data stream to get its test result.
+
+        :param data:
+          The input data stream, each element is a BinarySample instance.
+        """
+        self._validate(data)
+        sc = SparkContext._active_spark_context
+
+        streamingTest = sc._jvm.org.apache.spark.mllib.stat.test.StreamingTest()
+        streamingTest.setPeacePeriod(self._peacePeriod)
+        streamingTest.setWindowSize(self._windowSize)
+        streamingTest.setTestMethod(self._testMethod)
+
+        javaDStream = sc._jvm.SerDe.pythonToJava(data._jdstream, True)
+        testResult = streamingTest.registerStream(javaDStream)
+        pythonTestResult = sc._jvm.SerDe.javaToPython(testResult)
+
+        pyResult = DStream(pythonTestResult, data._ssc, data._jrdd_deserializer)
+
+        return pyResult
+
+    @classmethod
+    def _validate(cls, samples):
+        if isinstance(samples, DStream):
+            pass
+        else:
+            raise TypeError("BinarySample should be represented by a DStream, "
+                            "but got %s." % type(samples))
+
+
+def _test():
+    import doctest
+    import pyspark.mllib.stat.test
+    globs = pyspark.mllib.stat.test.__dict__.copy()
+    globs['sc'] = SparkContext('local[4]', 'Statistical Test doctest')
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 25a7c29982b3..51aca89bb565 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -58,6 +58,7 @@
 from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
+from pyspark.mllib.stat.test import BinarySample, StreamingTest
 from pyspark.mllib.feature import Word2Vec
 from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
@@ -65,6 +66,7 @@
 from pyspark.mllib.util import MLUtils
 from pyspark.serializers import PickleSerializer
 from pyspark.streaming import StreamingContext
+from pyspark.streaming.tests import PySparkStreamingTestCase
 from pyspark.sql import SQLContext
 from pyspark.streaming import StreamingContext
 
@@ -1583,6 +1585,20 @@ def test_als_ratings_id_long_error(self):
         self.assertRaises(Py4JJavaError, self.sc._jvm.SerDe.loads, bytearray(ser.dumps(r)))
 
 
+class StreamingTestTest(PySparkStreamingTestCase):
+    def test_accuracy_for_single_center(self):
+        """Test that parameters obtained are correct for a single center."""
+
+        fake_stream = [[True, 2], [False, 2], [False, 1], [True, 1],
+                       [True, 4], [True, 1], [True, 2], [False, 3],
+                       [True, 1], [False, 1], [False, 1], [False, 1]]
+
+        dstream = self.ssc.queueStream(fake_stream).map(lambda x: BinarySample(x[0], x[1]))
+        model = StreamingTest()
+        test_result = model.registerStream(dstream)
+        print(self._take(test_result, 2))
+
+
 if __name__ == "__main__":
     from pyspark.mllib.tests import *
     if not _have_scipy:

From 079a87369e57a0454aaa0ff5a1ebefce97abcfc0 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 25 Feb 2016 00:09:52 -0800
Subject: [PATCH 2/5] treat StreamingTestResult as an independent class

---
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 17 +++++++++++++++++
 python/pyspark/mllib/common.py                  |  1 +
 python/pyspark/mllib/stat/test.py               | 17 ++++++++++++++++-
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 10fac93f6f48..89be14da5a78 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1485,6 +1485,22 @@ private[spark] object SerDe extends Serializable {
     }
   }
 
+  private[python] class StreamingTestResultPickler extends BasePickler[StreamingTestResult] {
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      val result = obj.asInstanceOf[StreamingTestResult]
+      saveObjects(out, pickler, result.pValue, result.degreesOfFreedom, result.statistic,
+        result.method, result.nullHypothesis)
+    }
+
+    def construct(args: Array[AnyRef]): AnyRef = {
+      if (args.length != 5) {
+        throw new PickleException("should be 5")
+      }
+      new StreamingTestResult(args(0).asInstanceOf[Double], args(1).asInstanceOf[Double],
+        args(2).asInstanceOf[Double], args(3).asInstanceOf[String], args(4).asInstanceOf[String])
+    }
+  }
+
   var initialized = false
   // This should be called before trying to serialize any above classes
   // In cluster mode, this should be put in the closure
@@ -1497,6 +1513,7 @@ private[spark] object SerDe extends Serializable {
         new DenseMatrixPickler().register()
         new SparseMatrixPickler().register()
         new SparseVectorPickler().register()
+        new StreamingTestResultPickler().register()
         new LabeledPointPickler().register()
         new RatingPickler().register()
         initialized = true
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 31eb327c0caa..d0754aafff70 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -56,6 +56,7 @@ def _new_smart_decode(obj):
     'Rating',
     'LabeledPoint',
     'BinarySample',
+    'StreamingTestResult',
 ]
 
 
diff --git a/python/pyspark/mllib/stat/test.py b/python/pyspark/mllib/stat/test.py
index 60399fd8a6f0..bb78646384d3 100644
--- a/python/pyspark/mllib/stat/test.py
+++ b/python/pyspark/mllib/stat/test.py
@@ -103,7 +103,7 @@ def __reduce__(self):
 
 
 @inherit_doc
-class StreamingTestResult(TestResult):
+class StreamingTestResult2(TestResult):
     """
     Contains test results for StreamingTest.
     """
@@ -116,6 +116,21 @@ def method(self):
         return self._java_model.method()
 
 
+class StreamingTestResult(namedtuple("StreamingTestResult",
+                                     ["pValue", "degreesOfFreedom", "statistic", "method",
+                                      "nullHypothesis"])):
+    """
+    Contains test results for StreamingTest.
+
+    .. versionadded:: 2.0.0
+    """
+
+    def __reduce__(self):
+        return StreamingTestResult, (float(self.pValue),
+                                     float(self.degreesOfFreedom), float(self.statistic),
+                                     str(self.method), str(self.nullHypothesis))
+
+
 class StreamingTest(object):
     """
     .. note:: Experimental

From f70d7aa52a2006d2a86de2e4591839f34dabc50a Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 25 Feb 2016 12:02:24 -0800
Subject: [PATCH 3/5] add test for streamingtest

---
 .../python/mllib/streaming_test_example.py | 26 +++++-------
 python/pyspark/mllib/tests.py              | 42 +++++++++++++++----
 2 files changed, 43 insertions(+), 25 deletions(-)

diff --git a/examples/src/main/python/mllib/streaming_test_example.py b/examples/src/main/python/mllib/streaming_test_example.py
index 98f13174bfbc..e9a64c862195 100644
--- a/examples/src/main/python/mllib/streaming_test_example.py
+++ b/examples/src/main/python/mllib/streaming_test_example.py
@@ -16,13 +16,8 @@
 #
 
 """
- Create a queue of RDDs that will be mapped/reduced one at a time in
- 1 second intervals.
-
- To run this example use
- `$ bin/spark-submit examples/src/main/python/streaming/queue_stream.py
+Create a DStream containing several RDDs to demonstrate PySpark's StreamingTest.
 """
-import sys
 import time
 import tempfile
 from shutil import rmtree
@@ -36,21 +31,20 @@
     sc = SparkContext(appName="PythonStreamingTestExample")
     ssc = StreamingContext(sc, 1)
 
-    checkpointPath = tempfile.mkdtemp()
-    ssc.checkpoint(checkpointPath)
+    checkpoint_path = tempfile.mkdtemp()
+    ssc.checkpoint(checkpoint_path)
 
-    # Create the queue through which RDDs can be pushed to
-    # a QueueInputDStream
-    rddQueue = []
+    # Create the queue through which RDDs can be pushed to a QueueInputDStream.
+    rdd_queue = []
     for i in range(5):
-        rddQueue += [ssc.sparkContext.parallelize(
+        rdd_queue += [ssc.sparkContext.parallelize(
             [BinarySample(True, j) for j in range(1, 1001)], 10)]
 
-    # Create the QueueInputDStream and use it do some processing
-    inputStream = ssc.queueStream(rddQueue)
+    # Create the QueueInputDStream and use it to do some processing.
+    input_stream = ssc.queueStream(rdd_queue)
 
     model = StreamingTest()
-    test_result = model.registerStream(inputStream)
+    test_result = model.registerStream(input_stream)
 
     test_result.pprint()
 
@@ -58,6 +52,6 @@
     time.sleep(12)
     ssc.stop(stopSparkContext=True, stopGraceFully=True)
     try:
-        rmtree(checkpointPath)
+        rmtree(checkpoint_path)
     except OSError:
         pass
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 51aca89bb565..2026e66c2d76 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -58,7 +58,7 @@
 from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
-from pyspark.mllib.stat.test import BinarySample, StreamingTest
+from pyspark.mllib.stat.test import BinarySample, StreamingTest, StreamingTestResult
 from pyspark.mllib.feature import Word2Vec
 from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
@@ -1586,17 +1586,41 @@ def test_als_ratings_id_long_error(self):
 
 
 class StreamingTestTest(PySparkStreamingTestCase):
-    def test_accuracy_for_single_center(self):
-        """Test that parameters obtained are correct for a single center."""
+    def test_streaming_test_result_and_model(self):
+        """
+        Assert that StreamingTest returns a valid result, and check its setter methods.
+        """
+
+        checkpoint_path = tempfile.mkdtemp()
+        self.ssc.checkpoint(checkpoint_path)
+
+        # Create the queue through which RDDs can be pushed to a QueueInputDStream.
+        rdd_queue = []
+        for i in range(5):
+            rdd_queue += [self.ssc.sparkContext.parallelize(
+                [BinarySample(True, j) for j in range(1, 1001)], 10)]
 
-        fake_stream = [[True, 2], [False, 2], [False, 1], [True, 1],
-                       [True, 4], [True, 1], [True, 2], [False, 3],
-                       [True, 1], [False, 1], [False, 1], [False, 1]]
+        # Create the QueueInputDStream and use it to do some processing.
+        input_stream = self.ssc.queueStream(rdd_queue)
 
-        dstream = self.ssc.queueStream(fake_stream).map(lambda x: BinarySample(x[0], x[1]))
         model = StreamingTest()
-        test_result = model.registerStream(dstream)
-        print(self._take(test_result, 2))
+        model.setPeacePeriod(1)
+        model.setWindowSize(2)
+        model.setTestMethod("student")
+
+        test_result = model.registerStream(input_stream)
+        res = self._take(test_result, 1)[0]
+        self.assertTrue(isinstance(res, StreamingTestResult))
+        self.assertEqual(res.method, "Student's 2-sample t-test")
+
+        self.assertEqual(model._peacePeriod, 1)
+        self.assertEqual(model._windowSize, 2)
+        self.assertEqual(model._testMethod, "student")
+
+        try:
+            rmtree(checkpoint_path)
+        except OSError:
+            pass
 
 
 if __name__ == "__main__":

From 770703b341fd4f900718cf2fd8a1985170435b65 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 25 Feb 2016 12:22:38 -0800
Subject: [PATCH 4/5] refine test

---
 python/pyspark/mllib/stat/test.py | 44 ++++++-------------------------
 1 file changed, 8 insertions(+), 36 deletions(-)

diff --git a/python/pyspark/mllib/stat/test.py b/python/pyspark/mllib/stat/test.py
index bb78646384d3..00ea8b24057b 100644
--- a/python/pyspark/mllib/stat/test.py
+++ b/python/pyspark/mllib/stat/test.py
@@ -87,14 +87,11 @@ class KolmogorovSmirnovTestResult(TestResult):
     """
 
 
+@since('2.0.0')
 class BinarySample(namedtuple("BinarySample", ["isExperiment", "value"])):
     """
     Represents a (isExperiment, value) tuple.
 
-    >>> bs = BinarySample(True, 1.0)
-    >>> (bs.isExperiment, bs.value)
-    (True, 1.0)
-
     .. versionadded:: 2.0.0
     """
 
@@ -102,20 +99,7 @@ def __reduce__(self):
         return BinarySample, (bool(self.isExperiment), float(self.value))
 
 
-@inherit_doc
-class StreamingTestResult2(TestResult):
-    """
-    Contains test results for StreamingTest.
-    """
-
-    @property
-    def method(self):
-        """
-        Name of the test method
-        """
-        return self._java_model.method()
-
-
+@since('2.0.0')
 class StreamingTestResult(namedtuple("StreamingTestResult",
                                      ["pValue", "degreesOfFreedom", "statistic", "method",
                                       "nullHypothesis"])):
@@ -131,6 +115,7 @@ def __reduce__(self):
                                      str(self.method), str(self.nullHypothesis))
 
 
+@since('2.0.0')
 class StreamingTest(object):
     """
     .. note:: Experimental
@@ -163,7 +148,7 @@ def setPeacePeriod(self, peacePeriod):
         """
         Update peacePeriod
         :param peacePeriod:
-        :return:
+        Set number of initial RDD batches of the DStream to be dropped from significance testing.
         """
         self._peacePeriod = peacePeriod
 
@@ -172,7 +157,7 @@ def setWindowSize(self, windowSize):
         """
         Update windowSize
         :param windowSize:
-        :return:
+        Set the number of batches each significance test is to be performed over.
         """
         self._windowSize = windowSize
 
@@ -181,8 +166,10 @@ def setTestMethod(self, testMethod):
         """
         Update test method
         :param testMethod:
-        :return:
+        Currently supported tests: `welch`, `student`.
         """
+        assert testMethod in ("welch", "student"), \
+            "Currently supported tests: \"welch\", \"student\""
         self._testMethod = testMethod
 
     @since('2.0.0')
@@ -216,18 +203,3 @@ def _validate(cls, samples):
         else:
             raise TypeError("BinarySample should be represented by a DStream, "
                             "but got %s." % type(samples))
-
-
-def _test():
-    import doctest
-    import pyspark.mllib.stat.test
-    globs = pyspark.mllib.stat.test.__dict__.copy()
-    globs['sc'] = SparkContext('local[4]', 'Statistical Test doctest')
-    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
-    globs['sc'].stop()
-    if failure_count:
-        exit(-1)
-
-
-if __name__ == "__main__":
-    _test()

From e4e8d5ec5d467d7efebb917c6c7e60b14f519e74 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Sat, 5 Mar 2016 15:20:51 -0800
Subject: [PATCH 5/5] remove since from class header

---
 python/pyspark/mllib/stat/test.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/python/pyspark/mllib/stat/test.py b/python/pyspark/mllib/stat/test.py
index 00ea8b24057b..ae88b10efc3f 100644
--- a/python/pyspark/mllib/stat/test.py
+++ b/python/pyspark/mllib/stat/test.py
@@ -87,7 +87,6 @@ class KolmogorovSmirnovTestResult(TestResult):
     """
 
 
-@since('2.0.0')
 class BinarySample(namedtuple("BinarySample", ["isExperiment", "value"])):
     """
     Represents a (isExperiment, value) tuple.
@@ -99,7 +98,6 @@ def __reduce__(self):
         return BinarySample, (bool(self.isExperiment), float(self.value))
 
 
-@since('2.0.0')
 class StreamingTestResult(namedtuple("StreamingTestResult",
                                      ["pValue", "degreesOfFreedom", "statistic", "method",
                                       "nullHypothesis"])):
@@ -115,7 +113,6 @@ def __reduce__(self):
                                      str(self.method), str(self.nullHypothesis))
 
 
-@since('2.0.0')
 class StreamingTest(object):
     """
     .. note:: Experimental
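
--
Usage sketch (supplementary material, not part of the patches): a minimal
end-to-end run of the Python API added by this series, assuming the final
state after patch 5/5. It is adapted from streaming_test_example.py above;
the app name, sample distributions, sleep duration, and batch interval are
arbitrary choices for illustration, while the setter names, "welch"/"student"
method strings, and BinarySample/StreamingTestResult fields come from the
patches themselves.

    import random
    import tempfile
    import time

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.stat.test import BinarySample, StreamingTest

    sc = SparkContext(appName="StreamingTestSketch")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
    ssc.checkpoint(tempfile.mkdtemp())  # the stateful test needs a checkpoint dir

    # Five batches; the experiment group (True) and control group (False) are
    # drawn from clearly separated distributions, so the test should reject the
    # null hypothesis once enough batches have been seen.
    rdds = [sc.parallelize(
        [BinarySample(True, random.gauss(1.0, 0.5)) for _ in range(500)] +
        [BinarySample(False, random.gauss(5.0, 0.5)) for _ in range(500)], 10)
        for _ in range(5)]
    stream = ssc.queueStream(rdds)

    model = StreamingTest()
    model.setPeacePeriod(1)       # drop the first batch (novelty effects)
    model.setWindowSize(0)        # 0 = cumulative test over all batches seen
    model.setTestMethod("welch")  # or "student"

    # Each batch emits StreamingTestResult records carrying pValue,
    # degreesOfFreedom, statistic, method, and nullHypothesis fields.
    model.registerStream(stream).pprint()

    ssc.start()
    time.sleep(10)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)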