Skip to content

Commit 51129ca

Browse files
author
prabinb
committed
Added missing Python RDD functions
Added __repr__ function to StorageLevel class. Added doctest for RDD.getStorageLevel(). Removed RDD.generator() and RDD.setGenerator() functions.
1 parent 2409af9 commit 51129ca

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

python/pyspark/rdd.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
python_right_outer_join, python_cogroup
3737
from pyspark.statcounter import StatCounter
3838
from pyspark.rddsampler import RDDSampler
39+
from pyspark.storagelevel import StorageLevel
3940

4041
from py4j.java_collections import ListConverter, MapConverter
4142

@@ -1119,6 +1120,47 @@ def zip(self, other):
11191120
other._jrdd_deserializer)
11201121
return RDD(pairRDD, self.ctx, deserializer)
11211122

1123+
def name(self):
1124+
"""
1125+
Return the name of this RDD.
1126+
"""
1127+
name_ = self._jrdd.name()
1128+
if not name_:
1129+
return None
1130+
return name_.encode('utf-8')
1131+
1132+
def setName(self, name):
1133+
"""
1134+
Assign a name to this RDD.
1135+
>>> rdd1 = sc.parallelize([1,2])
1136+
>>> rdd1.setName('RDD1')
1137+
>>> rdd1.name()
1138+
'RDD1'
1139+
"""
1140+
self._jrdd.setName(name)
1141+
1142+
def toDebugString(self):
1143+
"""
1144+
A description of this RDD and its recursive dependencies for debugging.
1145+
"""
1146+
debug_string = self._jrdd.toDebugString()
1147+
if not debug_string:
1148+
return None
1149+
return debug_string.encode('utf-8')
1150+
1151+
def getStorageLevel(self):
1152+
"""
1153+
Get the RDD's current storage level.
1154+
>>> rdd1 = sc.parallelize([1,2])
1155+
>>> rdd1.getStorageLevel()
1156+
StorageLevel(False, False, False, 1)
1157+
"""
1158+
java_storage_level = self._jrdd.getStorageLevel()
1159+
storage_level = StorageLevel(java_storage_level.useDisk(),
1160+
java_storage_level.useMemory(),
1161+
java_storage_level.deserialized(),
1162+
java_storage_level.replication())
1163+
return storage_level
11221164

11231165
# TODO: `lookup` is disabled because we can't make direct comparisons based
11241166
# on the key; we need to compare the hash of the key to the hash of the

python/pyspark/storagelevel.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ def __init__(self, useDisk, useMemory, deserialized, replication = 1):
3131
self.deserialized = deserialized
3232
self.replication = replication
3333

34+
def __repr__(self):
35+
return "StorageLevel(%s, %s, %s, %s)" % (
36+
self.useDisk, self.useMemory, self.deserialized, self.replication)
37+
3438
StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
3539
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
3640
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)

0 commit comments

Comments
 (0)