Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,11 @@ def partitionBy(self, *cols):
"""
if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
cols = cols[0]
if len(cols) > 0:
self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
return self

@since(1.4)
def save(self, path=None, format=None, mode=None, partitionBy=(), **options):
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
"""Saves the contents of the :class:`DataFrame` to a data source.

The data source is specified by the ``format`` and a set of ``options``.
Expand All @@ -281,7 +280,9 @@ def save(self, path=None, format=None, mode=None, partitionBy=(), **options):

>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.partitionBy(partitionBy).mode(mode).options(**options)
self.mode(mode).options(**options)
if partitionBy is not None:
self.partitionBy(partitionBy)
if format is not None:
self.format(format)
if path is None:
Expand All @@ -301,7 +302,7 @@ def insertInto(self, tableName, overwrite=False):
self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName)

@since(1.4)
def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options):
def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):
"""Saves the content of the :class:`DataFrame` as the specified table.

In the case the table already exists, behavior of this function depends on the
Expand All @@ -320,7 +321,9 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options):
:param partitionBy: names of partitioning columns
:param options: all other string options
"""
self.partitionBy(partitionBy).mode(mode).options(**options)
self.mode(mode).options(**options)
if partitionBy is not None:
self.partitionBy(partitionBy)
if format is not None:
self.format(format)
self._jwrite.saveAsTable(name)
Expand All @@ -342,7 +345,7 @@ def json(self, path, mode=None):
self.mode(mode)._jwrite.json(path)

@since(1.4)
def parquet(self, path, mode=None, partitionBy=()):
def parquet(self, path, mode=None, partitionBy=None):
"""Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

:param path: the path in any Hadoop supported file system
Expand All @@ -356,7 +359,9 @@ def parquet(self, path, mode=None, partitionBy=()):

>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.partitionBy(partitionBy).mode(mode)
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
self._jwrite.parquet(path)

@since(1.4)
Expand Down