[SPARK-27237][SS] Introduce State schema validation among query restart #24173

Conversation
For now, the verification is done per partition. That may sound odd because the schema should be the same across partitions, but if we want to do it only once for all partitions, we would have to modify code (maybe some interfaces too, and possibly even DSv2) to let stateful operators report their states along with key/value schemas. So this approach is less optimized and less intuitive, but it is also far less intrusive: it doesn't touch the current contract and will work well with custom state store providers. A rough sketch of the flow follows.
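To illustrate the per-partition flow described above (a sketch only - the file name, layout, and helper names are assumptions, not the actual implementation): each state store provider validates its key/value schemas against a schema file kept under its own checkpoint directory, writing the file on the first run. The compatible predicate stands for the compatibility rule sketched at the end of this page.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{DataType, StructType}

def validateOrWriteSchema(
    partitionCheckpointDir: Path,
    hadoopConf: Configuration,
    keySchema: StructType,
    valueSchema: StructType): Unit = {
  val schemaFile = new Path(partitionCheckpointDir, "_schema")
  val fs = schemaFile.getFileSystem(hadoopConf)
  if (fs.exists(schemaFile)) {
    // Restart: read the schemas stored on the first run and compare.
    val in = fs.open(schemaFile)
    val (storedKey, storedValue) =
      try {
        (DataType.fromJson(in.readUTF()).asInstanceOf[StructType],
          DataType.fromJson(in.readUTF()).asInstanceOf[StructType])
      } finally {
        in.close()
      }
    require(compatible(storedKey, keySchema) && compatible(storedValue, valueSchema),
      s"State schema is not compatible! stored key: $storedKey / new key: $keySchema, " +
        s"stored value: $storedValue / new value: $valueSchema")
  } else {
    // First run: persist the schemas so later restarts can be verified.
    val out = fs.create(schemaFile)
    try {
      out.writeUTF(keySchema.json)
      out.writeUTF(valueSchema.json)
    } finally {
      out.close()
    }
  }
}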
Maybe we want to overwrite the schema if only a field name has changed. Even though we don't leverage field names to check compatibility, storing and showing the names would give end users a more meaningful message.
My 2 cents: we might also want to log (at a proper level) when only a field name has changed - there's a chance end users intend to rename a field, but there's also some chance the state is semantically broken, e.g. when fields with the same data type are swapped. But this is pretty optional and up to our preference; a sketch of such a warning follows.
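For illustration, a minimal sketch of that warning (the names here are assumed, not the code in this patch); it presumes the shapes were already verified as compatible, so only the field names may differ:

import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.StructType

object FieldRenameWarning extends Logging {
  def warnIfRenamed(stored: StructType, applied: StructType): Unit = {
    if (stored.fieldNames.toSeq != applied.fieldNames.toSeq) {
      logWarning(s"Detected a field name change in the state schema " +
        s"(stored: $stored, new: $applied). If this was unintentional, " +
        "the state may be semantically broken.")
    }
  }
}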
These things are done now. One possible (but up-to-preference) todo is to warn users when a field name change is detected, since it could break the query. It's a matter of preference because end users may change the name intentionally.
So this file will be created and maintained per partition, which avoids concurrent read/write issues, but it may not sound great since it's redundant.
Test build #103793 has finished for PR 24173 at commit

Test build #103796 has finished for PR 24173 at commit
I think it's good to add such a check in general. Without digging too deep into the code, I would ask:
(Seven resolved review comment threads on sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala, marked outdated.)
Nit: maybe "schema doesn't match" is enough.
I prefer a log message that is self-describing. If we'd like to shorten it a bit, how about "New schema for state doesn't match the current schema"?
Nit: maybe "schema doesn't match" is enough.
Same here.
Changing nullability would be interesting.
Do you know of any aggregation functions whose output schema could switch between nullable and non-nullable? Actually, if we want to be strict about this, we may need to allow changing non-nullable to nullable, but disallow the opposite direction.
Do you know of any aggregation functions whose output schema could switch between nullable and non-nullable?

No, but if something is physically possible, sooner or later it will happen.
I think your suggestion makes sense.
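A minimal sketch of that asymmetric rule as a per-field predicate (illustrative only; Catalyst's equalsIgnoreCompatibleNullability, mentioned later in this thread, encodes a similar idea):

import org.apache.spark.sql.types.StructField

// Widening non-nullable to nullable is safe; narrowing nullable to
// non-nullable is not, because stored rows may already contain nulls.
def nullabilityCompatible(stored: StructField, applied: StructField): Boolean =
  (!stored.nullable || applied.nullable) && stored.dataType == applied.dataType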
Do you know what exactly has to be modified? Storing things in a redundant way is rarely a good choice (heavy constraints aside). The number of partitions may change over time.
The schemas are exactly the same as how the states are stored, so there's nothing optional. If you meant nullability, the code includes it in the comparison.
I'm not sure what you meant. If you meant toggling this option, I guess that's not necessary. Preventing the query from running is always better than exposing the possibility of nondeterministic behavior.
The schema of the state is determined in the stateful operator, or even in the state implementation (see the implementation of state in the symmetric join), and we don't have any contract/interface for that. Moreover, if we wanted to deal with the state schema in only one place, that would have to be done on the driver side, whereas states are initialized on the executor side, which would require additional work/tricks to get the information from the driver side.
For state, Spark disallows changing the number of partitions - that's why Spark retains ...
Yeah, some config which is on by default.
Not letting the query start by default is good in such cases, but not having a plan B for the cases we may have missed is a different story. After a deeper look I see the nullable stuff...
Now I see that part, and it's really not easy. I don't have a suggestion out of the box; let me think a bit...
Got it. So a bug in the schema compatibility check itself would be a show-stopper, and we want a plan B. Makes sense. Will address.
Force-pushed 1b56660 to c34e763
Seems like there is an issue with Jenkins; I've written a mail to the dev list...

Test build #103825 has finished for PR 24173 at commit

Test build #103832 has finished for PR 24173 at commit
Nit: there was a discussion about these helper vals, and the agreement was to create them only when they're used in multiple places; in all other cases, inline them.
For StateStoreConf, whether the key is present or not matters when reading the default value. If we remove the helper and access the conf via its key, we have to handle the default value manually (because the confs in StateStoreConf don't provide a default value when the key doesn't exist); see the sketch below.
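A hypothetical illustration of the point (the config key name is an assumption): StateStoreConf carries the confs as a plain key-to-value map, so a dedicated val resolves the default exactly once instead of at every call site.

class StateStoreConfSketch(sqlConfs: Map[String, String]) {
  // Reading via the raw map requires supplying the default by hand;
  // the val keeps that logic in a single place.
  val stateSchemaCheckEnabled: Boolean =
    sqlConfs.getOrElse("spark.sql.streaming.stateStore.stateSchemaCheck", "true").toBoolean
}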
Regarding the driver-side schema check: I don't have a solution that is clean enough to put into the code. Maybe someone has a good idea.
Just for my own understanding: what will happen when a column is dropped in schemaNew?
Undefined behavior: the state is stored in the file as an unsafe byte array, and we just rely on the new schema to parse it. Reading might appear fine if the column(s) are dropped from the rightmost positions, but some of the information in the row (like numFields) would be incorrect, so it's hard to say which operation would refer to it and finally crash the query. If column(s) are dropped from other spots, the query would crash sooner. The sketch below makes the failure mode concrete.
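A small sketch using Catalyst internals (illustrative only): bytes written under one schema are reinterpreted under another, and nothing validates them against the stored layout.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Write a row with two fields, keeping only the raw bytes - as state files do.
val writeSchema = StructType(Seq(
  StructField("a", IntegerType), StructField("b", LongType)))
val bytes = UnsafeProjection.create(writeSchema)(InternalRow(1, 2L)).getBytes

// Pretend the restarted query believes the state has a single field.
val reread = new UnsafeRow(1)
reread.pointTo(bytes, bytes.length)
// reread.getInt(0) may "work" by accident or return garbage; nothing checks
// the assumed schema against the bytes, hence the undefined behavior.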
Thanks for explaining.
Looks good, except I would ping someone and ask for an opinion on the ...
Test build #103963 has finished for PR 24173 at commit
Kindly asking for review from committers.

Pinging again, as Spark+AI Summit 2019 in SF has ended.

Test build #107118 has finished for PR 24173 at commit

I happened to revisit this, and succeeded in changing the approach to check the schema (and write the schema file) only once per stateful operator. The new approach centralizes the request on the driver side via RPC. Both the executor and the driver cache the providerId with the partition id erased, so requests are minimized (see the sketch below). @gaborgsomogyi Could you review the latest change? I guess you've also lost context, so it will take some time to rebuild it. Thanks for the support!
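A rough sketch of the dedup idea (class and method names are assumptions, not the actual implementation): erasing the partition id from the key makes every partition of one stateful operator collapse to a single check.

import java.util.concurrent.ConcurrentHashMap

// Provider id with the partition erased: all partitions of one stateful
// operator map to the same key.
case class OperatorStateId(checkpointRootLocation: String, operatorId: Long, storeName: String)

object ExecutorSchemaCheckCache {
  private val verified = ConcurrentHashMap.newKeySet[OperatorStateId]()

  // askDriver stands for a blocking RPC; the driver keeps its own cache too,
  // so the schema file is checked/written once per operator, not per partition.
  def verifyOnce(id: OperatorStateId)(askDriver: OperatorStateId => Unit): Unit = {
    if (verified.add(id)) {
      askDriver(id)
    }
  }
}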
assert(loadedMaps.size() === 0)
}

testQuietly("changing schema of state when restarting query") {
This test was migrated to StateSchemaCompatibilityCheckerSuite.
ExpectFailure[SparkException] { e =>
  val cause = e.getCause
  // it would bring other error in runtime, but it shouldn't check schema in any way
  assert(!cause.isInstanceOf[StateSchemaNotCompatible])
Note that it throws an exception anyway even with the schema check disabled, and that exception is not a friendly one. That's where this patch brings value.
Test build #132159 has finished for PR 24173 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #132162 has finished for PR 24173 at commit
Tests are failing due to the last change. It affects the order in which this and SPARK-31894 take effect. I'll probably have to turn off SPARK-31894 for these tests or change the order of effect. Will fix today.
Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #132177 has finished for PR 24173 at commit

Force-pushed c98df13 to f9a3b5b
Sorry, I had to rebase to track down the odd test failure. It's fixed now. I'll squash the commits I added after the latest review comment, so that you can look at a single commit for the overall change.
Force-pushed f9a3b5b to 2901429
2901429 is the commit reflecting the review comments. Please take a look again. Thanks in advance.
Kubernetes integration test starting
HyukjinKwon left a comment:
LGTM. It also looks like all outstanding review comments were addressed as of 2901429.
Please allow me to rush a little to merge this one, because the branch cut will happen very soon (within an hour), and I do think this is a good feature to add in Spark 3.1.
Other changes can hopefully be done in a followup.
Agree, my LGTM still stands. We don't want to miss this in 3.1.

Kubernetes integration test status success

Test build #132180 has finished for PR 24173 at commit
viirya left a comment:
Agree with @HyukjinKwon. Seems the equalsIgnoreCompatibleNullability comment was addressed. It's a rush to look again right now, but I'm fine with it, as we can have a followup if needed.
Test build #132199 has finished for PR 24173 at commit
The test failure is a known flaky test. Let me merge this now so I can cut the branch. Thank you all. Merged to master.
Test build #132197 has finished for PR 24173 at commit
Thanks all for reviewing and merging! As the latest changes didn't go through recent reviews, I'm open to post-review and will deal with further comments as follow-ups. The content of the two commits is actually identical (I just squashed the last several commits), so we can consider the tests passing.
)
}

testQuietlyWithAllStateVersions("changing schema of state when restarting query",
@HeartSaVioR FYI, the test is flaky. It fails sometimes in my PRs:
[info] - changing schema of state when restarting query - state format version 1 *** FAILED *** (915 milliseconds)
[info] Error while checking stream failure: stateSchemaExc.isDefined was false
[info] org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
The test still fails periodically:
[info] - changing schema of state when restarting query - state format version 1 *** FAILED *** (1 second, 171 milliseconds)
[info] Error while checking stream failure: stateSchemaExc.isDefined was false
[info] org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82(StreamingAggregationSuite.scala:781)
[info] org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82$adapted(StreamingAggregationSuite.scala:779)
[info] org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$33(StreamTest.scala:642)
[info] scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
[info]
[info]
[info] == Progress ==
[info] StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50c50d34,Map(),/home/runner/work/spark/spark/target/tmp/spark-1ee805db-b75b-4469-a5e9-dbc39d212620)
[info] AddData to MemoryStream[value#33761]: 21
[info] => ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
[info]
[info] == Stream ==
[info] Output Mode: Update
[info] Stream state: {MemoryStream[value#33761]: 0}
[info] Thread state: dead
How about disabling it and creating a blocker JIRA for the next release to enable it back? @HyukjinKwon WDYT?
Sorry to jump in late. How often does the test failure occur? If it happens one in 100 times, then we have lots of tests with a similar failure frequency. If it happens more like one in 10 times, let's disable the test and file a blocker JIRA ticket.
What changes were proposed in this pull request?
Please refer to the description of SPARK-27237 for the rationale behind this patch.
This patch proposes to introduce state schema validation: the key schema and value schema are stored to a schema file (on the first run), and any new key/value schemas for the state are verified to be compatible with the existing ones. To be clear on the definition of "compatible": a state schema is "compatible" when the number of fields is the same and the data type of each field is the same - Spark has been allowing renames of fields. This patch prevents running a query with an incompatible state schema, which reduces the chance of nondeterministic behavior (a field rename can also be a smell of semantic incompatibility, but end users could legitimately just rename a field, so we can't tell), as well as providing a more informative error message. A minimal sketch of the compatibility rule follows.
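For illustration only (not the exact checker code), a sketch of that rule under the assumptions discussed in this thread - field names are ignored, and nullability may only widen from non-nullable to nullable:

import org.apache.spark.sql.types.{DataType, StructType}

def compatible(stored: DataType, applied: DataType): Boolean =
  (stored, applied) match {
    case (s: StructType, a: StructType) =>
      // Same field count and pairwise-compatible types; names may differ.
      s.length == a.length && s.fields.zip(a.fields).forall { case (sf, af) =>
        (!sf.nullable || af.nullable) && compatible(sf.dataType, af.dataType)
      }
    case _ => stored == applied
  }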
How was this patch tested?
Added UTs.