
Conversation

@tdas (Contributor) commented Oct 4, 2018

What changes were proposed in this pull request?

Added

  • Python foreach
  • Scala, Java and Python foreachBatch
  • Multiple watermark policy
  • The semantics of what changes are allowed to a streaming query between restarts.

How was this patch tested?

No tests

@tdas (Contributor, Author) commented Oct 4, 2018

@zsxwing

@SparkQA commented Oct 4, 2018

Test build #96934 has finished for PR 22627 at commit f61c13e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • In Scala, you have to extend the class ForeachWriter ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
  • In Java, you have to extend the class ForeachWriter ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).

@SparkQA commented Oct 4, 2018

Test build #96936 has finished for PR 22627 at commit d16cfeb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor) commented Oct 4, 2018

I think we should consider this for backport to 2.4 given that it documents new behaviour in 2.4 unless folks object.

@zsxwing (Member) left a comment

Overall looks good. Left some comments.


{% highlight java %}
streamingDatasetOfString.writeStream.foreachBatch(
new VoidFunction2<Dataset<String>, long> {
Member:

long -> Long. I noticed the current Java API actually is wrong. Submitted #22633 to fix it.

{% highlight java %}
streamingDatasetOfString.writeStream.foreachBatch(
new VoidFunction2<Dataset<String>, long> {
void call(Dataset<String> dataset, long batchId) {
Member:

ditto

<div data-lang="java" markdown="1">

{% highlight java %}
streamingDatasetOfString.writeStream.foreachBatch(
Member:

nit: writeStream()

batchDF.cache()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.uncache()
Member:

uncache() -> unpersist()
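For context, a minimal self-contained sketch of the corrected pattern in Scala; the rate source, output paths, and formats are placeholders, not part of the PR.

{% highlight scala %}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreachBatchMultiSink").getOrCreate()

// Any streaming DataFrame works; the built-in rate source keeps the sketch runnable.
val streamingDF = spark.readStream.format("rate").load()

// Defining the function separately keeps the foreachBatch overloads unambiguous.
val writeToTwoLocations: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  batchDF.persist()                                                  // reuse the micro-batch output
  batchDF.write.mode("append").format("parquet").save("/tmp/out1")   // location 1
  batchDF.write.mode("append").format("json").save("/tmp/out2")      // location 2
  batchDF.unpersist()                                                // unpersist(), not uncache()
}

streamingDF.writeStream.foreachBatch(writeToTwoLocations).start()
{% endhighlight %}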


In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
{% highlight java %}
streamingDF.writeStream.foreach(
Member:

streamingDF.writeStream -> streamingDatasetOfString.writeStream()

// Write row to connection. This method is NOT optional in Python.

def close(self, error):
// Close the connection. This method in optional in Python.
Member:

ditto

by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data.
Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit
data and achieve exactly-once guarantees. However, if the streaming query is being executed
in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
Member:

I think continuous processing will always reprocess the whole epoch after recovery and the user should be able to use (partition_id, epoch_id) to deduplicate. Is it not true?

@HeartSaVioR (Contributor), Oct 4, 2018:

If my understanding is right, continuous processing doesn't guarantee that the same epoch id processes the same offset range of the source (since it processes as many records as possible before it receives the epoch marker), so the epoch id can't be used for deduplication.

Contributor Author:

I agree with @HeartSaVioR.
In continuous processing, when an epoch is reprocessed, the engine's offset tracking ensures that the starting offset of that epoch is the same as what was recorded for the previous epoch, but the ending offset is not guaranteed to be the same as what was processed before the failure. It may happen that epoch E of partition P processed offsets X to Y (and the output of partition P was written), but the query failed before Y was recorded (as other partitions may not have completed epoch E). So after restarting, the re-executed epoch E may process offsets X to Y + Z before the epoch is incremented.

Member:

Gotcha. Thanks for your explanation.
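To make the micro-batch side of this discussion concrete, below is a hypothetical idempotent sink sketch in Scala that keys its commits on (partitionId, epochId). The ExternalStore object is invented for the example (an in-memory stand-in for a transactional store), and, per the thread above, this deduplication only applies to micro-batch execution, not continuous mode.

{% highlight scala %}
import org.apache.spark.sql.ForeachWriter
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

// Invented stand-in for an external transactional store, kept in memory for the sketch.
object ExternalStore {
  private val committed = TrieMap.empty[(Long, Long), Seq[String]]
  def alreadyCommitted(partitionId: Long, epochId: Long): Boolean =
    committed.contains((partitionId, epochId))
  def commit(partitionId: Long, epochId: Long, rows: Seq[String]): Unit =
    committed.put((partitionId, epochId), rows)   // overwriting makes retried commits idempotent
}

// In micro-batch mode the same (partitionId, epochId) always carries the same rows,
// so skipping already-committed pairs deduplicates re-executed batches.
class IdempotentWriter extends ForeachWriter[String] {
  private var partitionId: Long = _
  private var epochId: Long = _
  private val buffer = ArrayBuffer.empty[String]

  override def open(partitionId: Long, epochId: Long): Boolean = {
    this.partitionId = partitionId
    this.epochId = epochId
    buffer.clear()
    !ExternalStore.alreadyCommitted(partitionId, epochId)   // returning false skips this partition/epoch
  }

  override def process(record: String): Unit = buffer += record

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) ExternalStore.commit(partitionId, epochId, buffer.toSeq)
  }
}

// Usage: streamingDatasetOfString.writeStream.foreach(new IdempotentWriter).start()
{% endhighlight %}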


- Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`

- Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")`
Member:

nit: path -> topic


- Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.

- Changes in projections with same output schema is allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
Member:

this example changes the schema. Right? From string to struct?

Contributor Author:

Right.


- *Changes in stateful operations*: Some operations in streaming queries need to maintain
state data in order to continuously update the result. Structured Streaming automatically checkpoints
the state data to fault-tolerant storage (for example, DBFS, AWS S3, Azure Blob storage) and restores it after restart.
Member:

remove DBFS?

Contributor Author:

replaced with HDFS
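As a small illustration of the checkpointing described above, a hedged Scala sketch; the rate source and the HDFS path are placeholders, and any fault-tolerant, HDFS-compatible file system can hold the checkpoint.

{% highlight scala %}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("statefulCheckpointSketch").getOrCreate()

// A stateful aggregation: its state is checkpointed to the configured location.
val counts = spark.readStream.format("rate").load()
  .groupBy("value")
  .count()

counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/rate-counts")  // placeholder path
  .start()
{% endhighlight %}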


- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported
in streaming DataFrames because Spark does not support generating incremental plans in those cases.
Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
Member:

Not a big deal but methods like foreachBatch are sometimes rendered that way, and sometimes without code font like foreachBatch(). It's nice to back-tick-quote class and method names if you are doing another pass.

Contributor Author:

Yes. I missed a few, and I want to fix them all.
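To illustrate that point with one concrete, hedged case: the JDBC writer is batch-only, but inside foreachBatch each micro-batch can use it. The connection details below are placeholders, and a matching JDBC driver is assumed to be on the classpath.

{% highlight scala %}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreachBatchJdbcSketch").getOrCreate()

val streamingDF = spark.readStream.format("rate").load()

// The batch-only JDBC writer, applied per micro-batch via foreachBatch.
val writeToJdbc: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  batchDF.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://dbhost:5432/mydb")  // placeholder connection
    .option("dbtable", "events")
    .option("user", "user")
    .option("password", "password")
    .mode("append")
    .save()
}

streamingDF.writeStream.foreachBatch(writeToJdbc).start()
{% endhighlight %}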

// Open connection
}

def process(record: String) = {
Member:

nit: return type Unit

1. The function takes a row as input.

{% highlight python %}
def processRow(row):
Member:

processRow -> process_row


{% highlight python %}
def foreachBatchFunction(df, epochId):
# Transform and write batchDF
Member:

4 space indentation

<div data-lang="python" markdown="1">

{% highlight python %}
def foreachBatchFunction(df, epochId):
Member:

foreachBatchFunction -> foreach_batch_function

Member:

epochId -> epoch_id

@tdas (Contributor, Author) commented Oct 8, 2018

@holdenk yeah, I intend to backport this to 2.4.

@SparkQA commented Oct 8, 2018

Test build #97129 has finished for PR 22627 at commit 222bfc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) left a comment

LGTM except some nits.

In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).

{% highlight scala %}
streamingDF.writeStream.foreach(
Member:

nit: streamingDF -> streamingDatasetOfString.


In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
{% highlight java %}
streamingDF.writeStream().foreach(
Member:

ditto

<div data-lang="python" markdown="1">

{% highlight python %}
def foreachBatchFunction(df, epoch_id):
Member:

nit: foreachBatchFunction -> foreach_batch_function

# Transform and write batchDF
pass

streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()
Member:

nit: foreachBatchFunction -> foreach_batch_function

@zsxwing (Member) commented Oct 8, 2018

LGTM

@SparkQA commented Oct 8, 2018

Test build #97131 has finished for PR 22627 at commit 9d60534.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Oct 8, 2018
…ultiple watermarks

## What changes were proposed in this pull request?

Added
- Python foreach
- Scala, Java and Python foreachBatch
- Multiple watermark policy
- The semantics of what changes are allowed to the streaming between restarts.

## How was this patch tested?
No tests

Closes #22627 from tdas/SPARK-25639.

Authored-by: Tathagata Das <[email protected]>
Signed-off-by: Tathagata Das <[email protected]>
(cherry picked from commit f9935a3)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit closed this in f9935a3 Oct 8, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ultiple watermarks

## What changes were proposed in this pull request?

Added
- Python foreach
- Scala, Java and Python foreachBatch
- Multiple watermark policy
- The semantics of what changes are allowed to the streaming between restarts.

## How was this patch tested?
No tests

Closes apache#22627 from tdas/SPARK-25639.

Authored-by: Tathagata Das <[email protected]>
Signed-off-by: Tathagata Das <[email protected]>