-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily #20292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
srowen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without even commenting on the logic, a lot about this change is wrong. I'd review it carefully first.
| // An intermediate stream that reads from exactly one batch | ||
| // This guards against pre-fetching and other arbitrary behavior of higher level streams | ||
| private var deserializeStream = nextBatchStream() | ||
| private var deserializeStream = null.asInstanceOf[Option[DeserializationStream]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better: ... deserializeStream: Option[DeserializationStream] = None. It's not just the cast, but the fact that you're assigning null to an Option
| // we're still in a valid batch. | ||
| if (batchIndex < batchOffsets.length - 1) { | ||
| if (batchIndex < batchOffsets.length - 1 && deserializeStream.isDefined) { | ||
| if (deserializeStream != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check doesn't make sense then. You probably want a deserializeStream.foreach { ... } construct anyway
| if (ds != null) { | ||
| ds.close() | ||
| deserializeStream = null | ||
| if (ds != null && ds.isEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is wrong
| try { | ||
| val k = deserializeStream.readKey().asInstanceOf[K] | ||
| val c = deserializeStream.readValue().asInstanceOf[C] | ||
| val k = deserializeStream.get.readKey().asInstanceOf[K] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If developer call next() without checking hasNext then this will trigger clean up, which is not expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jiangxb1987 I think this does not change the original semantic,since it is only call cleanup when EOFException was thrown.Maybe i missed something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here may throw NoSuchElementException.I will fix this.Thanks @jiangxb1987
|
Ping @jiangxb1987 could you help review this?thanks too much! |
| // An intermediate stream that reads from exactly one batch | ||
| // This guards against pre-fetching and other arbitrary behavior of higher level streams | ||
| private var deserializeStream = nextBatchStream() | ||
| private var deserializeStream: Option[DeserializationStream] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps private var deserializeStream: Option[DeserializationStream] = nextBatchStream() ? Then we can avoid many other code changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jiangxb1987 But this may make deserializeStream still init when spilling and generate DiskMapIterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we just use null as initial value? For performance critical places we don't need follow Scala style and use None.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can also make this patch much smaller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok,i will fix later @jiangxb1987 @cloud-fan Thanks for your precious time
| try { | ||
| val k = deserializeStream.readKey().asInstanceOf[K] | ||
| val c = deserializeStream.readValue().asInstanceOf[C] | ||
| val k = deserializeStream.get.readKey().asInstanceOf[K] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should still check deserializeStream is not emtpy here to be safe
|
|
||
| private def cleanup() { | ||
| batchIndex = batchOffsets.length // Prevent reading any other batch | ||
| val ds = deserializeStream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new val is not actually needed.
|
ok to test. |
| nextItem != null | ||
| } | ||
|
|
||
| override def next(): (K, C) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
if (!hasNext) {
throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item
|
LGTM |
|
Test build #86582 has finished for PR 20292 at commit
|
|
Test build #86618 has finished for PR 20292 at commit
|
|
LGTM. |
|
LGTM |
…zily ## What changes were proposed in this pull request? Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init when DiskMapIterator instance created.This will cause memory use overhead when ExternalAppendOnlyMap spill too much times. We can avoid this by making deserializeStream init when it is used the first time. This patch make deserializeStream init lazily. ## How was this patch tested? Exist tests Author: zhoukang <[email protected]> Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator. (cherry picked from commit 45b4bbf) Signed-off-by: Wenchen Fan <[email protected]>
|
thanks, merging to master/2.3! |
|
Thanks for your time! @cloud-fan @jerryshao @jiangxb1987 |
What changes were proposed in this pull request?
Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init when DiskMapIterator instance created.This will cause memory use overhead when ExternalAppendOnlyMap spill too much times.
We can avoid this by making deserializeStream init when it is used the first time.
This patch make deserializeStream init lazily.
How was this patch tested?
Exist tests