Skip to content

Commit c0fb1c6

Browse files
committed
more working
1 parent 043ca85 commit c0fb1c6

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

python/pyspark/context.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
PairDeserializer
3333
from pyspark.storagelevel import StorageLevel
3434
from pyspark import rdd
35-
from pyspark.rdd import RDD
35+
from pyspark.rdd import RDD, SchemaRDD
3636

3737
from py4j.java_collections import ListConverter
3838

@@ -462,6 +462,23 @@ def sparkUser(self):
462462
"""
463463
return self._jsc.sc().sparkUser()
464464

465+
class SQLContext:
    """Python-side handle for running SQL through a JVM ``JavaSQLContext``.

    The context keeps references to the owning SparkContext, its Java
    context and its JVM view, and wraps every JVM result it produces in a
    ``SchemaRDD`` that points back at this context.
    """

    def __init__(self, sparkContext):
        """Bind a new SQL context to ``sparkContext``.

        :param sparkContext: object exposing ``_jsc`` and ``_jvm``; the
            ``_jvm`` attribute must provide a ``JavaSQLContext`` callable
            that accepts ``_jsc``.
        """
        self._sc = sparkContext
        self._jsc = self._sc._jsc
        self._jvm = self._sc._jvm
        # The JVM-side context is created once and reused by every query.
        self._jsql_ctx = self._jvm.JavaSQLContext(self._jsc)

    def sql(self, sqlQuery):
        """Run ``sqlQuery`` on the JVM SQL context.

        :param sqlQuery: query passed unchanged to the JVM context's
            ``sql`` method.
        :return: a ``SchemaRDD`` wrapping the JVM result, tied to this
            context.
        """
        return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self)

    def applySchema(self, rdd):
        """Turn ``rdd`` into a ``SchemaRDD`` via the JVM SQL context.

        :param rdd: object exposing ``_jrdd``; that value is converted with
            the SparkContext's ``_pythonToJava`` before ``applySchema`` is
            invoked on the JVM context.
        :return: a ``SchemaRDD`` wrapping the JVM result, tied to this
            context.
        """
        jrdd = self._sc._pythonToJava(rdd._jrdd)
        srdd = self._jsql_ctx.applySchema(jrdd)
        return SchemaRDD(srdd, self)
480+
481+
465482
def _test():
466483
import atexit
467484
import doctest

python/pyspark/rdd.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,13 +1389,17 @@ def _is_pipelinable(self):
13891389

13901390
class SchemaRDD:
    """Python-side wrapper around a JVM schema RDD.

    Instances are built from an already-created JVM object plus the SQL
    context that produced it; the wrapper itself never creates a JVM SQL
    context.
    """

    def __init__(self, jschema_rdd, sql_ctx):
        """Wrap ``jschema_rdd`` and remember the owning SQL context.

        :param jschema_rdd: JVM-side object this wrapper delegates to.
        :param sql_ctx: owning SQL context; its ``_sc`` attribute is cached
            on the instance for later conversions.
        """
        self.sql_ctx = sql_ctx
        self._sc = sql_ctx._sc
        self._jschema_rdd = jschema_rdd

    def registerAsTable(self, name):
        """Register the wrapped JVM object as a table called ``name``.

        Delegates to ``registerAsTable`` on the JVM object; returns nothing.
        """
        self._jschema_rdd.registerAsTable(name)

    def toPython(self):
        """Convert the wrapped JVM object into a plain ``RDD``.

        :return: an ``RDD`` built from the SparkContext's ``_javaToPython``
            conversion, using the SparkContext's ``serializer``.
        """
        jrdd = self._sc._javaToPython(self._jschema_rdd)
        return RDD(jrdd, self._sc, self._sc.serializer)
13991403

14001404
def _test():
14011405
import doctest

0 commit comments

Comments
 (0)