[SPARK-29111][CORE] Support snapshot/restore on KVStore #25811
Conversation
@Override
public int hashCode() {
-  return key.hashCode();
+  return Arrays.hashCode(key);
This fixes a possible existing bug - without this fix, comparisons of Sets/Maps which contain ArrayKeyIndexType as a key would fail.
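For illustration, here is a minimal reproduction of the pattern being fixed (hypothetical: `ArrayKey` stands in for the real ArrayKeyIndexType). With `key.hashCode()` the array is hashed by object identity, so two keys wrapping equal arrays land in different hash buckets and `HashSet.contains` fails even though `equals` matches; `Arrays.hashCode(key)` hashes the elements instead.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ArrayKeyIndexType, showing the fixed hashCode.
class ArrayKey {
    final int[] key;

    ArrayKey(int[] key) { this.key = key; }

    @Override
    public boolean equals(Object o) {
        return o instanceof ArrayKey && Arrays.equals(key, ((ArrayKey) o).key);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode reflects the elements; key.hashCode() would not.
        return Arrays.hashCode(key);
    }
}

public class ArrayKeyDemo {
    public static void main(String[] args) {
        Set<ArrayKey> set = new HashSet<>();
        set.add(new ArrayKey(new int[] {1, 2}));
        // With the fix, a structurally equal key is found.
        System.out.println(set.contains(new ArrayKey(new int[] {1, 2}))); // prints "true"
    }
}
```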
Just created a separate PR for this: #26709
}

@Test
public void testMultipleTypesWriteReadDelete() throws Exception {
This test was only available in LevelDBSuite, so I copied it here to test the new API addition.
Test build #110727 has finished for PR 25811 at commit
Both InMemoryStoreSuite and LevelDBSuite are heavily duplicated, but LevelDBSuite also has logic which checks against LevelDB directly, so I've left them as they are. Please let me know if it would be worth deduplicating them.
import java.util.List;

public class IntKeyType {
This was moved out of LevelDBSuite to share it between test suites.
import static org.junit.Assert.*;

-public abstract class DBIteratorSuite {
+public abstract class KVStoreIteratorSuite {
I've just renamed this, as it confused me - I imagined DB meant LevelDB, but there's a separate suite for LevelDB. KVStore sounds better to me.
Test build #110728 has finished for PR 25811 at commit

Test build #110731 has finished for PR 25811 at commit
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.*;
nit: Can we add the single line import java.util.Set; instead?
My IDE did it automatically. I've just unrolled it. Thanks!
Test build #110735 has finished for PR 25811 at commit

Test build #110738 has finished for PR 25811 at commit

retest this, please

Test build #110759 has finished for PR 25811 at commit

retest this, please

Test build #110788 has started for PR 25811 at commit

test this please

Test build #110802 has finished for PR 25811 at commit

retest this, please

Test build #110869 has finished for PR 25811 at commit

retest this, please

Test build #110875 has finished for PR 25811 at commit

Known flaky test: SPARK-23197. Not relevant to this patch.

retest this, please

Test build #110912 has finished for PR 25811 at commit
Let me cc the same people as for #25670 - @felixcheung (as shepherd of SPARK-28594), @vanzin, @squito, @gengliangwang, @dongjoon-hyun. Also cc @Ngone51, who might be interested in this.
Generally, looks good!
  this.serializer = serializer;
}

public void dump(KVStore store, File snapshotFile) throws Exception {
Just want to let you know that SPARK-28867 may also want to dump the KVStore to an HDFS-supported filesystem.
Nice catch! Thanks for pointing that out.
Maybe we need to take an InputStream/OutputStream (or more specific types) as a parameter instead of File, so that it can work with any filesystem. The kvstore module doesn't have a Hadoop dependency, so it would be ideal to avoid depending on Hadoop directly.
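To make the idea concrete, here is a minimal sketch of what a stream-based dump could look like (hypothetical: the class name, the `Iterable<byte[]>` input, and the length-prefixed framing are illustrative, not the actual Spark API):

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of a stream-based dump: the caller supplies the
// OutputStream, so a local file, an HDFS stream, or an in-memory buffer
// all work, and the kvstore module needs no Hadoop dependency.
public class KVStoreSnapshotter {

    // Writes each pre-serialized object as a length-prefixed record.
    // DataOutputStream writes the int length in big-endian order.
    public void dump(Iterable<byte[]> serializedObjects, OutputStream snapshot)
            throws IOException {
        DataOutputStream out = new DataOutputStream(snapshot);
        for (byte[] obj : serializedObjects) {
            out.writeInt(obj.length);
            out.write(obj);
        }
        out.flush();
    }
}
```

The same signature works unchanged whether the caller passes a local file stream or a Hadoop `FSDataOutputStream`, which is the extensibility argument for stream-based parameters.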
Well, InputStream/OutputStream could work for this case. But I'm afraid it makes usage troublesome for the caller, as they need to prepare their own streams (local or HDFS, in or out). And I believe KVStore is currently designed only to persist to filesystems, so the underlying streams should always be file streams. So exposing InputStream/OutputStream seems like overkill here.
I'd prefer to use the HDFS API to support dumping to both local and HDFS-supported filesystems, so the caller only needs to pass in a Path indicating where they want to dump (just as File does currently). And since we're introducing a dump feature here, I think it would be OK to depend on Hadoop directly now. If we don't depend on Hadoop here, we'll still depend on it elsewhere.
WDYT ? @HeartSaVioR
Well, InputStream/OutputStream could work for this case. But I'm afraid it makes usage troublesome for the caller, as they need to prepare their own streams (local or HDFS, in or out). And I believe KVStore is currently designed only to persist to filesystems, so the underlying streams should always be file streams. So exposing InputStream/OutputStream seems like overkill here.
Sorry, I totally disagree that this is a workaround - the parameter in question is pure Java API, which any Java developer knows and is familiar with handling.
spark/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala, lines 52 to 59 at c7c6b64:

def replay(
    logData: InputStream,
    sourceName: String,
    maybeTruncated: Boolean = false,
    eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
  val lines = Source.fromInputStream(logData)(Codec.UTF8).getLines()
  replay(lines, sourceName, maybeTruncated, eventsFilter)
}
Just look at ReplayListenerBus. What exactly does it require the caller to prepare? An InputStream. Does it use anything other than a file, except in some unit tests? No. That's an API-level consideration to keep the door open for extension.
I'll leave the decision to the committers.
Even if we end up agreeing to depend on Hadoop directly, I would just move this to the core module and rewrite it in Scala. Writing this in Java was an intentional effort to put it alongside the other classes in common/kvstore, and I don't think the modules in common depend on Hadoop - except network-yarn, which even defines its Hadoop dependency as provided. The common modules try to avoid depending on Hadoop.
That's fine. If we want to follow ReplayListenerBus's way, I think it would be better if we could provide read/write helper methods later which just return an InputStream/OutputStream, similar to what EventLoggingListener.openEventLog() does.
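Such a helper could be as thin as the following sketch (hypothetical: the class and method names are made up for illustration; a Hadoop-backed variant could return FileSystem streams from the same signatures):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helpers that hide stream creation from the caller,
// analogous in spirit to EventLoggingListener.openEventLog(): the
// snapshot code works purely against streams, and only these helpers
// know where the bytes actually live.
public final class SnapshotStreams {
    private SnapshotStreams() {}

    public static OutputStream openForWrite(Path snapshotFile) throws IOException {
        return Files.newOutputStream(snapshotFile);
    }

    public static InputStream openForRead(Path snapshotFile) throws IOException {
        return Files.newInputStream(snapshotFile);
    }
}
```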
If we want to follow ReplayListenerBus's way, I think it would be better if we could provide read/write helper methods later which just return an InputStream/OutputStream, similar to what EventLoggingListener.openEventLog() does.
That's a good idea. As this PR intends to touch only the common/kvstore module, we'd better deal with this in a next PR (a FOLLOWUP, or included in the next piece of work).
for (Class<?> clazz : types) {
  writeClassName(clazz, output);

  KVStoreView<?> view = store.view(clazz);
I'm wondering whether there can be a type with no objects in the KVStore. Normally, it seems impossible.
I'm not sure any implementation allows a type with no objects.
I see what you're saying - some implementation might allow the case where types() returns a Class but view(A.class) contains nothing, and we don't provide a way to add only a type to the KVStore. Thinking theoretically, we may want to make clear in the interface javadoc that "types with no objects are ignored while recovering, so implementations should not rely on this", but I'm afraid I might be over-thinking.
Test build #111102 has finished for PR 25811 at commit
I'll add the Spark version on top of the snapshot file soon, to give a hint for determining whether the snapshot file is compatible with the reader side.
…yIndexType correctly

### What changes were proposed in this pull request?

This patch fixes the bug in ArrayKeyIndexType.hashCode(): it simply calls Array.hashCode(), which in turn calls Object.hashCode(). It should be Arrays.hashCode() to reflect the elements in the array.

### Why are the changes needed?

I encountered the bug while adding test code for #25811, and I've split the fix into an individual PR to speed up reviewing. Without this patch, ArrayKeyIndexType would cause various issues when used as a key type in collections.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

I've skipped adding a UT, as ArrayKeyIndexType is test-only code and the patch is a simple one-liner.

Closes #26709 from HeartSaVioR/SPARK-30075.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Force-pushed from b6e65f8 to 922707e.
Test build #114987 has finished for PR 25811 at commit

Test build #114991 has finished for PR 25811 at commit

Test build #116277 has finished for PR 25811 at commit
Given we're going with the different approach you proposed, should this PR be closed?

Thanks for the reminder! I think this PR is still useful for implementing incremental replaying on the SHS side, but I can close it for now, as I may not work on incremental replaying right now. If other contributors want to implement incremental replaying (I've actually seen a couple of PRs for this, though all of them are restricted to the in-memory KVStore), this PR can be used again. It would be simple to reopen.
What changes were proposed in this pull request?
This patch proposes adding a new feature to KVStore: snapshot (dump) & restore. The structure of the snapshot file is also described in the design doc; it resembles the structure of the delta/snapshot files in HDFSBackedStateStoreProvider.
Here's the format of the snapshot file. The file is in binary format, and follows the way DataOutputStream writes the types "int" and "byte[]", hence "int" values are written in big-endian byte order.
The file starts with metadata information, as a KVStore stores at most one metadata object.
If there's no metadata stored in the KVStore, the length of the metadata class name is written as -2, which denotes that the following metadata fields are not present.
After that, the file contains the normal objects for each type, in the format below:
The above format is repeated for each type, and when no objects are left to dump, we simply put -1 in the place where the length of the class name is expected, to mark the end of the file.
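As an illustration of the sentinels described above, here is a hedged sketch (the -2 no-metadata marker, the -1 end-of-file marker, and DataOutputStream's big-endian ints come from the description; the exact length-prefixed layout of class names and object payloads in writeEntry is an assumption for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of the snapshot layout. DataOutputStream writes ints big-endian:
// -2 in place of the metadata class-name length means "no metadata";
// -1 in place of a type class-name length marks end-of-file.
public class SnapshotFormatDemo {

    // Assumed record shape: class-name length, class-name bytes,
    // payload length, payload bytes.
    public static void writeEntry(DataOutputStream out, String className,
                                  byte[] payload) throws IOException {
        byte[] name = className.getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);
        out.write(name);
        out.writeInt(payload.length);
        out.write(payload);
    }

    // The smallest valid snapshot: no metadata, no types.
    public static byte[] emptySnapshot() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(-2);  // no metadata object stored
            out.writeInt(-1);  // no more types: end of file
        }
        return bytes.toByteArray();
    }
}
```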
Why are the changes needed?
The new feature will be used as a building block for SPARK-28870. The patch is kept separate from that issue, as I would like to keep each PR smaller.
Does this PR introduce any user-facing change?
No, as the KVStore interface is defined as @Private.

How was this patch tested?
Added new UTs.