
Commit f2312c7

Moved everything into sql.py
1 parent a19afe4 commit f2312c7


5 files changed (+343, -321 lines)

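In practical terms, this commit moves the Python Spark SQL entry points (SQLContext, HiveContext, SchemaRDD, Row) out of pyspark/context.py and pyspark/rdd.py and into pyspark/sql.py. A minimal sketch of what user code looks like after the move, assuming the classes keep the names and behavior shown in the diffs below:

    # Sketch only: import paths before and after this commit.
    # Old locations (removed in the diffs below):
    #     from pyspark.context import SQLContext
    #     from pyspark.rdd import SchemaRDD, Row
    # New location:
    from pyspark.sql import SQLContext, SchemaRDD, Row

    # The top-level package keeps re-exporting these names (see the
    # __init__.py diff below), so this also still works:
    from pyspark import SQLContext, SchemaRDD, Row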

python/pyspark/__init__.py

Lines changed: 6 additions & 6 deletions
@@ -36,12 +36,12 @@
         Finer-grained cache persistence levels.
 
 Spark SQL:
-    - L{SQLContext<pyspark.context.SQLContext>}
+    - L{SQLContext<pyspark.sql.SQLContext>}
         Main entry point for SQL functionality.
-    - L{SchemaRDD<pyspark.rdd.SchemaRDD>}
+    - L{SchemaRDD<pyspark.sql.SchemaRDD>}
         A Resilient Distributed Dataset (RDD) with Schema information for the data contained. In
         addition to normal RDD operations, SchemaRDDs also support SQL.
-    - L{Row<pyspark.rdd.Row>}
+    - L{Row<pyspark.sql.Row>}
         A Row of data returned by a Spark SQL query.
 
 Hive:
@@ -58,10 +58,10 @@
 
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
-from pyspark.context import SQLContext
+from pyspark.sql import SQLContext
 from pyspark.rdd import RDD
-from pyspark.rdd import SchemaRDD
-from pyspark.rdd import Row
+from pyspark.sql import SchemaRDD
+from pyspark.sql import Row
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 
python/pyspark/context.py

Lines changed: 1 addition & 223 deletions
@@ -32,10 +32,9 @@
     PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
-from pyspark.rdd import RDD, SchemaRDD
+from pyspark.rdd import RDD
 
 from py4j.java_collections import ListConverter
-from py4j.protocol import Py4JError
 
 
 class SparkContext(object):
@@ -175,8 +174,6 @@ def _ensure_initialized(cls, instance=None, gateway=None):
             SparkContext._gateway = gateway or launch_gateway()
             SparkContext._jvm = SparkContext._gateway.jvm
             SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
-            SparkContext._pythonToJavaMap = SparkContext._jvm.PythonRDD.pythonToJavaMap
-            SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython
 
             if instance:
                 if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
@@ -463,225 +460,6 @@ def sparkUser(self):
         """
         return self._jsc.sc().sparkUser()
 
-class SQLContext:
-    """
-    Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s,
-    register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files.
-    """
-
-    def __init__(self, sparkContext):
-        """
-        Create a new SQLContext.
-
-        @param sparkContext: The SparkContext to wrap.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> sqlCtx.inferSchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
-        Traceback (most recent call last):
-            ...
-        ValueError:...
-
-        >>> bad_rdd = sc.parallelize([1,2,3])
-        >>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
-        Traceback (most recent call last):
-            ...
-        ValueError:...
-
-        >>> allTypes = sc.parallelize([{"int" : 1, "string" : "string", "double" : 1.0, "long": 1L,
-        ...    "boolean" : True}])
-        >>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long,
-        ...    x.boolean))
-        >>> srdd.collect()[0]
-        (1, u'string', 1.0, 1, True)
-        """
-        self._sc = sparkContext
-        self._jsc = self._sc._jsc
-        self._jvm = self._sc._jvm
-
-    @property
-    def _ssql_ctx(self):
-        """
-        Accessor for the JVM SparkSQL context. Subclasses can overrite this property to provide
-        their own JVM Contexts.
-        """
-        if not hasattr(self, '_scala_SQLContext'):
-            self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
-        return self._scala_SQLContext
-
-    def inferSchema(self, rdd):
-        """
-        Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
-        determine the fields names and types, and then use that to extract all the dictionaries.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
-        ...    {"field1" : 3, "field2": "row3"}]
-        True
-        """
-        if (rdd.__class__ is SchemaRDD):
-            raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
-        elif not isinstance(rdd.first(), dict):
-            raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" %
-                             (SchemaRDD.__name__, rdd.first()))
-
-        jrdd = self._sc._pythonToJavaMap(rdd._jrdd)
-        srdd = self._ssql_ctx.inferSchema(jrdd.rdd())
-        return SchemaRDD(srdd, self)
-
-    def registerRDDAsTable(self, rdd, tableName):
-        """
-        Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
-        during the lifetime of this instance of SQLContext.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        """
-        if (rdd.__class__ is SchemaRDD):
-            jschema_rdd = rdd._jschema_rdd
-            self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName)
-        else:
-            raise ValueError("Can only register SchemaRDD as table")
-
-    def parquetFile(self, path):
-        """
-        Loads a Parquet file, returning the result as a L{SchemaRDD}.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.saveAsParquetFile("/tmp/tmp.parquet")
-        >>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet")
-        >>> srdd.collect() == srdd2.collect()
-        True
-        """
-        jschema_rdd = self._ssql_ctx.parquetFile(path)
-        return SchemaRDD(jschema_rdd, self)
-
-    def sql(self, sqlQuery):
-        """
-        Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
-        >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"},
-        ...    {"f1" : 3, "f2": "row3"}]
-        True
-        """
-        return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
-
-    def table(self, tableName):
-        """
-        Returns the specified table as a L{SchemaRDD}.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.table("table1")
-        >>> srdd.collect() == srdd2.collect()
-        True
-        """
-        return SchemaRDD(self._ssql_ctx.table(tableName), self)
-
-    def cacheTable(tableName):
-        """
-        Caches the specified table in-memory.
-        """
-        self._ssql_ctx.cacheTable(tableName)
-
-    def uncacheTable(tableName):
-        """
-        Removes the specified table from the in-memory cache.
-        """
-        self._ssql_ctx.uncacheTable(tableName)
-
-class HiveContext(SQLContext):
-    """
-    An instance of the Spark SQL execution engine that integrates with data stored in Hive.
-    Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL
-    and HiveQL commands.
-    """
-
-    @property
-    def _ssql_ctx(self):
-        try:
-            if not hasattr(self, '_scala_HiveContext'):
-                self._scala_HiveContext = self._get_hive_ctx()
-            return self._scala_HiveContext
-        except Py4JError as e:
-            raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run " \
-                            "sbt/sbt assembly" , e)
-
-    def _get_hive_ctx(self):
-        return self._jvm.HiveContext(self._jsc.sc())
-
-    def hiveql(self, hqlQuery):
-        """
-        Runs a query expressed in HiveQL, returning the result as a L{SchemaRDD}.
-        """
-        return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery), self)
-
-    def hql(self, hqlQuery):
-        """
-        Runs a query expressed in HiveQL, returning the result as a L{SchemaRDD}.
-        """
-        return self.hiveql(hqlQuery)
-
-class LocalHiveContext(HiveContext):
-    """
-    Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
-    created with data stored in ./metadata. Warehouse data is stored in in ./warehouse.
-
-    >>> import os
-    >>> from pyspark.context import LocalHiveContext
-    >>> hiveCtx = LocalHiveContext(sc)
-    >>> try:
-    ...     supress = hiveCtx.hql("DROP TABLE src")
-    ... except Exception:
-    ...     pass
-    >>> kv1 = os.path.join(os.environ["SPARK_HOME"], 'examples/src/main/resources/kv1.txt')
-    >>> supress = hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    >>> supress = hiveCtx.hql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src" % kv1)
-    >>> results = hiveCtx.hql("FROM src SELECT value").map(lambda r: int(r.value.split('_')[1]))
-    >>> num = results.count()
-    >>> reduce_sum = results.reduce(lambda x, y: x + y)
-    >>> num
-    500
-    >>> reduce_sum
-    130091
-    """
-
-    def _get_hive_ctx(self):
-        return self._jvm.LocalHiveContext(self._jsc.sc())
-
-class TestHiveContext(HiveContext):
-
-    def _get_hive_ctx(self):
-        return self._jvm.TestHiveContext(self._jsc.sc())
-
 def _test():
     import atexit
     import doctest
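
For orientation, here is a condensed sketch of the SQLContext workflow that the removed doctests above exercise. It assumes the class was moved to pyspark.sql unchanged, as the commit message and the __init__.py diff indicate, so only the import line differs from the old doctests; the app name is a hypothetical placeholder.

    # Hedged sketch based on the removed doctests; not part of this commit's diff.
    from pyspark import SparkContext
    from pyspark.sql import SQLContext   # was: from pyspark.context import SQLContext

    sc = SparkContext("local", "sql-move-example")
    sqlCtx = SQLContext(sc)

    # Infer a schema from an RDD of dicts, register it as a table, and query it.
    rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                          {"field1": 2, "field2": "row2"},
                          {"field1": 3, "field2": "row3"}])
    srdd = sqlCtx.inferSchema(rdd)
    sqlCtx.registerRDDAsTable(srdd, "table1")
    print(sqlCtx.sql("SELECT field1 AS f1, field2 AS f2 FROM table1").collect())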

python/pyspark/rdd.py

Lines changed: 0 additions & 89 deletions
@@ -1387,95 +1387,6 @@ def _jrdd(self):
     def _is_pipelinable(self):
         return not (self.is_cached or self.is_checkpointed)
 
-class Row(dict):
-    """
-    An extended L{dict} that takes a L{dict} in its constructor, and exposes those items as fields.
-
-    >>> r = Row({"hello" : "world", "foo" : "bar"})
-    >>> r.hello
-    'world'
-    >>> r.foo
-    'bar'
-    """
-
-    def __init__(self, d):
-        d.update(self.__dict__)
-        self.__dict__ = d
-        dict.__init__(self, d)
-
-class SchemaRDD(RDD):
-    """
-    An RDD of Row objects that has an associated schema. The underlying JVM object is a SchemaRDD,
-    not a PythonRDD, so we can utilize the relational query api exposed by SparkSQL.
-
-    For normal L{RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on directly, as
-    it's underlying implementation is a RDD composed of Java objects. Instead it is converted to a
-    PythonRDD in the JVM, on which Python operations can be done.
-    """
-
-    def __init__(self, jschema_rdd, sql_ctx):
-        self.sql_ctx = sql_ctx
-        self._sc = sql_ctx._sc
-        self._jschema_rdd = jschema_rdd
-
-        self.is_cached = False
-        self.is_checkpointed = False
-        self.ctx = self.sql_ctx._sc
-        self._jrdd_deserializer = self.ctx.serializer
-
-    @property
-    def _jrdd(self):
-        """
-        Lazy evaluation of PythonRDD object. Only done when a user calls methods defined by the
-        L{RDD} super class (map, count, etc.).
-        """
-        return self.toPython()._jrdd
-
-    @property
-    def _id(self):
-        return self._jrdd.id()
-
-    def saveAsParquetFile(self, path):
-        """
-        Saves the contents of this L{SchemaRDD} as a parquet file, preserving the schema. Files
-        that are written out using this method can be read back in as a SchemaRDD using the
-        L{SQLContext.parquetFile} method.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.saveAsParquetFile("/tmp/test.parquet")
-        >>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet")
-        >>> srdd2.collect() == srdd.collect()
-        True
-        """
-        self._jschema_rdd.saveAsParquetFile(path)
-
-    def registerAsTable(self, name):
-        """
-        Registers this RDD as a temporary table using the given name. The lifetime of this temporary
-        table is tied to the L{SQLContext} that was used to create this SchemaRDD.
-
-        >>> from pyspark.context import SQLContext
-        >>> sqlCtx = SQLContext(sc)
-        >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        ...    {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
-        >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.registerAsTable("test")
-        >>> srdd2 = sqlCtx.sql("select * from test")
-        >>> srdd.collect() == srdd2.collect()
-        True
-        """
-        self._jschema_rdd.registerAsTable(name)
-
-    def toPython(self):
-        jrdd = self._jschema_rdd.javaToPython()
-        # TODO: This is inefficient, we should construct the Python Row object
-        # in Java land in the javaToPython function. May require a custom
-        # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
 
 def _test():
     import doctest
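
Similarly, a short sketch of how the Row and SchemaRDD classes removed here are used, distilled from their docstrings; importing them from pyspark.sql is an assumption based on the commit message and the __init__.py diff above.

    # Hedged sketch based on the removed docstrings; not part of this commit's diff.
    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row   # Row/SchemaRDD previously lived in pyspark.rdd

    sc = SparkContext("local", "schemardd-example")
    sqlCtx = SQLContext(sc)
    srdd = sqlCtx.inferSchema(sc.parallelize([{"field1": 1, "field2": "row1"}]))

    # Row extends dict and exposes keys as attributes (per the Row docstring).
    r = Row({"hello": "world", "foo": "bar"})
    print(r.hello, r["foo"])

    # SchemaRDD supports normal RDD operations plus the SQL-flavored ones shown above.
    srdd.registerAsTable("test")
    srdd.saveAsParquetFile("/tmp/test.parquet")
    print(sqlCtx.sql("select * from test").collect())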
