Skip to content

Commit 889eb25

Browse files
committed
Minor refactoring and add partitionBy to save, saveAsTable, and parquet.
1 parent 7fbc24b commit 889eb25

File tree

1 file changed

+14
-29
lines changed

1 file changed

+14
-29
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ def mode(self, saveMode):
218218
219219
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
220220
"""
221-
self._jwrite = self._jwrite.mode(saveMode)
221+
# At the JVM side, the default value of mode is already set to "error".
222+
# So, if the given saveMode is None, we will not call JVM-side's mode method.
223+
if saveMode is not None:
224+
self._jwrite = self._jwrite.mode(saveMode)
222225
return self
223226

224227
@since(1.4)
@@ -253,11 +256,12 @@ def partitionBy(self, *cols):
253256
"""
254257
if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
255258
cols = cols[0]
256-
self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
259+
if len(cols) > 0:
260+
self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
257261
return self
258262

259263
@since(1.4)
260-
def save(self, path=None, format=None, mode=None, **options):
264+
def save(self, path=None, format=None, mode=None, partitionBy=(), **options):
261265
"""Saves the contents of the :class:`DataFrame` to a data source.
262266
263267
The data source is specified by the ``format`` and a set of ``options``.
@@ -276,11 +280,7 @@ def save(self, path=None, format=None, mode=None, **options):
276280
277281
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
278282
"""
279-
if mode is not None:
280-
# At the JVM side, the default value of mode is already set to "error".
281-
# We will only call mode method if the provided mode is not None.
282-
self.mode(mode)
283-
self.options(**options)
283+
self.partitionBy(partitionBy).mode(mode).options(**options)
284284
if format is not None:
285285
self.format(format)
286286
if path is None:
@@ -300,7 +300,7 @@ def insertInto(self, tableName, overwrite=False):
300300
self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName)
301301

302302
@since(1.4)
303-
def saveAsTable(self, name, format=None, mode=None, **options):
303+
def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options):
304304
"""Saves the content of the :class:`DataFrame` as the specified table.
305305
306306
In the case the table already exists, behavior of this function depends on the
@@ -318,11 +318,7 @@ def saveAsTable(self, name, format=None, mode=None, **options):
318318
:param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
319319
:param options: all other string options
320320
"""
321-
if mode is not None:
322-
# At the JVM side, the default value of mode is already set to "error".
323-
# We will only call mode method if the provided mode is not None.
324-
self.mode(mode)
325-
self.options(**options)
321+
self.partitionBy(partitionBy).mode(mode).options(**options)
326322
if format is not None:
327323
self.format(format)
328324
self._jwrite.saveAsTable(name)
@@ -341,14 +337,10 @@ def json(self, path, mode=None):
341337
342338
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
343339
"""
344-
if mode is not None:
345-
# At the JVM side, the default value of mode is already set to "error".
346-
# We will only call mode method if the provided mode is not None.
347-
self.mode(mode)
348-
self._jwrite.json(path)
340+
self._jwrite.mode(mode).json(path)
349341

350342
@since(1.4)
351-
def parquet(self, path, mode=None):
343+
def parquet(self, path, mode=None, partitionBy=()):
352344
"""Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
353345
354346
:param path: the path in any Hadoop supported file system
@@ -361,10 +353,7 @@ def parquet(self, path, mode=None):
361353
362354
>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
363355
"""
364-
if mode is not None:
365-
# At the JVM side, the default value of mode is already set to "error".
366-
# We will only call mode method if the provided mode is not None.
367-
self.mode(mode)
356+
self.partitionBy(partitionBy).mode(mode)
368357
self._jwrite.parquet(path)
369358

370359
@since(1.4)
@@ -386,14 +375,10 @@ def jdbc(self, url, table, mode=None, properties={}):
386375
arbitrary string tag/value. Normally at least a
387376
"user" and "password" property should be included.
388377
"""
389-
if mode is not None:
390-
# At the JVM side, the default value of mode is already set to "error".
391-
# We will only call mode method if the provided mode is not None.
392-
self.mode(mode)
393378
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
394379
for k in properties:
395380
jprop.setProperty(k, properties[k])
396-
self._jwrite.jdbc(url, table, jprop)
381+
self._jwrite.mode(mode).jdbc(url, table, jprop)
397382

398383

399384
def _test():

0 commit comments

Comments
 (0)