1 change: 1 addition & 0 deletions docs/_config.yml
@@ -10,6 +10,7 @@ kramdown:

include:
- _static
- _modules

# These allow the documentation to be updated with newer releases
# of Spark, Scala, and Mesos.
3 changes: 0 additions & 3 deletions python/docs/pyspark.sql.rst
@@ -7,20 +7,17 @@ Module Context
.. automodule:: pyspark.sql
:members:
:undoc-members:
:show-inheritance:


pyspark.sql.types module
------------------------
.. automodule:: pyspark.sql.types
:members:
:undoc-members:
:show-inheritance:


pyspark.sql.functions module
----------------------------
.. automodule:: pyspark.sql.functions
:members:
:undoc-members:
:show-inheritance:
182 changes: 34 additions & 148 deletions python/pyspark/sql/context.py
@@ -129,6 +129,7 @@ def registerFunction(self, name, f, returnType=StringType()):
>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]

>>> from pyspark.sql.types import IntegerType
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
@@ -197,31 +198,6 @@ def inferSchema(self, rdd, samplingRatio=None):
>>> df = sqlCtx.inferSchema(rdd)
>>> df.collect()[0]
Row(field1=1, field2=u'row1')

>>> NestedRow = Row("f1", "f2")
>>> nestedRdd1 = sc.parallelize([
... NestedRow(array('i', [1, 2]), {"row1": 1.0}),
... NestedRow(array('i', [2, 3]), {"row2": 2.0})])
>>> df = sqlCtx.inferSchema(nestedRdd1)
>>> df.collect()
[Row(f1=[1, 2], f2={u'row1': 1.0}), ..., f2={u'row2': 2.0})]

>>> nestedRdd2 = sc.parallelize([
... NestedRow([[1, 2], [2, 3]], [1, 2]),
... NestedRow([[2, 3], [3, 4]], [2, 3])])
>>> df = sqlCtx.inferSchema(nestedRdd2)
>>> df.collect()
[Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])]

>>> from collections import namedtuple
>>> CustomRow = namedtuple('CustomRow', 'field1 field2')
>>> rdd = sc.parallelize(
... [CustomRow(field1=1, field2="row1"),
... CustomRow(field1=2, field2="row2"),
... CustomRow(field1=3, field2="row3")])
>>> df = sqlCtx.inferSchema(rdd)
>>> df.collect()[0]
Row(field1=1, field2=u'row1')
"""

if isinstance(rdd, DataFrame):
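inferSchema derives both the column names and their types by sampling the RDD, so rows expressed as Row objects, namedtuples, or dicts become a queryable DataFrame without an explicit schema. A minimal sketch, assuming the same sc and sqlCtx doctest globals:

>>> from pyspark.sql import Row
>>> people = sc.parallelize([Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)])
>>> df = sqlCtx.inferSchema(people)
>>> df.collect()[0]
Row(age=1, name=u'Alice')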
@@ -252,56 +228,8 @@ def applySchema(self, rdd, schema):
>>> schema = StructType([StructField("field1", IntegerType(), False),
... StructField("field2", StringType(), False)])
>>> df = sqlCtx.applySchema(rdd2, schema)
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT * from table1")
>>> df2.collect()
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]

>>> from datetime import date, datetime
>>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0,
... date(2010, 1, 1),
... datetime(2010, 1, 1, 1, 1, 1),
... {"a": 1}, (2,), [1, 2, 3], None)])
>>> schema = StructType([
... StructField("byte1", ByteType(), False),
... StructField("byte2", ByteType(), False),
... StructField("short1", ShortType(), False),
... StructField("short2", ShortType(), False),
... StructField("int1", IntegerType(), False),
... StructField("float1", FloatType(), False),
... StructField("date1", DateType(), False),
... StructField("time1", TimestampType(), False),
... StructField("map1",
... MapType(StringType(), IntegerType(), False), False),
... StructField("struct1",
... StructType([StructField("b", ShortType(), False)]), False),
... StructField("list1", ArrayType(ByteType(), False), False),
... StructField("null1", DoubleType(), True)])
>>> df = sqlCtx.applySchema(rdd, schema)
>>> results = df.map(
... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
... x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
>>> results.collect()[0] # doctest: +NORMALIZE_WHITESPACE
(127, -128, -32768, 32767, 2147483647, 1.0, datetime.date(2010, 1, 1),
datetime.datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)

>>> df.registerTempTable("table2")
>>> sqlCtx.sql(
... "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " +
... "short1 + 1 AS short1, short2 - 1 AS short2, int1 - 1 AS int1, " +
... "float1 + 1.5 as float1 FROM table2").collect()
[Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int1=2147483646, float1=2.5)]

>>> from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type
>>> rdd = sc.parallelize([(127, -32768, 1.0,
... datetime(2010, 1, 1, 1, 1, 1),
... {"a": 1}, (2,), [1, 2, 3])])
>>> abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
>>> schema = _parse_schema_abstract(abstract)
>>> typedSchema = _infer_schema_type(rdd.first(), schema)
>>> df = sqlCtx.applySchema(rdd, typedSchema)
>>> df.collect()
[Row(byte1=127, short1=-32768, float1=1.0, time1=..., list1=[1, 2, 3])]
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
"""

if isinstance(rdd, DataFrame):
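applySchema is the explicit counterpart to inferSchema: the types come verbatim from the supplied StructType rather than from sampling, which is the safer choice when the field types are known up front. A minimal sketch with illustrative field names, assuming the sc and sqlCtx doctest globals:

>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> rdd = sc.parallelize([(1, u'row1'), (2, u'row2')])
>>> schema = StructType([StructField("id", IntegerType(), False),
...     StructField("label", StringType(), False)])
>>> sqlCtx.applySchema(rdd, schema).collect()
[Row(id=1, label=u'row1'), Row(id=2, label=u'row2')]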
@@ -459,46 +387,28 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> ofn = open(jsonFile, 'w')
>>> for json in jsonStrings:
... print>>ofn, json
>>> ofn.close()
>>> with open(jsonFile, 'w') as f:
... f.writelines(jsonStrings)
>>> df1 = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerDataFrameAsTable(df1, "table1")
>>> df2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table1")
>>> for r in df2.collect():
... print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)

>>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
>>> sqlCtx.registerDataFrameAsTable(df3, "table2")
>>> df4 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table2")
>>> for r in df4.collect():
... print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> df1.printSchema()
root
|-- field1: long (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: long (nullable = true)

>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType(), True),
... StructField("field2", StringType()),
... StructField("field3",
... StructType([
... StructField("field5",
... ArrayType(IntegerType(), False), True)]), False)])
>>> df5 = sqlCtx.jsonFile(jsonFile, schema)
>>> sqlCtx.registerDataFrameAsTable(df5, "table3")
>>> df6 = sqlCtx.sql(
... "SELECT field2 AS f1, field3.field5 as f2, "
... "field3.field5[0] as f3 from table3")
>>> df6.collect()
[Row(f1=u'row1', f2=None, f3=None)...Row(f1=u'row3', f2=[], f3=None)]
... StructType([StructField("field5", ArrayType(IntegerType()))]))])
>>> df2 = sqlCtx.jsonFile(jsonFile, schema)
>>> df2.printSchema()
root
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = true)
"""
if schema is None:
df = self._ssql_ctx.jsonFile(path, samplingRatio)
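jsonFile expects one JSON object per line of the file; without a schema the types are inferred by sampling, and with an explicit StructType only the named fields are loaded, as the trimmed printSchema doctest above shows. A minimal sketch, assuming sqlCtx from the doctest globals and a hypothetical people.json written in that one-object-per-line format:

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("name", StringType(), True)])
>>> df = sqlCtx.jsonFile("people.json", schema)  # path is illustrative
>>> df.printSchema()
root
 |-- name: string (nullable = true)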
@@ -517,48 +427,23 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
determine the schema.

>>> df1 = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerDataFrameAsTable(df1, "table1")
>>> df2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table1")
>>> for r in df2.collect():
... print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)

>>> df3 = sqlCtx.jsonRDD(json, df1.schema)
>>> sqlCtx.registerDataFrameAsTable(df3, "table2")
>>> df4 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table2")
>>> for r in df4.collect():
... print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> df1.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)

>>> df2 = sqlCtx.jsonRDD(json, df1.schema)
>>> df2.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)

>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType(), True),
... StructField("field2", StringType()),
... StructField("field3",
... StructType([
... StructField("field5",
... ArrayType(IntegerType(), False), True)]), False)])
>>> df5 = sqlCtx.jsonRDD(json, schema)
>>> sqlCtx.registerDataFrameAsTable(df5, "table3")
>>> df6 = sqlCtx.sql(
... "SELECT field2 AS f1, field3.field5 as f2, "
... "field3.field5[0] as f3 from table3")
>>> df6.collect()
[Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)]

>>> sqlCtx.jsonRDD(sc.parallelize(['{}',
... '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=None), Row(key0=Row(key1=u'value1'))]
>>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}',
... '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=None), Row(key0=Row(key1=u'value1'))]
... StructType([StructField("field5", ArrayType(IntegerType()))]))
... ])
>>> df3 = sqlCtx.jsonRDD(json, schema)
>>> df3.first()
Row(field2=u'row1', field3=Row(field5=None))

"""

def func(iterator):
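jsonRDD behaves like jsonFile but parses an RDD of JSON strings, so documents produced by another computation never need to touch disk; fields missing from a record simply come back as None. A minimal sketch, assuming the sc and sqlCtx doctest globals:

>>> strings = sc.parallelize(['{"a": 1}', '{"a": 2, "b": "x"}'])
>>> sqlCtx.jsonRDD(strings).collect()
[Row(a=1, b=None), Row(a=2, b=u'x')]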
@@ -848,7 +733,8 @@ def _test():
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
(failure_count, test_count) = doctest.testmod(
pyspark.sql.context, globs=globs, optionflags=doctest.ELLIPSIS)
pyspark.sql.context, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
globs['sc'].stop()
if failure_count:
exit(-1)
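The doctest flags are the substantive change in this hunk: NORMALIZE_WHITESPACE lets the expected output of a doctest be re-wrapped across lines (which the multi-line printSchema and Row outputs above depend on), while ELLIPSIS keeps the existing "..." placeholders working. A small self-contained sketch of the same flags, independent of Spark:

import doctest

def pair():
    """
    >>> pair()
    (1,
     2)
    """
    return (1, 2)

# Without NORMALIZE_WHITESPACE the expected output above would have to match the
# actual single-line repr "(1, 2)" exactly; with it, any whitespace layout passes.
doctest.testmod(optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)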