[SPARK-29576][Core] Use Spark's CompressionCodec for Ser/Deser of MapOutputStatus #26235
Conversation
Looks good to me. I'm regenerating the result.

The initial result looks better than before: it is faster (2x+) and there is a size reduction, too.

Looks good in general, just a few questions.

Hi, @dbtsai. Please review and merge the result. The result is good!
EC2 Result
+1, LGTM.
Thank you all for reviewing. Thanks @dongjoon-hyun for running the benchmark.

Test build #112563 has finished for PR 26235 at commit
Test build #112564 has finished for PR 26235 at commit
Test build #112566 has finished for PR 26235 at commit
val objOut = new ObjectOutputStream(out)
out.write(DIRECT)
val codec = CompressionCodec.createCodec(conf, "zstd")
All the other compression settings have a conf. Could we do the same for this one, too? See the examples:
spark/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
Lines 67 to 73 in 1b575ef
private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS)
// Whether to compress shuffle output that are stored
private[this] val compressShuffle = conf.get(config.SHUFFLE_COMPRESS)
// Whether to compress RDD partitions that are stored serialized
private[this] val compressRdds = conf.get(config.RDD_COMPRESS)
// Whether to compress shuffle output temporarily spilled to disk
private[this] val compressShuffleSpill = conf.get(config.SHUFFLE_SPILL_COMPRESS)
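Following the pattern of the config entries quoted above, the reviewer's suggestion could be sketched as below. This is an illustrative sketch only, not the actual patch: the config key name and default shown here are assumptions (the real entry was added separately under the JIRA filed later in this thread), and the snippet relies on Spark's internal `ConfigBuilder`, so it is not standalone-runnable.

```scala
// Hypothetical sketch: a dedicated conf for the MapOutputStatus codec,
// mirroring BROADCAST_COMPRESS / SHUFFLE_COMPRESS above. Key name and
// default are assumptions for illustration.
private[spark] val MAP_STATUS_COMPRESSION_CODEC =
  ConfigBuilder("spark.shuffle.mapStatus.compression.codec")
    .doc("The codec used to compress MapStatus, which is generated by map tasks.")
    .stringConf
    .createWithDefault("zstd")

// At the use site, the codec name would then come from the conf instead of
// being hard-coded:
//   val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
```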
I brought this up in the other PR; please see the discussion there: #26085
If you think it's needed now, then we should file a JIRA for it.
Created a JIRA: https://issues.apache.org/jira/browse/SPARK-29939. @Ngone51 Could you submit a PR to fix it?
Sure :) @gatorsmile
[SPARK-29576][Core] Use Spark's CompressionCodec for Ser/Deser of MapOutputStatus

Instead of using the ZStd codec directly, we use Spark's CompressionCodec, which wraps the ZStd codec in a buffered stream to avoid the excessive overhead of JNI calls when compressing/decompressing small amounts of data. Also, by using Spark's CompressionCodec, we can easily make it configurable in the future if needed.

Closes apache#26235 from dbtsai/optimizeDeser.

Lead-authored-by: DB Tsai <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Ref: LIHADOOP-56788
What changes were proposed in this pull request?
Instead of using the ZStd codec directly, we use Spark's CompressionCodec, which wraps the ZStd codec in a buffered stream to avoid the excessive overhead of JNI calls when compressing/decompressing small amounts of data.
Also, by using Spark's CompressionCodec, we can easily make it configurable in the future if needed.
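The serialization path described above could look roughly like the sketch below. This is an illustrative sketch, not the exact patch: it assumes Spark-internal types (`MapStatus`, `CompressionCodec`, `Utils`, and the `DIRECT` marker byte) and therefore is not standalone-runnable, but it shows the key idea of routing the object stream through `CompressionCodec.compressedOutputStream` instead of calling the ZStd JNI bindings directly.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Sketch only: MapStatus, CompressionCodec, Utils, and DIRECT are Spark
// internals assumed to be in scope.
def serializeMapStatuses(statuses: Array[MapStatus], conf: SparkConf): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  out.write(DIRECT)
  // CompressionCodec wraps the underlying ZStd stream in a buffered stream,
  // so many small writes from ObjectOutputStream are batched into a few
  // large JNI calls rather than one JNI call per small write.
  val codec = CompressionCodec.createCodec(conf, "zstd")
  val objOut = new ObjectOutputStream(codec.compressedOutputStream(out))
  Utils.tryWithSafeFinally {
    objOut.writeObject(statuses)
  } {
    objOut.close()
  }
  out.toByteArray
}
```

The buffering is the source of the speedup reported in the benchmark: without it, each tiny write crosses the JNI boundary individually.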
Why are the changes needed?
Faster performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.