Skip to content

Commit 7515ba0

Browse files
committed
added more tests :)
1 parent d26ec5e commit 7515ba0

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

python/pyspark/context.py

Lines changed: 35 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -488,14 +488,31 @@ def __init__(self, sparkContext):
488488
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]
489489
True
490490
491+
>>> sqlCtx.applySchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
492+
Traceback (most recent call last):
493+
...
494+
ValueError:...
495+
496+
>>> bad_rdd = sc.parallelize([1,2,3])
497+
>>> sqlCtx.applySchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
498+
Traceback (most recent call last):
499+
...
500+
ValueError:...
501+
491502
# registerRDDAsTable
492503
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
493504
494505
# sql
495-
>>> srdd2 = sqlCtx.sql("select field1 as f1, field2 as f2 from table1")
506+
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
496507
>>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}]
497508
True
498509
510+
# table
511+
#>>> sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
512+
#>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (1, "one")')
513+
#>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (2, "two")')
514+
#>>> srdd3 = sqlCtx.table("src")
515+
#>>> srdd3.collect() == [{"key" : 1, "value" : "one"}, {"key" : 2, "value": "two"}]
499516
"""
500517
self._sc = sparkContext
501518
self._jsc = self._sc._jsc
@@ -506,25 +523,21 @@ def applySchema(self, rdd):
506523
"""
507524
Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
508525
determine the fields names and types, and then use that to extract all the dictionaries.
509-
510-
# >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
511-
# Traceback (most recent call last):
512-
# ...
513-
# ValueError:...
514526
"""
515527
if (rdd.__class__ is SchemaRDD):
516528
raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
517529
elif not isinstance(rdd.first(), dict):
518530
raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" %
519-
(SchemaRDD.__name__, rdd.first().__class__.__name))
531+
(SchemaRDD.__name__, rdd.first()))
520532

521533
jrdd = self._sc._pythonToJavaMap(rdd._jrdd)
522534
srdd = self._ssql_ctx.applySchema(jrdd.rdd())
523535
return SchemaRDD(srdd, self)
524536

525537
def registerRDDAsTable(self, rdd, tableName):
526538
"""
527-
539+
Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
540+
during the lifetime of this instance of SQLContext.
528541
"""
529542
if (rdd.__class__ is SchemaRDD):
530543
jschema_rdd = rdd._jschema_rdd
@@ -533,22 +546,34 @@ def registerRDDAsTable(self, rdd, tableName):
533546
raise ValueError("Can only register SchemaRDD as table")
534547

535548
def parquetFile(path):
549+
"""
550+
Loads a Parquet file, returning the result as a L{SchemaRDD}.
551+
"""
536552
jschema_rdd = self._ssql_ctx.parquetFile(path)
537553
return SchemaRDD(jschema_rdd, self)
538554

539555
def sql(self, sqlQuery):
540556
"""
541-
Run a sql query over a registered table, and return a L{SchemaRDD} with the results.
557+
Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
542558
"""
543559
return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
544560

545-
def table(tableName):
561+
def table(self, tableName):
562+
"""
563+
Returns the specified table as a L{SchemaRDD}.
564+
"""
546565
return SchemaRDD(self._ssql_ctx.table(tableName), self)
547566

548567
def cacheTable(tableName):
568+
"""
569+
Caches the specified table in-memory.
570+
"""
549571
self._ssql_ctx.cacheTable(tableName)
550572

551573
def uncacheTable(tableName):
574+
"""
575+
Removes the specified table from the in-memory cache.
576+
"""
552577
self._ssql_ctx.uncacheTable(tableName)
553578

554579
def _test():

0 commit comments

Comments (0)