
Commit 9f2d877

Address Burak's comments
1 parent 7d62de4 commit 9f2d877


5 files changed: 24 additions & 13 deletions


docs/structured-streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
 
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be same as the Append mode.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
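
To make the modes above concrete, here is a minimal PySpark sketch of a streaming aggregation run in update mode. The socket source, host, and port are illustrative assumptions, not part of this commit; any streaming source works.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("output-modes-sketch").getOrCreate()

    # Read a stream of lines; the socket source keeps the sketch small.
    lines = (spark.readStream.format("socket")
             .option("host", "localhost").option("port", 9999).load())

    # A streaming aggregation: running count per distinct input line.
    counts = lines.groupBy("value").count()

    # "update" writes only the rows whose counts changed since the last
    # trigger; with no aggregation in the query it would behave like "append".
    query = counts.writeStream.outputMode("update").format("console").start()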

python/pyspark/sql/streaming.py

Lines changed: 19 additions & 8 deletions
@@ -665,6 +665,9 @@ def outputMode(self, outputMode):
             the sink
         * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
             every time there are some updates
+        * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be
+            written to the sink every time there are some updates. If the query doesn't contain
+            aggregations, it will be equivalent to the `append` mode.
 
         .. note:: Experimental.
@@ -768,7 +771,8 @@ def trigger(self, processingTime=None):
 
     @ignore_unicode_prefix
     @since(2.0)
-    def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
+              **options):
         """Streams the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
@@ -779,15 +783,20 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
-
-        * ``append``: Append contents of this :class:`DataFrame` to existing data.
-        * ``overwrite``: Overwrite existing data.
-        * ``ignore``: Silently ignore this operation if data already exists.
-        * ``error`` (default case): Throw an exception if data already exists.
+        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
+            streaming sink. Options include:
+
+            * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
+                the sink
+            * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the
+                sink every time there are some updates
+            * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be
+                written to the sink every time there are some updates. If the query doesn't contain
+                aggregations, it will be equivalent to the `append` mode.
         :param partitionBy: names of partitioning columns
         :param queryName: unique name for the query
         :param options: All other string options. You may want to provide a `checkpointLocation`
-        for most streams, however it is not required for a `memory` stream.
+            for most streams, however it is not required for a `memory` stream.
 
         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sq.isActive
@@ -798,14 +807,16 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti
         >>> sq.isActive
         False
         >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
-        ...     queryName='that_query', format='memory')
+        ...     queryName='that_query', outputMode="append", format='memory')
         >>> sq.name
         u'that_query'
         >>> sq.isActive
         True
         >>> sq.stop()
         """
         self.options(**options)
+        if outputMode is not None:
+            self.outputMode(outputMode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
         if format is not None:
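
The net effect of the Python change is that the output mode can now be passed straight to start(). A short usage sketch, assuming sdf is a streaming DataFrame as in the doctest above:

    sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
        queryName='that_query', outputMode='append', format='memory')
    # Equivalent to calling .outputMode('append') on the writer before .start().
    sq.stop()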

sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ public static OutputMode Complete() {
   /**
    * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
    * be written to the sink every time there are some updates. If the query doesn't contain
-   * aggregations, it will be same as the `Append` mode.
+   * aggregations, it will be equivalent to the `Append` mode.
    *
    * @since 2.1.1
    */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ private[sql] object InternalOutputModes {
   /**
    * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
    * written to the sink every time there are some updates. If the query doesn't contain
-   * aggregations, it will be same as the `Append` mode.
+   * aggregations, it will be equivalent to the `Append` mode.
    */
   case object Update extends OutputMode
 }

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 2 additions & 2 deletions
@@ -46,7 +46,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                          to the sink every time there are some updates
    * - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
    *                          will be written to the sink every time there are some updates. If
-   *                          the query doesn't contain aggregations, it will be same as the
+   *                          the query doesn't contain aggregations, it will be equivalent to the
    *                          `OutputMode.Append()` mode.
    *
    * @since 2.0.0
@@ -64,7 +64,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *               every time there are some updates
    * - `update`: only the rows that were updated in the streaming DataFrame/Dataset will
    *             be written to the sink every time there are some updates. If the query doesn't
-   *             contain aggregations, it will be same as the `append` mode.
+   *             contain aggregations, it will be equivalent to the `append` mode.
    * @since 2.0.0
    */
   def outputMode(outputMode: String): DataStreamWriter[T] = {
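
The sentence reworded throughout this commit makes a behavioral claim worth seeing once in code: with no aggregation, every trigger produces only brand-new result rows, so update mode has nothing to emit beyond what append would. A hedged PySpark sketch, reusing the illustrative lines DataFrame from the earlier sketch:

    # No aggregation in this query, so "update" and "append" are equivalent.
    projected = lines.select("value")
    query = projected.writeStream.outputMode("update").format("console").start()
    # Swapping outputMode("update") for outputMode("append") here would
    # produce identical console output, row for row.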
