[SPARK-19074][SS][DOCS] Updated Structured Streaming Programming Guide for update mode and source/sink options #16468
Closed
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou | |
|
|
||
| - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change. | ||
|
|
||
| - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (not available yet in Spark 2.0). Note that this is different from the Complete Mode in that this mode does not output the rows that are not changed. | ||
| - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. | ||
|
|
||
| Note that each mode is applicable to certain types of queries. This is discussed in detail [later](#output-modes). | ||
|
|
||
|
|
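To make the output modes above concrete, here is a minimal sketch, not part of this diff, of how a mode is selected when a query is started; the socket word-count query is purely illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("OutputModeSketch").getOrCreate()
import spark.implicits._

// A streaming word count over a socket source (host/port are placeholders)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

// The output mode is chosen when the query is started
val query = wordCounts.writeStream
  .outputMode("update")   // "append" and "complete" are the other modes
  .format("console")
  .start()
```

Which modes a given query actually accepts is covered by the compatibility matrix later in the guide.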
@@ -424,7 +424,7 @@ Streaming DataFrames can be created through the `DataStreamReader` interface | |
| ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs) | ||
| returned by `SparkSession.readStream()`. Similar to the read interface for creating a static DataFrame, you can specify the details of the source – data format, schema, options, etc. | ||
|
|
||
| #### Data Sources | ||
| #### Input Sources | ||
| In Spark 2.0, there are a few built-in sources. | ||
|
|
||
| - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations. | ||
|
|
@@ -433,6 +433,54 @@ In Spark 2.0, there are a few built-in sources. | |
|
|
||
| - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. | ||
|
|
||
| Some sources are not fault-tolerant because they do not guarantee that data can be replayed using | ||
| checkpointed offsets after a failure. See the earlier section on | ||
| [fault-tolerance semantics](#fault-tolerance-semantics). | ||
| Here are the details of all the sources in Spark. | ||
|
|
||
| <table class="table"> | ||
| <tr> | ||
| <th>Source</th> | ||
| <th>Options</th> | ||
| <th>Fault-tolerant</th> | ||
| <th>Notes</th> | ||
| </tr> | ||
| <tr> | ||
| <td><b>File source</b></td> | ||
| <td> | ||
| <code>path</code>: path to the input directory, and common to all file formats. | ||
| <br/> | ||
| <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max) | ||
| <br/> | ||
| <code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false) | ||
| <br/><br/> | ||
| For file-format-specific options, see the related methods in <code>DataStreamReader</code> | ||
| (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>). | ||
| E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code></td> | ||
| <td>Yes</td> | ||
| <td>Supports glob paths, but does not support multiple comma-separated paths/globs.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><b>Socket Source</b></td> | ||
| <td> | ||
| <code>host</code>: host to connect to, must be specified<br/> | ||
| <code>port</code>: port to connect to, must be specified | ||
| </td> | ||
| <td>No</td> | ||
| <td></td> | ||
| </tr> | ||
| <tr> | ||
| <td><b>Kafka Source</b></td> | ||
| <td> | ||
| See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a>. | ||
| </td> | ||
| <td>Yes</td> | ||
| <td></td> | ||
| </tr> | ||
| <tr> | ||
| <td></td> | ||
| <td></td> | ||
| <td></td> | ||
| <td></td> | ||
| </tr> | ||
| </table> | ||
|
|
||
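Before the guide's full examples below, here is a small sketch of the file source options from the table; the schema, separator, and path are illustrative, and `spark` is assumed to be an existing SparkSession:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// A streaming file source needs a user-defined schema (fields here are made up)
val userSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

val csvDF = spark.readStream
  .schema(userSchema)
  .option("sep", ";")           // a csv-specific option; see DataStreamReader.csv()
  .format("csv")
  .load("/path/to/input/dir")   // the `path` option from the table above
```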
| Here are some examples. | ||
|
|
||
| <div class="codetabs"> | ||
|
|
@@ -753,34 +801,47 @@ windowedCounts = words | |
|
|
||
| In this example, we are defining the watermark of the query on the value of the column "timestamp", | ||
| and also defining "10 minutes" as the threshold of how late the data is allowed to be. If this query | ||
| is run in Append output mode (discussed later in [Output Modes](#output-modes) section), | ||
| the engine will track the current event time from the column "timestamp" and wait for additional | ||
| "10 minutes" in event time before finalizing the windowed counts and adding them to the Result Table. | ||
| is run in Update output mode (discussed later in [Output Modes](#output-modes) section), | ||
| the engine will keep updating counts of a window in the Result Table until the window is older | ||
| than the watermark, which lags behind the current event time in column "timestamp" by 10 minutes. | ||
| Here is an illustration. | ||
|
|
||
|  | ||
|  | ||
|
|
||
| As shown in the illustration, the maximum event time tracked by the engine is the | ||
| *blue dashed line*, and the watermark set as `(max event time - '10 mins')` | ||
| at the beginning of every trigger is the red line. For example, when the engine observes the data | ||
| `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`. | ||
| For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system | ||
| is waiting for late data. After the system finds data (i.e. `(12:21, owl)`) such that the | ||
| watermark exceeds 12:10, the partial count is finalized and appended to the table. This count will | ||
| not change any further as all "too-late" data older than 12:10 will be ignored. | ||
|
|
||
| Note that in Append output mode, the system has to wait for "late threshold" time | ||
| before it can output the aggregation of a window. This may not be ideal if data can be very late, | ||
| (say 1 day) and you like to have partial counts without waiting for a day. In future, we will add | ||
| Update output mode which would allows every update to aggregates to be written to sink every trigger. | ||
| This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late | ||
| data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in | ||
| windows `12:00 - 12:10` and `12:05 - 12:15`. Since it is still ahead of the watermark `12:04` in | ||
| the trigger, the engine still maintains the intermediate counts as state and correctly updates the | ||
| counts of the related windows. However, when the watermark is updated to 12:11, the intermediate | ||
| state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`) | ||
| is considered "too late" and therefore ignored. Note that after every trigger, | ||
| the updated counts (i.e. purple rows) are written to the sink as the trigger output, as dictated by | ||
| the Update mode. | ||
|
|
||
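A sketch of the query being described, assuming a streaming DataFrame `words` with columns `timestamp` and `word` as in the guide's windowed-count example:

```scala
import org.apache.spark.sql.functions.{col, window}

// `words` is assumed to be a streaming DataFrame with columns `timestamp` and `word`
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")   // allow data up to 10 minutes late
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("word"))
  .count()

// Update mode: only windows whose counts changed in a trigger are written out
windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()
```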
| Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires. To work | ||
| with them, we also support Append Mode, where only the *final counts* are written to the sink. | ||
| This is illustrated below. | ||
|
|
||
|  | ||
|
|
||
| Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. | ||
| However, the partial counts are not updated to the Result Table and not written to the sink. The engine | ||
| waits for "10 mins" for late data to be counted, | ||
| then drops the intermediate state of windows older than the watermark, and appends the final | ||
| counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` are | ||
| appended to the Result Table only after the watermark is updated to `12:11`. | ||
|
|
||
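For the Append-mode behaviour just described, only the output mode changes relative to the previous sketch (the same `windowedCounts` is assumed):

```scala
// Append mode: a window's final count is emitted only once, after the
// watermark moves past the end of the window
windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()
```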
| **Conditions for watermarking to clean aggregation state** | ||
| It is important to note that the following conditions must be satisfied for the watermarking to | ||
| clean the state in aggregation queries *(as of Spark 2.1, subject to change in the future)*. | ||
| clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*. | ||
|
|
||
| - **Output mode must be Append.** Complete mode requires all aggregate data to be preserved, and hence | ||
| cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section | ||
| for detailed explanation of the semantics of each output mode. | ||
| - **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved, | ||
| and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) | ||
| section for detailed explanation of the semantics of each output mode. | ||
|
|
||
| - The aggregation must have either the event-time column, or a `window` on the event-time column. | ||
|
|
||
|
|
@@ -835,8 +896,9 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat | |
| </div> | ||
|
|
||
| ### Unsupported Operations | ||
| However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting is not supported on the input streaming Dataset, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. As of Spark 2.0, some of the unsupported operations are as follows | ||
|
|
||
| There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. | ||
| Some of them are as follows. | ||
|
|
||
| - Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets. | ||
|
|
||
| - Limit and take first N rows are not supported on streaming Datasets. | ||
|
|
@@ -863,7 +925,12 @@ In addition, there are some Dataset methods that will not work on streaming Data | |
|
|
||
| - `show()` - Instead use the console sink (see next section). | ||
|
|
||
| If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets". | ||
| If you try any of these operations, you will see an `AnalysisException` like "operation XYZ is not supported with streaming DataFrames/Datasets". | ||
| While some of them may be supported in future releases of Spark, | ||
| there are others which are fundamentally hard to implement on streaming data efficiently. | ||
| For example, sorting on the input stream is not supported, as it requires keeping | ||
| track of all the data received in the stream. This is therefore fundamentally hard to execute | ||
| efficiently. | ||
|
|
||
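As an illustration only (the DataFrame and column names are made up, and the exact error text may differ), the failure surfaces when the query is started rather than when the operation is called:

```scala
// Sorting a non-aggregated streaming Dataset is one such unsupported operation.
// Building the plan succeeds; the check runs when the query is started.
val sorted = streamingDf.sort("value")

sorted.writeStream
  .format("console")
  .outputMode("append")
  .start()   // fails with an AnalysisException saying that sorting
             // is not supported on streaming DataFrames/Datasets
```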
| ## Starting Streaming Queries | ||
| Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter` | ||
|
|
@@ -894,11 +961,11 @@ fault-tolerant sink). For example, queries with only `select`, | |
| - **Complete mode** - The whole Result Table will be outputted to the sink after every trigger. | ||
| This is supported for aggregation queries. | ||
|
|
||
| - **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table that were | ||
| - **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were | ||
| updated since the last trigger will be outputted to the sink. | ||
| More information to be added in future releases. | ||
|
|
||
| Different types of streaming queries support different output modes. | ||
| Different types of streaming queries support different output modes. | ||
| Here is the compatibility matrix. | ||
|
|
||
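For example, a hypothetical aggregation without a watermark falls in the last aggregation row of the matrix below and can be run in Complete (or Update) mode, but not Append:

```scala
// An aggregation without a watermark ("Other aggregations" row below):
// Complete and Update modes are allowed, Append is not
val counts = streamingDf.groupBy("deviceType").count()   // column name is illustrative

counts.writeStream
  .outputMode("complete")   // "append" here would be rejected with an AnalysisException
  .format("memory")
  .queryName("counts")
  .start()
```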
| <table class="table"> | ||
|
|
@@ -909,36 +976,38 @@ Here is the compatibility matrix. | |
| <th>Notes</th> | ||
| </tr> | ||
| <tr> | ||
| <td colspan="2" valign="middle"><br/>Queries without aggregation</td> | ||
| <td>Append</td> | ||
| <td> | ||
| Complete mode note supported as it is infeasible to keep all data in the Result Table. | ||
| <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td> | ||
| <td style="vertical-align: middle;">Append</td> | ||
| <td style="vertical-align: middle;"> | ||
| Complete mode not supported as it is infeasible to keep all data in the Result Table. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td rowspan="2">Queries with aggregation</td> | ||
| <td>Aggregation on event-time with watermark</td> | ||
| <td>Append, Complete</td> | ||
| <td rowspan="2" style="vertical-align: middle;">Queries with aggregation</td> | ||
| <td style="vertical-align: middle;">Aggregation on event-time with watermark</td> | ||
| <td style="vertical-align: middle;">Append, Update, Complete</td> | ||
| <td> | ||
| Append mode uses watermark to drop old aggregation state. But the output of a | ||
| windowed aggregation is delayed by the late threshold specified in `withWatermark()`, as by | ||
| the mode's semantics, rows can be added to the Result Table only once after they are | ||
| finalized (i.e. after watermark is crossed). See | ||
| <a href="#handling-late-data">Late Data</a> section for more details. | ||
| finalized (i.e. after watermark is crossed). See the | ||
| <a href="#handling-late-data-and-watermarking">Late Data</a> section for more details. | ||
| <br/><br/> | ||
| Update mode uses watermark to drop old aggregation state. | ||
| <br/><br/> | ||
| Complete mode does not drop old aggregation state since by definition this mode | ||
| preserves all data in the Result Table. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td>Other aggregations</td> | ||
| <td>Complete</td> | ||
| <td style="vertical-align: middle;">Other aggregations</td> | ||
| <td style="vertical-align: middle;">Complete, Update</td> | ||
| <td> | ||
| Since no watermark is defined (it is defined only in the other category), | ||
| old aggregation state is not dropped. | ||
| <br/><br/> | ||
| Append mode is not supported as aggregates can update thus violating the semantics of | ||
| this mode. | ||
| <br/><br/> | ||
| Complete mode does not drop old aggregation state since by definition this mode | ||
| preserves all data in the Result Table. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
|
|
@@ -954,49 +1023,94 @@ There are a few types of built-in output sinks. | |
|
|
||
| - **File sink** - Stores the output to a directory. | ||
|
|
||
| {% highlight scala %} | ||
| writeStream | ||
| .format("parquet") // can be "orc", "json", "csv", etc. | ||
| .option("path", "path/to/destination/dir") | ||
| .start() | ||
| {% endhighlight %} | ||
|
|
||
| - **Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details, and the short sketch after this list. | ||
|
|
||
| {% highlight scala %} | ||
| writeStream | ||
| .foreach(...) | ||
| .start() | ||
| {% endhighlight %} | ||
|
|
||
| - **Console sink (for debugging)** - Prints the output to the console/stdout every time there is a trigger. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver's memory after every trigger. | ||
|
|
||
| - **Memory sink (for debugging)** - The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver's memory after every trigger. | ||
| {% highlight scala %} | ||
| writeStream | ||
| .format("console") | ||
| .start() | ||
| {% endhighlight %} | ||
|
|
||
| - **Memory sink (for debugging)** - The output is stored in memory as an in-memory table. | ||
| Both Append and Complete output modes are supported. This should be used for debugging purposes | ||
| on low data volumes as the entire output is collected and stored in the driver's memory. | ||
| Hence, use it with caution. | ||
|
|
||
| {% highlight scala %} | ||
| writeStream | ||
| .format("memory") | ||
| .queryName("tableName") | ||
| .start() | ||
| {% endhighlight %} | ||
|
|
||
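For the foreach sink noted above, here is a minimal `ForeachWriter` sketch (illustrative only; `streamingDf` is assumed, and the foreach section later in the guide covers the exact semantics):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// A trivial writer that prints each row; a real writer would open a connection
// in open(), send records in process(), and release it in close()
val printWriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true   // true = process this partition
  def process(record: Row): Unit = println(record)
  def close(errorOrNull: Throwable): Unit = ()                 // nothing to clean up
}

streamingDf.writeStream
  .foreach(printWriter)
  .outputMode("append")
  .start()
```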
| Here is a table of all the sinks, and the corresponding settings. | ||
| Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are | ||
| meant for debugging purposes only. See the earlier section on | ||
| [fault-tolerance semantics](#fault-tolerance-semantics). | ||
| Here are the details of all the sinks in Spark. | ||
|
|
||
| <table class="table"> | ||
| <tr> | ||
| <th>Sink</th> | ||
| <th>Supported Output Modes</th> | ||
| <th style="width:30%">Usage</th> | ||
| <th>Options</th> | ||
| <th>Fault-tolerant</th> | ||
|
||
| <th>Notes</th> | ||
| </tr> | ||
| <tr> | ||
| <td><b>File Sink</b></td> | ||
| <td>Append</td> | ||
| <td><pre>writeStream<br/> .format("parquet")<br/> .option(<br/> "checkpointLocation",<br/> "path/to/checkpoint/dir")<br/> .option(<br/> "path",<br/> "path/to/destination/dir")<br/> .start()</pre></td> | ||
| <td> | ||
| <code>path</code>: path to the output directory, must be specified. | ||
| <br/><br/> | ||
| For file-format-specific options, see the related methods in DataFrameWriter | ||
| (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>). | ||
| E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code> | ||
| </td> | ||
| <td>Yes</td> | ||
| <td>Supports writes to partitioned tables. Partitioning by time may be useful.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><b>Foreach Sink</b></td> | ||
| <td>All modes</td> | ||
| <td><pre>writeStream<br/> .foreach(...)<br/> .start()</pre></td> | ||
| <td>Append, Update, Complete</td> | ||
| <td>None</td> | ||
| <td>Depends on ForeachWriter implementation</td> | ||
| <td>More details in the <a href="#using-foreach">next section</a></td> | ||
| </tr> | ||
| <tr> | ||
| <td><b>Console Sink</b></td> | ||
| <td>Append, Complete</td> | ||
| <td><pre>writeStream<br/> .format("console")<br/> .start()</pre></td> | ||
| <td>Append, Update, Complete</td> | ||
| <td> | ||
| <code>numRows</code>: Number of rows to print every trigger (default: 20) | ||
| <br/> | ||
| <code>truncate</code>: Whether to truncate the output if too long (default: true) | ||
| </td> | ||
| <td>No</td> | ||
| <td></td> | ||
| </tr> | ||
| <tr> | ||
| <td><b>Memory Sink</b></td> | ||
| <td>Append, Complete</td> | ||
| <td><pre>writeStream<br/> .format("memory")<br/> .queryName("table")<br/> .start()</pre></td> | ||
| <td>No</td> | ||
| <td>Saves the output data as a table, for interactive querying. Table name is the query name.</td> | ||
| <td>None</td> | ||
| <td>No. But in Complete Mode, a restarted query will recreate the full table.</td> | ||
| <td>Table name is the query name.</td> | ||
| </tr> | ||
| <tr> | ||
| <td></td> | ||
|
|
@@ -1007,7 +1121,7 @@ Here is a table of all the sinks, and the corresponding settings. | |
| </tr> | ||
| </table> | ||
|
|
||
| Finally, you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let’s understand all this with a few examples. | ||
| Note that you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let’s understand all this with a few examples. | ||
|
|
||
|
|
||
| <div class="codetabs"> | ||
|
|
||
Review comment: Can we link back to `#fault-tolerance-semantics` here?
Reply: I don't want to make the table heading a link, but I will do something.