Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 445647a

Browse files
Davies Liu authored and pwendell committed
[SPARK-8021] [SQL] [PYSPARK] make Python read/write API consistent with Scala
Add schema()/format()/options() for reader, and add mode()/format()/options()/partitionBy() for writer.

cc rxin yhuai pwendell

Author: Davies Liu <[email protected]>

Closes apache#6578 from davies/readwrite and squashes the following commits:

720d293 [Davies Liu] address comments
b65dfa2 [Davies Liu] Update readwriter.py
1299ab6 [Davies Liu] make Python API consistent with Scala
1 parent 0f80990 commit 445647a

File tree

1 file changed

+94
-27
lines changed

1 file changed

+94
-27
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 94 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,39 @@ def _df(self, jdf):
4343
from pyspark.sql.dataframe import DataFrame
4444
return DataFrame(jdf, self._sqlContext)
4545

46+
@since(1.4)
47+
def format(self, source):
48+
"""
49+
Specifies the input data source format.
50+
"""
51+
self._jreader = self._jreader.format(source)
52+
return self
53+
54+
@since(1.4)
55+
def schema(self, schema):
56+
"""
57+
Specifies the input schema. Some data sources (e.g. JSON) can
58+
infer the input schema automatically from data. By specifying
59+
the schema here, the underlying data source can skip the schema
60+
inference step, and thus speed up data loading.
61+
62+
:param schema: a StructType object
63+
"""
64+
if not isinstance(schema, StructType):
65+
raise TypeError("schema should be StructType")
66+
jschema = self._sqlContext._ssql_ctx.parseDataType(schema.json())
67+
self._jreader = self._jreader.schema(jschema)
68+
return self
69+
70+
@since(1.4)
71+
def options(self, **options):
72+
"""
73+
Adds input options for the underlying data source.
74+
"""
75+
for k in options:
76+
self._jreader = self._jreader.option(k, options[k])
77+
return self
78+
4679
@since(1.4)
4780
def load(self, path=None, format=None, schema=None, **options):
4881
"""Loads data from a data source and returns it as a :class`DataFrame`.
@@ -52,20 +85,15 @@ def load(self, path=None, format=None, schema=None, **options):
5285
:param schema: optional :class:`StructType` for the input schema.
5386
:param options: all other string options
5487
"""
55-
jreader = self._jreader
5688
if format is not None:
57-
jreader = jreader.format(format)
89+
self.format(format)
5890
if schema is not None:
59-
if not isinstance(schema, StructType):
60-
raise TypeError("schema should be StructType")
61-
jschema = self._sqlContext._ssql_ctx.parseDataType(schema.json())
62-
jreader = jreader.schema(jschema)
63-
for k in options:
64-
jreader = jreader.option(k, options[k])
91+
self.schema(schema)
92+
self.options(**options)
6593
if path is not None:
66-
return self._df(jreader.load(path))
94+
return self._df(self._jreader.load(path))
6795
else:
68-
return self._df(jreader.load())
96+
return self._df(self._jreader.load())
6997

7098
@since(1.4)
7199
def json(self, path, schema=None):
@@ -105,12 +133,9 @@ def json(self, path, schema=None):
105133
| |-- field5: array (nullable = true)
106134
| | |-- element: integer (containsNull = true)
107135
"""
108-
if schema is None:
109-
jdf = self._jreader.json(path)
110-
else:
111-
jschema = self._sqlContext._ssql_ctx.parseDataType(schema.json())
112-
jdf = self._jreader.schema(jschema).json(path)
113-
return self._df(jdf)
136+
if schema is not None:
137+
self.schema(schema)
138+
return self._df(self._jreader.json(path))
114139

115140
@since(1.4)
116141
def table(self, tableName):
@@ -194,6 +219,51 @@ def __init__(self, df):
194219
self._sqlContext = df.sql_ctx
195220
self._jwrite = df._jdf.write()
196221

222+
@since(1.4)
223+
def mode(self, saveMode):
224+
"""
225+
Specifies the behavior when data or table already exists. Options include:
226+
227+
* `append`: Append contents of this :class:`DataFrame` to existing data.
228+
* `overwrite`: Overwrite existing data.
229+
* `error`: Throw an exception if data already exists.
230+
* `ignore`: Silently ignore this operation if data already exists.
231+
"""
232+
self._jwrite = self._jwrite.mode(saveMode)
233+
return self
234+
235+
@since(1.4)
236+
def format(self, source):
237+
"""
238+
Specifies the underlying output data source. Built-in options include
239+
"parquet", "json", etc.
240+
"""
241+
self._jwrite = self._jwrite.format(source)
242+
return self
243+
244+
@since(1.4)
245+
def options(self, **options):
246+
"""
247+
Adds output options for the underlying data source.
248+
"""
249+
for k in options:
250+
self._jwrite = self._jwrite.option(k, options[k])
251+
return self
252+
253+
@since(1.4)
254+
def partitionBy(self, *cols):
255+
"""
256+
Partitions the output by the given columns on the file system.
257+
If specified, the output is laid out on the file system similar
258+
to Hive's partitioning scheme.
259+
260+
:param cols: name of columns
261+
"""
262+
if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
263+
cols = cols[0]
264+
self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
265+
return self
266+
197267
@since(1.4)
198268
def save(self, path=None, format=None, mode="error", **options):
199269
"""
@@ -216,16 +286,15 @@ def save(self, path=None, format=None, mode="error", **options):
216286
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
217287
:param options: all other string options
218288
"""
219-
jwrite = self._jwrite.mode(mode)
289+
self.mode(mode).options(**options)
220290
if format is not None:
221-
jwrite = jwrite.format(format)
222-
for k in options:
223-
jwrite = jwrite.option(k, options[k])
291+
self.format(format)
224292
if path is None:
225-
jwrite.save()
293+
self._jwrite.save()
226294
else:
227-
jwrite.save(path)
295+
self._jwrite.save(path)
228296

297+
@since(1.4)
229298
def insertInto(self, tableName, overwrite=False):
230299
"""
231300
Inserts the content of the :class:`DataFrame` to the specified table.
@@ -256,12 +325,10 @@ def saveAsTable(self, name, format=None, mode="error", **options):
256325
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
257326
:param options: all other string options
258327
"""
259-
jwrite = self._jwrite.mode(mode)
328+
self.mode(mode).options(**options)
260329
if format is not None:
261-
jwrite = jwrite.format(format)
262-
for k in options:
263-
jwrite = jwrite.option(k, options[k])
264-
return jwrite.saveAsTable(name)
330+
self.format(format)
331+
return self._jwrite.saveAsTable(name)
265332

266333
@since(1.4)
267334
def json(self, path, mode="error"):

0 commit comments

Comments
 (0)