Scalable Memory option for HDFSBackedStateStore #21500
Conversation
@tdas this is the change I mentioned in our chat at Spark Summit.
Allow configuration option to unload loadedMaps from memory after commit
I agree that the current cache approach may consume excessive memory unnecessarily; that also matches my finding in #21469. The issue is not that simple, however: in micro-batch mode, each batch needs to read the previous version of the state, and if it is not in memory it has to be read from the file system, in the worst case seeking and reading multiple files on a remote file system. So keeping the previous version of the state in memory is encouraged. There are three cases here (please add any I'm missing): 1. the task fails before commit; 2. the store is committed but the batch fails afterwards; 3. the store is committed and the batch succeeds. It would be better to think through all of these cases.
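To make the three cases concrete, here is a minimal, self-contained Scala sketch of a versioned in-memory cache in the spirit of loadedMaps. All names and the simplified commit flow are illustrative, not Spark's actual API.

```scala
import scala.collection.mutable

// Toy model of a versioned in-memory state cache, only to illustrate the three cases.
object ToyVersionedState {
  // version -> committed state for that version (in the spirit of loadedMaps)
  private val loadedMaps = mutable.Map[Long, Map[String, Int]](0L -> Map.empty)

  /** Runs one micro-batch on top of `prevVersion`; returns the version the next batch should read. */
  def runBatch(prevVersion: Long,
               update: Map[String, Int] => Map[String, Int],
               failBeforeCommit: Boolean,
               batchSucceeds: Boolean): Long = {
    // Every batch starts by reading the previous version. If it had been evicted,
    // the real provider would have to rebuild it from snapshot + delta files.
    val base = loadedMaps.getOrElse(prevVersion,
      sys.error(s"version $prevVersion not in memory: reload from files needed"))

    if (failBeforeCommit) {
      // Case 1: failure before commit -- nothing is written, the cache is unchanged,
      // and the retry reads prevVersion again.
      return prevVersion
    }

    val newVersion = prevVersion + 1
    loadedMaps(newVersion) = update(base) // "commit": the new version becomes readable

    if (!batchSucceeds) {
      // Case 2: committed, but the batch fails afterwards -- the re-attempt still
      // needs prevVersion, so it must stay reachable (in memory or in files).
      return prevVersion
    }

    // Case 3: committed and the batch succeeds -- the next batch reads newVersion.
    newVersion
  }
}
```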
(force-pushed from fecbc23 to ea407a6)
@HeartSaVioR IMHO we should consider a new state provider such as RocksDB, as Flink and Databricks Delta did. It is not a direct fix, but it would improve latency and memory consumption, and perhaps no additional management on the Spark side would be required.
@TomaszGaweda @aalobaidi At the start of every batch, the state store loads the previous version of the state so that it can be read and written. If we unload every version after committing, the cache will no longer contain the previous version, and the store will have to load it by reading files, adding significant latency to the start of each batch. That's why I laid out the three cases above: to avoid loading state from files when starting a new batch. Please apply #21469 manually and see how much memory HDFSBackedStateStoreProvider consumes due to storing multiple versions (it shows the state size of the latest version as well as the overall state size in the cache). Please also measure and share latency numbers, both before and after the patch. We always have to ask ourselves whether we are addressing the issue correctly.
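The reload cost described above can be sketched roughly as follows, assuming (as HDFSBackedStateStoreProvider does) that an evicted version has to be rebuilt from the nearest snapshot file plus the subsequent delta files. The helper functions here are hypothetical stubs standing in for the remote-filesystem reads.

```scala
import scala.collection.mutable

object ReloadSketch {
  type StateMap = mutable.Map[String, Array[Byte]]

  private val loadedMaps = mutable.Map[Long, StateMap]()

  /** Returns the state for `version`, rebuilding it from files on a cache miss. */
  def loadVersion(version: Long): StateMap =
    loadedMaps.getOrElseUpdate(version, {
      // Cache miss: locate the closest snapshot at or below the requested version ...
      val snapshotVersion = latestSnapshotAtOrBefore(version) // hypothetical lookup
      val map = readSnapshotFile(snapshotVersion)             // hypothetical remote read
      // ... then replay every delta file up to the requested version.
      ((snapshotVersion + 1) to version).foreach(v => applyDeltaFile(v, map))
      map
    })

  // Stubs standing in for the expensive remote-filesystem operations.
  private def latestSnapshotAtOrBefore(v: Long): Long = 0L
  private def readSnapshotFile(v: Long): StateMap = mutable.Map.empty
  private def applyDeltaFile(v: Long, into: StateMap): Unit = ()
}
```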
Retaining versions of state also matters for snapshotting the last version to files: HDFSBackedStateStoreProvider doesn't snapshot a version if it doesn't exist in loadedMaps. So we should check whether this option still works with the current approach to snapshotting.
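A rough sketch of that interaction, with illustrative names rather than the provider's real ones: maintenance-time snapshotting only has something to write while the version is still cached, so unloading after commit could leave readers replaying ever-longer delta chains.

```scala
import scala.collection.mutable

object SnapshotSketch {
  private val loadedMaps = mutable.Map[Long, Map[String, Array[Byte]]]()

  /** Maintenance-time snapshot: only possible while the version is still cached. */
  def doSnapshot(version: Long): Unit =
    loadedMaps.get(version) match {
      case Some(map) =>
        writeSnapshotFile(version, map) // hypothetical file write
      case None =>
        // If unloading after commit already evicted this version, no snapshot is
        // written, and future reloads keep replaying longer chains of delta files.
        ()
    }

  private def writeSnapshotFile(version: Long, map: Map[String, Array[Byte]]): Unit = ()
}
```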
Clearing the map after each commit might make things worse, since the maps need to be loaded from the snapshot + delta files for the next micro-batch. Maybe we need to explore how to avoid maintaining multiple copies of the state in memory within the HDFS state store, or even explore RocksDB for incremental checkpointing.
Sorry for the late reply. The option is useful for a specific use case: micro-batches with a relatively large number of partitions, where each partition's state is very big. When this option is enabled, Spark loads the state of a partition from disk, processes all events belonging to that partition, commits the new state (delta) to disk, and unloads the entire partition state from memory before moving on to the next partition (task). This way each executor keeps in memory only the state of the partitions running concurrently, as opposed to the state of all partitions it has executed. You can control the balance between memory usage and I/O by setting

I did JVM profiling and benchmarks with 5M-event micro-batches and a total state of ~600M keys on a 6-node EMR cluster. Memory usage was much better (in fact, the default behavior failed with fewer than 200M keys) and performance wasn't affected significantly. (I will have to compile more specific numbers.)

@HeartSaVioR brings up a good point regarding state compaction (snapshots). I can't confirm whether compaction was working during the test; I will have to get back to you about this.
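The per-task flow described above could look roughly like the following sketch, with the proposed spark.sql.streaming.stateStore.unloadAfterCommit option modelled as a plain boolean. The class and helper names are illustrative, not the real provider's API.

```scala
import scala.collection.mutable

class PartitionStateTask(unloadAfterCommit: Boolean) {
  // Per-partition cache of version -> state.
  private val loadedMaps = mutable.Map[Long, mutable.Map[String, Array[Byte]]]()

  /** Processes one partition for one micro-batch and returns the committed version. */
  def processPartition(version: Long, events: Iterator[(String, Array[Byte])]): Long = {
    // 1. Load (or reuse) this partition's state as of the previous version.
    val state = loadedMaps.getOrElseUpdate(version, loadFromFiles(version))

    // 2. Apply every event belonging to this partition.
    events.foreach { case (key, value) => state(key) = value }

    // 3. Commit the new version as a delta file.
    val newVersion = version + 1
    writeDeltaFile(newVersion, state) // hypothetical I/O

    // 4. With the option on, drop the partition's state from memory entirely,
    //    trading extra reload I/O next batch for a much smaller executor footprint.
    if (unloadAfterCommit) loadedMaps.clear()
    else loadedMaps(newVersion) = state.clone() // otherwise keep the new version cached

    newVersion
  }

  // Stubs standing in for snapshot + delta replay and delta writes.
  private def loadFromFiles(version: Long): mutable.Map[String, Array[Byte]] = mutable.Map.empty
  private def writeDeltaFile(version: Long, state: mutable.Map[String, Array[Byte]]): Unit = ()
}
```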
@aalobaidi |
@aalobaidi |
I can confirm that snapshots are still being built normally with no issues. @HeartSaVioR I'm not sure why the executor must load at least one version of the state in memory. Could you elaborate?
@aalobaidi #21506 logs messages whenever loading state requires going to the (remote) file system. That's why I suggest merging my patch and running your case again.
After enabling the option, I observed a small but expected latency at the start of each batch, for every partition. The median/average was 4–50 ms in my case, but the max latency was a bit above 700 ms. Please note that the state size in my experiment was not that large, so if a partition has a much bigger state, the latency could be much higher. As I explained earlier, loading the last version from files adds avoidable latency.
Also, I agree that a better solution should be developed, maybe using RocksDB. But for the time being and for this store implementation, this will enable one extra use case with very little code to maintain. |
@aalobaidi |
Can one of the admins verify this patch? |
I guess #21700 handles this case now, so this can be closed. |
What changes were proposed in this pull request?
More scalable memory management for HDFSBackedStateStore. This is controlled by a configuration (spark.sql.streaming.stateStore.unloadAfterCommit); if enabled, HDFSBackedStateStore will unload state after commit.
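Assuming the configuration from this PR is applied, it would be enabled like any other SQL configuration before starting the streaming query; the session setup below is only a sketch.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: enables the proposed option for all streaming queries in this session.
val spark = SparkSession.builder()
  .appName("stateful-streaming-job")
  .config("spark.sql.streaming.stateStore.unloadAfterCommit", "true")
  .getOrCreate()
```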
How was this patch tested?
This has been tested manually but needs unit tests.