From 0a425514a1dd7850a01c4dbf58fe6f14f57cdb7a Mon Sep 17 00:00:00 2001
From: Michael Vogiatzis
Date: Sat, 4 Jul 2015 13:37:56 +0100
Subject: [PATCH 1/4] Added important updateStateByKey details

Runs for *all* existing keys and returning "None" will remove the key-value pair.
---
 docs/streaming-programming-guide.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index e72d5580dae55..39c081717502a 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -928,7 +928,9 @@ runningCounts = pairs.updateStateByKey(updateFunction)
 The update function will be called for each word, with `newValues` having a sequence of 1's (from
 the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
 Python code, take a look at the example
-[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
+[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
+
+Spark will run the updateStateByKey for all existing keys, regardless of whether they have new data in a batch or not. If updateStateByKey returns None then the key-value pair will be eliminated.

From c2656f9b632debb4e414c3383fa42161a256e945 Mon Sep 17 00:00:00 2001
From: Michael Vogiatzis
Date: Mon, 6 Jul 2015 10:33:49 +0100
Subject: [PATCH 2/4] Moved description farther up

Moved the update description up, before the example.
Maybe it's worth mentioning in the example that "Existing words with no new values will also be called by updateStateByKey until they return None"
---
 docs/streaming-programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 39c081717502a..ac9f875a1dcf3 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -854,6 +854,8 @@ it with new information. To use this, you will have to do two steps.
 1. Define the state update function - Specify with a function how to update the state using the
 previous state and the new values from an input stream.

+Spark will run the `updateStateByKey` operation for all existing keys, regardless of whether they have new data in a batch or not. If `updateStateByKey` returns None then the key-value pair will be eliminated.
+
 Let's illustrate this with an example. Say you want to maintain a running count of each word
 seen in a text data stream. Here, the running count is the state and it is an integer. We
 define the update function as:
@@ -930,8 +932,6 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count. For the
 Python code, take a look at the example
 [stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).

-Spark will run the updateStateByKey for all existing keys, regardless of whether they have new data in a batch or not. If updateStateByKey returns None then the key-value pair will be eliminated.
-

From 00283ede636d838d793cb5b81dc68f11021ab8b1 Mon Sep 17 00:00:00 2001
From: Michael Vogiatzis
Date: Wed, 8 Jul 2015 13:58:41 +0100
Subject: [PATCH 3/4] Removed space

---
 docs/streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index ac9f875a1dcf3..8617981b54dc5 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -930,7 +930,7 @@ runningCounts = pairs.updateStateByKey(updateFunction)
 The update function will be called for each word, with `newValues` having a sequence of 1's (from
 the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
 Python code, take a look at the example
-[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
+[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).

From e7a2946769d1f3edb3370b517c4007f5f05821a5 Mon Sep 17 00:00:00 2001
From: Michael Vogiatzis
Date: Thu, 9 Jul 2015 09:27:51 +0100
Subject: [PATCH 4/4] Updated updateStateByKey text

Replaced operation with update function and None with code-font None
---
 docs/streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 8617981b54dc5..d83899f44e9c6 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -854,7 +854,7 @@ it with new information. To use this, you will have to do two steps.
 1. Define the state update function - Specify with a function how to update the state using the
 previous state and the new values from an input stream.

-Spark will run the `updateStateByKey` operation for all existing keys, regardless of whether they have new data in a batch or not. If `updateStateByKey` returns None then the key-value pair will be eliminated.
+Spark will run the `updateStateByKey` update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns `None` then the key-value pair will be eliminated.

 Let's illustrate this with an example. Say you want to maintain a running count of each word
 seen in a text data stream. Here, the running count is the state and it is an integer. We
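
Note on the behaviour this series documents: the sketch below is a plain-Python illustration, not Spark code and not Spark's implementation. `apply_batch` is a hypothetical stand-in for one `updateStateByKey` step, and the policy of returning `None` when a key has no new values is an assumption chosen only to make the removal visible. `update_function` takes the same `(newValues, runningCount)` arguments as the guide's example.

```python
def update_function(new_values, running_count):
    """Add the new 1's to the previous count; return None to drop the key.

    Dropping a key when it has no new values is an assumption of this
    sketch, not something the guide's word-count example does.
    """
    if not new_values:
        return None
    return sum(new_values) + (running_count or 0)


def apply_batch(state, batch, update_fn):
    """Hypothetical model of one batch: call update_fn for every known key."""
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)

    new_state = {}
    # Keys already in the state are visited even if the batch has no data for them.
    for key in set(state) | set(grouped):
        updated = update_fn(grouped.get(key, []), state.get(key))
        if updated is not None:  # None eliminates the key-value pair
            new_state[key] = updated
    return new_state


after_first = apply_batch({}, [("a", 1), ("b", 1), ("a", 1)], update_function)
after_second = apply_batch(after_first, [("a", 1)], update_function)
print(after_first)
print(after_second)
```

In the second batch `"b"` receives no new values, yet the update function is still called for it with an empty list, and because it returns `None` the key disappears from the state.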