Skip to content

Commit d26ec5e

Browse files
committed
added test
1 parent e9f5b8d commit d26ec5e

File tree

1 file changed

+52
-15
lines changed

1 file changed

+52
-15
lines changed

python/pyspark/context.py

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -475,23 +475,71 @@ def __init__(self, sparkContext):
475475
476476
@param sparkContext: The SparkContext to wrap.
477477
478+
# SQLContext
478479
>>> from pyspark.context import SQLContext
479480
>>> sqlCtx = SQLContext(sc)
481+
482+
>>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
483+
... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
484+
485+
# applySchema
486+
>>> srdd = sqlCtx.applySchema(rdd)
487+
488+
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]
489+
True
490+
491+
# registerRDDAsTable
492+
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
493+
494+
# sql
495+
>>> srdd2 = sqlCtx.sql("select field1 as f1, field2 as f2 from table1")
496+
>>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}]
497+
True
498+
480499
"""
481500
self._sc = sparkContext
482501
self._jsc = self._sc._jsc
483502
self._jvm = self._sc._jvm
484503
self._ssql_ctx = self._jvm.SQLContext(self._jsc.sc())
485504

505+
def applySchema(self, rdd):
    """
    Infer and apply a schema to an RDD of L{dict}s.

    We peek at the first row of the RDD to determine the field names and
    types, and then use that to extract all the dictionaries.

    @param rdd: an RDD of L{dict}s, each sharing the same set of fields.
    @return: a L{SchemaRDD} wrapping the input RDD.
    @raise ValueError: if C{rdd} is already a L{SchemaRDD}, or if its
        elements are not dictionaries.
    """
    if (rdd.__class__ is SchemaRDD):
        raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
    # Peek at one element to validate the element type; rdd.first() may be
    # expensive, so call it only once.
    first = rdd.first()
    if not isinstance(first, dict):
        # Fixed: the original formatted the message with __class__.__name
        # (missing trailing underscores), which raised AttributeError
        # instead of the intended ValueError.
        raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" %
                         (SchemaRDD.__name__, first.__class__.__name__))

    jrdd = self._sc._pythonToJavaMap(rdd._jrdd)
    srdd = self._ssql_ctx.applySchema(jrdd.rdd())
    return SchemaRDD(srdd, self)
524+
525+
def registerRDDAsTable(self, rdd, tableName):
    """
    Register the given L{SchemaRDD} as a temporary table, so that it can
    be queried by name via L{sql}.

    @param rdd: the L{SchemaRDD} to register; must already carry a schema
        (e.g. produced by L{applySchema} or L{sql}).
    @param tableName: the name under which the table is registered.
    @raise ValueError: if C{rdd} is not a L{SchemaRDD}.
    """
    if (rdd.__class__ is SchemaRDD):
        jschema_rdd = rdd._jschema_rdd
        self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName)
    else:
        raise ValueError("Can only register SchemaRDD as table")
534+
486535
def parquetFile(self, path):
    """
    Load a Parquet file, returning the result as a L{SchemaRDD}.

    Fixed: the original signature omitted C{self}, so calling this as an
    instance method bound the context object to C{path} and the body's
    C{self} reference raised NameError.

    @param path: path to the Parquet file to load.
    @return: a L{SchemaRDD} for the file's contents.
    """
    jschema_rdd = self._ssql_ctx.parquetFile(path)
    return SchemaRDD(jschema_rdd, self)
489538

490-
def registerRDDAsTable(rdd, tableName):
491-
jschema_rdd = rdd._jschema_rdd
492-
self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName)
493-
494539
def sql(self, sqlQuery):
    """
    Run a sql query over a registered table, and return a L{SchemaRDD} with the results.
    """
    jschema_rdd = self._ssql_ctx.sql(sqlQuery)
    return SchemaRDD(jschema_rdd, self)
496544

497545
def table(tableName):
@@ -503,17 +551,6 @@ def cacheTable(tableName):
503551
def uncacheTable(self, tableName):
    """
    Remove the specified table from the in-memory cache.

    Fixed: the original signature omitted C{self}, so calling this as an
    instance method bound the context object to C{tableName} and the
    body's C{self} reference raised NameError.

    @param tableName: name of the table to evict from the cache.
    """
    self._ssql_ctx.uncacheTable(tableName)
505553

506-
def applySchema(self, rdd):
507-
if (rdd.__class__ is SchemaRDD):
508-
raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__)
509-
elif isinstance(rdd.first(), dict) is not dict:
510-
raise Exception("Only RDDs with dictionaries can be converted to %s" % SchemaRDD.__name__)
511-
512-
jrdd = self._sc._pythonToJavaMap(rdd._jrdd)
513-
srdd = self._ssql_ctx.applySchema(jrdd.rdd())
514-
return SchemaRDD(srdd, self)
515-
516-
517554
def _test():
518555
import atexit
519556
import doctest

0 commit comments

Comments
 (0)