
Conversation

@HeartSaVioR
Contributor

NOTE: This covers the proposal of adding the DataStreamWriterV2 API, which hasn't been discussed yet. I expect this PR to go through discussion/review that may change it significantly, so I'm marking it as "WIP" for now. Once we agree on the direction, I'll change the state of the PR and fill in the PR description.

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

@HeartSaVioR
Contributor Author

cc. @rdblue @cloud-fan @brkyvz

Let me leave some background on the PR in its current state...

I focused on adding an API similar to writeTo to Structured Streaming, as it looks like the missing piece on the SS side for DSv2. At first I planned to go deeper and touch the logical plans as well, but unlike batch, SS is a bit more complicated: there are two different execution modes (micro-batch, continuous), and the DSv1 Sink interface still has to be supported in the code path.

So the PR in its current state doesn't touch the logical plan and only addresses the surface API, so that the necessary logical plan changes can be discussed and handled later without changing the API introduced here (hopefully).

If we want this PR to include the logical plan changes, I'll need some more time to look into the details of the logical plan and propose an approach (or even better, point me to anything we've already discussed).
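
To make the discussion concrete, here is a minimal sketch of the kind of usage I have in mind, mirroring DataFrameWriterV2. The writeStreamTo entry point and the option/trigger builder methods are illustrative assumptions for this sketch; truncateAndAppend comes from the diff below.

```scala
// Illustrative only: writeStreamTo, option(), and trigger() are assumed
// shapes for this sketch; truncateAndAppend() is from the diff below.
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

val query: StreamingQuery = streamingDf
  .writeStreamTo("catalog.db.target")          // hypothetical entry point
  .option("checkpointLocation", "/tmp/ckpt")   // assumed builder option
  .trigger(Trigger.ProcessingTime("1 minute")) // assumed builder method
  .truncateAndAppend()                         // end-of-chain "mode", like DataFrameWriterV2's verbs
```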


@throws[NoSuchTableException]
@throws[TimeoutException]
def truncateAndAppend(): StreamingQuery = {
Contributor Author


Going by the name of the output mode, this method should be named complete, but that feels a bit weird to me, so I picked a name closer to the actual behavior.
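
For context, a minimal sketch of why "truncate and append" matches the sink-side behavior: in DSv2, complete mode is negotiated by calling truncate() on a WriteBuilder that implements SupportsTruncate, then appending the full result table each trigger. ExampleWriteBuilder is a hypothetical name, and buildForStreaming is elided.

```scala
import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}

// Sketch: complete mode = truncate the existing data, then append the
// whole result table, per trigger.
class ExampleWriteBuilder extends WriteBuilder with SupportsTruncate {
  private var truncateFirst = false

  // Spark calls this when the streaming query runs in complete mode.
  override def truncate(): WriteBuilder = {
    truncateFirst = true
    this
  }

  // buildForStreaming() omitted: a real sink would return a StreamingWrite
  // that clears existing data before writing when truncateFirst is set.
}
```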

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128520 has finished for PR 29715 at commit 1edd283.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

The DataFrameWriterV2 gets rid of SaveMode and provides a clear API set that is even more powerful than the SQL API. It would be great to have DataStreamWriterV2, but the key problem is: what's the better API for OutputMode and Trigger?
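
For reference, a minimal sketch of the batch-side verbs I mean (these are real DataFrameWriterV2 methods; df is any DataFrame and the table identifier is a placeholder). The open question is what the streaming analogue of these verbs, plus OutputMode and Trigger, should look like.

```scala
// DataFrameWriterV2 replaces SaveMode with explicit verbs.
df.writeTo("catalog.db.t").append()              // like INSERT INTO
df.writeTo("catalog.db.t").overwritePartitions() // dynamic partition overwrite
df.writeTo("catalog.db.t").createOrReplace()     // like CREATE OR REPLACE TABLE ... AS SELECT
```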

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 14, 2020

Thanks for the input.

My initial goal was to enable using catalog tables in SS queries (here, on the write side), so I didn't touch the other parts of DataStreamWriter. I borrowed the concept of representing the "save mode" as the final method in the chain, but I'm also OK with keeping start() as DataStreamWriter does. If you have an idea in mind for the problem, please feel free to share.

I think there are some points to consider while designing:

  1. The output mode for the sink isn't exactly matched to the output mode for the result table.

We already know about the "update as append" case for DSv2 (the output mode for the result table is update, but the sink does an append), but in reality most built-in sinks do an append for any mode (even complete mode), just because that's what we did in Spark 2.x. DSv1 is even more problematic: the interface is designed to only append, yet there's no restriction on the output mode for a DSv1 sink.

I expect we won't support DSv1 in DataStreamWriterV2, but the mismatch still remains in DSv2. Do we want to keep the mismatch forever, or fix it at least in DSv2? (Kafka is one example: the Kafka sink shouldn't allow update and complete mode. I think we made the right fix there, but it broke compatibility. See the sketch after this list.)

  2. Continuous mode hasn't been actively developed.

Given the current status of SS development, I don't think continuous mode will leverage the output mode in the near future (that is, output mode isn't needed there). I'm not sure whether that will still hold later; if it does, we may be able to split the builders for micro-batch and continuous mode and remove output mode from the continuous one.

(TBH, I wonder whether continuous mode is used in production at all: the mode was introduced in Spark 2.3, no one has claimed it should graduate from experimental, and no contributor has been taking care of it. Is it something we might consider retiring to reduce complexity?)

  3. More things to consider?

Without clear answers to these considerations, it will be hard to construct a good API.
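
The sketch referenced in point 1: a minimal example of today's DataStreamWriter behavior, where the declared output mode and what the sink physically does diverge. The server address, topic, and checkpoint path are placeholder values, and streamingCounts is an assumed streaming DataFrame.

```scala
// The query runs in update mode, but the Kafka sink physically appends
// records per trigger -- the "update as append" mismatch.
val query = streamingCounts.writeStream
  .format("kafka")
  .outputMode("update")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/ckpt")
  .start()
```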

@cloud-fan
Contributor

So this DataStreamWriterV2 is used to enforce output mode for v2 streaming sinks, so that there is no backward compatibility issue?

I think we should at least put something in the PR description to explain: what are the problems of DataStreamWriter? Then we can further discuss how to solve these problems in DataStreamWriterV2.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 14, 2020

> So this DataStreamWriterV2 is used to enforce output mode for v2 streaming sinks, so that there is no backward compatibility issue?

No. That's a side improvement which can be dropped, not a major goal.

As I commented, fixing the problems with DataStreamWriter isn't the purpose of introducing DataStreamWriterV2. It's rather about providing a symmetric user experience between batch and streaming: with DataFrameWriterV2, end users can run a batch query against a catalog table on the writer side, whereas a streaming query has nothing to enable this.
(I don't see an API for reading a catalog table on the reader side of a streaming query either. Do I understand correctly?)

The problems I described in my previous comment are simply problems of Structured Streaming itself; let me explain them at the end of this comment, as they may go off topic.

I see DataFrameWriterV2 has integrated lots of other benefits (a more fluent API, a logical plan for the write node, etc.) which would be great to have in DataStreamWriterV2, but I think they're not the key part of *WriterV2. Supporting catalog tables is simply the major reason to have it.
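
To make the asymmetry concrete, a minimal sketch (the table identifiers are placeholders; the streaming lines are hypothetical shapes, not existing APIs):

```scala
// Batch: catalog tables work on both sides today.
val batchDf = spark.read.table("catalog.db.src")
batchDf.writeTo("catalog.db.dst").append()

// Streaming: no catalog-table counterpart yet -- a hypothetical mirror:
//   spark.readStream.table("catalog.db.src")
//   streamingDf.writeStreamTo("catalog.db.dst")
```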

Regarding the problems in Structured Streaming:

I kicked the incomplete state support for continuous mode out of Structured Streaming, but I basically have concerns about continuous mode itself, as it applies hacks to work around an architectural limitation. (Plus, no one in the community cares about it.)

And as I raised in an earlier discussion (and have commented in various PRs), I think complete mode should be kicked out as well. The mode addresses some limited cases but is treated as one of the valid modes, which adds a lot of complexity: some operations which basically shouldn't be supported in a streaming query are supported under complete mode, and vice versa, because the mode doesn't fit naturally.

It's useful for now because Spark doesn't support a true update mode on the sink. Once Spark supports update mode on the sink, the content in external storage should be equivalent to what complete mode provides, without having to dump all of the outputs. (Or maybe complete mode only exists because of a missing feature: queryable state.) We could probably simulate complete mode via a special stateful operator which only works with update mode.
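
A minimal sketch of that claim, assuming a hypothetical upsertByKey helper on the sink side: if each trigger merges the changed rows by key into external storage, the stored table converges to exactly what complete mode would rewrite wholesale. events is an assumed streaming DataFrame and the checkpoint path is a placeholder.

```scala
import org.apache.spark.sql.DataFrame

// events is an assumed streaming DataFrame with a "word" column.
val counts = events.groupBy("word").count()

counts.writeStream
  .outputMode("update") // emit only the aggregate rows that changed in this trigger
  .option("checkpointLocation", "/tmp/ckpt")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // hypothetical helper: merge the changed rows into the external table,
    // keyed by "word"; the table then always holds the full current result.
    upsertByKey(batch, keyColumn = "word")
  }
  .start()
```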

Specific to micro-batch, supporting DSv1 is also a major headache: lots of the pattern matching in MicroBatchExecution exists to support DSv1, and there are even workarounds applied for DSv1 (e.g. #29700). I remember the answer in the discussion thread that DSv1 for streaming data sources is not exposed in the public API, which is great news, but I see no action/plan to get rid of it. Is there any functionality possible in DSv1 that DSv2 cannot cover? If so, why not prioritize addressing that gap?

@cloud-fan
Contributor

> Supporting catalog table is simply the major reason to have it.

No, the major reason is to get rid of SaveMode. BTW, DataFrameWriter.insertInto supports catalog tables as well. We need a strong motivation to create a new API to replace the old one, to justify the overhead (users need to learn the new API). Supporting catalog tables is not a strong motivation to me; we can improve the existing API to do that.

@HeartSaVioR
Contributor Author

Oh, OK. Thanks for the input. That's a good point, and I agree that DataFrameWriter can work with catalog tables, hence DataStreamWriter can too. (Though I still feel the clear benefit of DataFrameWriterV2 is that it "only" needs to deal with DSv2.)

Let me try to deal with that first, and revisit this if I can think of better ideas for improving the UX of DataStreamWriterV2.
