
Commit 8df4dad

davies authored and JoshRosen committed
[SPARK-2871] [PySpark] add approx API for RDD
RDD.countApprox(self, timeout, confidence=0.95)

    :: Experimental ::
    Approximate version of count() that returns a potentially incomplete
    result within a timeout, even if not all tasks have finished.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> rdd.countApprox(1000, 1.0)
    1000

RDD.sumApprox(self, timeout, confidence=0.95)

    Approximate operation to return the sum within a timeout
    or meet the confidence.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> r = sum(xrange(1000))
    >>> (rdd.sumApprox(1000) - r) / r < 0.05
    True

RDD.meanApprox(self, timeout, confidence=0.95)

    :: Experimental ::
    Approximate operation to return the mean within a timeout
    or meet the confidence.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> r = sum(xrange(1000)) / 1000.0
    >>> (rdd.meanApprox(1000) - r) / r < 0.05
    True

Author: Davies Liu <[email protected]>

Closes apache#2095 from davies/approx and squashes the following commits:

e8c252b [Davies Liu] add approx API for RDD
1 parent db436e3 commit 8df4dad
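The timeout for all three methods is in milliseconds, matching the JVM-side countApprox/sumApprox/meanApprox they delegate to. A minimal usage sketch (assumes a live SparkContext named sc; with a tight timeout the result may be a partial estimate computed from whichever tasks finished in time):

    rdd = sc.parallelize(range(10000), 100)
    # Generous timeout: all tasks finish, so the count is exact.
    print(rdd.countApprox(10000, confidence=1.0))   # 10000
    # 1 ms timeout: only some partitions may have reported back,
    # so the returned count can be incomplete.
    print(rdd.countApprox(1, confidence=0.5))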

2 files changed: +98 -0 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 17 additions & 0 deletions
@@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging {
       }
     }
   }
+
+  /**
+   * Convert an RDD of serialized Python objects to an RDD of objects usable by PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]]
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
 }

 private
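When batching is on, each incoming record is itself a pickled list of objects (PySpark's BatchedSerializer writes batches as single records), so the Scala side must flatten it; otherwise each record is one object. A minimal Python analogue of that unbatching logic, for illustration only (the real conversion happens in the JVM via Pyrolite's Unpickler):

    import pickle

    def python_to_objects(records, batched):
        # Mirror pythonToJava: unpickle each record, flattening batches.
        for row in records:
            obj = pickle.loads(row)
            if batched:
                for item in obj:   # a batch unpickles to a list
                    yield item
            else:
                yield obj

    batch = [pickle.dumps([1, 2, 3])]                            # one batched record
    plain = [pickle.dumps(1), pickle.dumps(2), pickle.dumps(3)]  # three plain records
    assert list(python_to_objects(batch, True)) == [1, 2, 3]
    assert list(python_to_objects(plain, False)) == [1, 2, 3]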

python/pyspark/rdd.py

Lines changed: 81 additions & 0 deletions
@@ -131,6 +131,22 @@ def __exit__(self, type, value, tb):
         self._context._jsc.setCallSite(None)


+class BoundedFloat(float):
+    """
+    Bounded value generated by an approximate job, with a confidence level
+    and low and high bounds.
+
+    >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
+    100.0
+    """
+    def __new__(cls, mean, confidence, low, high):
+        obj = float.__new__(cls, mean)
+        obj.confidence = confidence
+        obj.low = low
+        obj.high = high
+        return obj
+
+
 class MaxHeapQ(object):

     """
@@ -1792,6 +1808,71 @@ def _defaultReducePartitions(self):
         # keys in the pairs. This could be an expensive operation, since those
         # hashes aren't retained.

+    def _is_pickled(self):
+        """ Return whether this RDD is serialized by Pickle or not. """
+        der = self._jrdd_deserializer
+        if isinstance(der, PickleSerializer):
+            return True
+        if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+            return True
+        return False
+
+    def _to_jrdd(self):
+        """ Return a JavaRDD of Object by unpickling
+
+        It will convert each Python object into a Java object by Pyrolite,
+        whether the RDD is serialized in batches or not.
+        """
+        if not self._is_pickled():
+            self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+        batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
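The helper re-serializes only when the RDD is not already pickle-based, and the batched flag then tells PythonRDD.pythonToJava whether each record holds one object or a pickled batch. The check itself can be exercised directly against PySpark's serializer classes (a standalone sketch of the same logic):

    from pyspark.serializers import BatchedSerializer, MarshalSerializer, PickleSerializer

    def is_pickled(der):
        # Same rule as RDD._is_pickled: plain or batched pickle both qualify.
        if isinstance(der, PickleSerializer):
            return True
        return isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer)

    print(is_pickled(PickleSerializer()))                           # True
    print(is_pickled(BatchedSerializer(PickleSerializer(), 1024)))  # True
    print(is_pickled(MarshalSerializer()))                          # False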
+    def countApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate version of count() that returns a potentially incomplete
+        result within a timeout, even if not all tasks have finished.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> rdd.countApprox(1000, 1.0)
+        1000
+        """
+        drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
+        return int(drdd.sumApprox(timeout, confidence))
+
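countApprox is built entirely on sumApprox: each partition is first collapsed to a single float holding its element count, so an approximate sum of those per-partition counts is an approximate total count. The per-partition transform in isolation (plain Python, no Spark):

    partitions = [[1, 2, 3], [4, 5], []]
    # lambda it: [float(sum(1 for i in it))], applied to each partition
    counts = [float(sum(1 for i in part)) for part in partitions]
    print(counts)            # [3.0, 2.0, 0.0]
    print(int(sum(counts)))  # 5 -- the quantity sumApprox then estimates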
+    def sumApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate operation to return the sum within a timeout
+        or meet the confidence.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> r = sum(xrange(1000))
+        >>> (rdd.sumApprox(1000) - r) / r < 0.05
+        True
+        """
+        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+        r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
+    def meanApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate operation to return the mean within a timeout
+        or meet the confidence.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> r = sum(xrange(1000)) / 1000.0
+        >>> (rdd.meanApprox(1000) - r) / r < 0.05
+        True
+        """
+        jrdd = self.map(float)._to_jrdd()
+        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
+        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+

 class PipelinedRDD(RDD):

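Because sumApprox and meanApprox return BoundedFloat rather than a bare number, callers can inspect the confidence interval alongside the point estimate. A hedged usage sketch (assumes a live SparkContext named sc; the exact bounds depend on how many tasks finished before the timeout):

    rdd = sc.parallelize(range(1000), 10)
    s = rdd.sumApprox(1000, confidence=0.95)
    m = rdd.meanApprox(1000, confidence=0.95)
    # Each result compares like a float but also exposes its bounds.
    print(float(s), s.low, s.high, s.confidence)
    print(float(m), m.low, m.high, m.confidence)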