[SPARK-22068][CORE]Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes #19285
|
Hi @cloud-fan @jiangxb1987, would you mind taking a look? Thanks a lot. |
|
ok to test. |
|
Test build #81961 has finished for PR 19285 at commit
|
|
Test build #81971 has finished for PR 19285 at commit
|
|
retest this please |
|
Test build #81981 has finished for PR 19285 at commit
|
| new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) | ||
| val size = entry.size | ||
| // get the precise size | ||
| val size = estimateSize(true) |
Why do we need estimateSize(true)? Isn't this just creating the entry and getting entry.size?
At this point we have just finished unrolling the iterator successfully. However, the size of the underlying vector may be greater than unrollMemoryUsedByThisBlock, the memory we requested for unrolling the block. So we need to check the size again and determine whether we need to request more memory. We should only call bbos.toChunkedByteBuffer or vector.toArray after we have requested enough memory.
The two cases differ here because the underlying storage is different: putIteratorAsValues uses SizeTrackingVector, while putIteratorAsBytes uses ChunkedByteBufferOutputStream.
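The re-check described in this comment can be sketched in isolation. The following is a minimal illustration, not the MemoryStore code: `UnrollPool`, `UnrollSketch`, and the byte-array values are hypothetical stand-ins, and the "size" is simply the number of bytes stored. It shows why a final check is needed once the iterator is exhausted: the periodic check can miss growth that happened after its last run.

```scala
// Hypothetical stand-in for unroll-memory accounting; not Spark's real API.
final class UnrollPool(var free: Long) {
  // Grants the request only if enough free memory remains.
  def reserve(bytes: Long): Boolean =
    if (bytes <= free) { free -= bytes; true } else false
}

object UnrollSketch {
  /** Returns Right(finalSize) if everything fit, Left(reservedSoFar) otherwise. */
  def unroll(
      values: Iterator[Array[Byte]],
      pool: UnrollPool,
      initial: Long,
      checkPeriod: Int): Either[Long, Long] = {
    var reserved = 0L
    var keepUnrolling = pool.reserve(initial)
    if (keepUnrolling) reserved = initial
    var stored = 0L // bytes actually held by the buffer
    var count = 0
    while (values.hasNext && keepUnrolling) {
      stored += values.next().length
      count += 1
      // Periodic check: only every `checkPeriod` elements.
      if (count % checkPeriod == 0 && stored > reserved) {
        val extra = stored - reserved
        keepUnrolling = pool.reserve(extra)
        if (keepUnrolling) reserved += extra
      }
    }
    // Final check: the buffer may have grown since the last periodic check.
    if (keepUnrolling && stored > reserved) {
      val extra = stored - reserved
      keepUnrolling = pool.reserve(extra)
      if (keepUnrolling) reserved += extra
    }
    if (keepUnrolling) Right(stored) else Left(reserved)
  }
}
```

With five 10-byte values, an initial reservation of 16, and a check every 4 elements, the last element is only accounted for by the final check; a pool that cannot cover it yields a `Left` instead of silently over-using memory.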
But the previous code just calls entry.size. Are you fixing a new bug?
Previously, putIteratorAsValues seemed to have no problem, but putIteratorAsBytes didn't check again after unrolling the iterator. The new putIterator is copied from the previous putIteratorAsValues. For SizeTrackingVector, we can call arrayValues.toIterator to get an iterator again after calling SizeTrackingVector.toArray. But for ChunkedByteBufferOutputStream, we can't go back to a stream after calling ChunkedByteBufferOutputStream.toChunkedByteBuffer (the PartiallySerializedBlock needs a stream).
It seems deserialized values do not have a precise size, even with SizeEstimator.estimate(arrayValues). This could be confusing.
| } | ||
| // Acquire storage memory if necessary to store this block in memory. | ||
| val enoughStorageMemory = { | ||
| if (unrollMemoryUsedByThisBlock <= size) { |
Here the size of the underlying vector or byte buffer may be greater than unrollMemoryUsedByThisBlock.
| reserveAdditionalMemoryIfNecessary() | ||
| def estimateSize(precise: Boolean): Long = { | ||
| if (precise) { | ||
| serializationStream.flush() |
I don't see flush called anywhere in the previous code.
Because some data is still buffered in the serializationStream, we can't get the precise size unless we call flush. Previously we didn't check again after unrolling the block, and the code called serializationStream.close() directly. But here we may need the serializationStream again if we can't get more unroll memory, so we should only call flush.
Can you send a PR to fix this issue for putIteratorAsBytes first? It will make this PR easier to review.
OK, I'll do it tomorrow.
@cloud-fan Sorry for my previous comment; I read the code again. Calling serializationStream.close here also seems OK: the iterator has no more values to write, which means the serializationStream isn't needed anymore.
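The point about buffered data in this thread can be illustrated with plain java.io streams. This is only an analogy under stated assumptions: `BufferedOutputStream` and `ByteArrayOutputStream` stand in for the serializationStream and its underlying byte buffer, and `FlushSketch` is a hypothetical name. Until the buffering layer is flushed (or closed), the sink under-reports how many bytes were written.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream}

object FlushSketch {
  /** Returns (bytes visible in the sink before flush, bytes visible after flush). */
  def sizesAroundFlush(payload: Array[Byte]): (Int, Int) = {
    val sink = new ByteArrayOutputStream()
    // 1024-byte buffer: payloads smaller than this stay in the buffer until flushed.
    val buffered = new BufferedOutputStream(sink, 1024)
    buffered.write(payload)
    val before = sink.size() // the sink has not received the buffered bytes yet
    buffered.flush()         // push buffered bytes down; the stream stays usable
    val after = sink.size()
    buffered.close()
    (before, after)
  }
}
```

Calling `flush` keeps the stream open for further writes, whereas `close` also flushes but ends the stream, which is the trade-off discussed in the comments above.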
|
Test build #82084 has started for PR 19285 at commit |
|
@ConeyLiu Could you rebase this onto the latest master so we can continue reviewing it? Thanks! |
|
It's updated. Thanks a lot. |
|
Test build #83527 has finished for PR 19285 at commit
|
| * original input iterator. The caller must either fully consume this iterator or call | ||
| * `close()` on it in order to free the storage memory consumed by the partially-unrolled | ||
| * block. | ||
| * @param memoryMode The values saved mode. |
nit: also add param descriptions for blockId, values and classTag.
| // We only call need the precise size after all values unrolled. | ||
| arrayValues = vector.toArray | ||
| preciseSize = SizeEstimator.estimate(arrayValues) | ||
| vector = null |
It looks scary to set vector to null in the function estimateSize.
| def createMemoryEntry(): MemoryEntry[T] = { | ||
| // We successfully unrolled the entirety of this block | ||
| assert(arrayValues != null, "arrayValue shouldn't be null!") | ||
| assert(preciseSize != -1, "preciseSize shouldn't be -1") |
Under which condition would preciseSize be -1?
| // We successfully unrolled the entirety of this block | ||
| assert(arrayValues != null, "arrayValue shouldn't be null!") | ||
| assert(preciseSize != -1, "preciseSize shouldn't be -1") | ||
| val entry = new DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) |
Why do we need to create the val entry?
|
Test build #83575 has finished for PR 19285 at commit
|
| memoryMode: MemoryMode, | ||
| storeValue: T => Unit, | ||
| estimateSize: Boolean => Long, | ||
| createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = { |
instead of passing 3 functions, I'd like to introduce
class ValuesHolder {
def storeValue(value)
def estimatedSize()
def buildEntry(): MemoryEntry
}
trait?
| * OOM exceptions, this method will gradually unroll the iterator while periodically checking | ||
| * whether there is enough free memory. If the block is successfully materialized, then the | ||
| * temporary unroll memory used during the materialization is "transferred" to storage memory, | ||
| * so we won't acquire more memory than is actually needed to store the block. |
Let's not duplicate this documentation.
|
overall looks good |
|
Are we targeting this to 2.3 or 2.4? |
|
It's just a refactor so I'd like to target it for 2.4 |
|
Thanks for your valuable suggestion, the code has been updated. |
|
Test build #86557 has finished for PR 19285 at commit
|
it can be a local variable.
c988762 to
f392217
Compare
|
Test build #86565 has finished for PR 19285 at commit
|
| val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag, | ||
| memoryMode, serializerManager) | ||
|
|
||
| if (keepUnrolling) { |
is it better to use this code structure?
if (keepUnrolling) {
// get precise size and reserve extra memory if needed
}
if (keepUnrolling) {
// create the entry
}
I do not understand what you mean, could you explain it more?
putIteratorAsValues and putIteratorAsBytes have different code structures for the last step. In the new putIterator method, you followed the code structure of putIteratorAsValues. Would it be better to follow the one from putIteratorAsBytes?
Thanks for the detailed explanation. I have updated it; the code looks clearer now.
|
|
||
| private trait ValuesHolder[T] { | ||
| def storeValue(value: T): Unit | ||
| def estimatedSize(roughly: Boolean): Long |
this is not a good API design, we can do
trait ValuesHolder {
def putValue(value: T)
def estimatedSize: Long
def getBuilder(): ValuesBuilder
}
trait ValuesBuilder {
def preciseSize: Long
def build(): MemoryEntry
}
an example
class DeserializedValuesHolder extends ValuesHolder {
...
def getBuilder = new ValuesBuilder {
val valuesArray = vector.toArray
def preciseSize = SizeEstimator.estimate(valuesArray)
def build = ...
}
}
class SerializedValuesHolder extends ValuesHolder {
...
def getBuilder = new ValuesBuilder {
serializationStream.close()
def preciseSize = bbos.size
def build = ...
}
}
Thanks very much, I'll update it tomorrow.
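The holder/builder split proposed in this thread can be made concrete with a small self-contained sketch. Everything below is a simplified assumption rather than the merged Spark code: `Entry`, `ValuesEntry`, `BytesEntry`, `DeserializedHolder`, `SerializedHolder`, and `HolderSketch` are hypothetical, values are plain strings, and sizes are naive byte counts instead of SizeEstimator results. The idea it illustrates is that the holder gives a rough size while values are still arriving, and the builder, obtained once at the end, gives the precise size and builds the entry.

```scala
import java.io.ByteArrayOutputStream
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified stand-ins for MemoryEntry and its two flavours.
sealed trait Entry { def size: Long }
final case class ValuesEntry(values: Vector[String], size: Long) extends Entry
final case class BytesEntry(bytes: Array[Byte], size: Long) extends Entry

trait ValuesBuilder {
  def preciseSize: Long
  def build(): Entry
}

trait ValuesHolder {
  def putValue(value: String): Unit
  def estimatedSize: Long
  // After getBuilder() is called, the holder must not be used again.
  def getBuilder(): ValuesBuilder
}

final class DeserializedHolder extends ValuesHolder {
  private var buffer = ArrayBuffer.empty[String]
  def putValue(value: String): Unit = buffer += value
  // Rough guess while unrolling: pretend every value costs 16 bytes.
  def estimatedSize: Long = buffer.length * 16L
  def getBuilder(): ValuesBuilder = new ValuesBuilder {
    private val values = buffer.toVector
    buffer = null // release the growing buffer; the holder is now invalid
    val preciseSize: Long = values.map(_.length.toLong).sum
    def build(): Entry = ValuesEntry(values, preciseSize)
  }
}

final class SerializedHolder extends ValuesHolder {
  private val out = new ByteArrayOutputStream()
  def putValue(value: String): Unit = out.write(value.getBytes("UTF-8"))
  def estimatedSize: Long = out.size().toLong
  def getBuilder(): ValuesBuilder = new ValuesBuilder {
    out.close() // no more writes are expected once the builder exists
    private val bytes = out.toByteArray
    def preciseSize: Long = bytes.length.toLong
    def build(): Entry = BytesEntry(bytes, preciseSize)
  }
}

object HolderSketch {
  // One shared code path for both storage formats.
  def store(holder: ValuesHolder, values: Seq[String]): Entry = {
    values.foreach(holder.putValue)
    holder.getBuilder().build()
  }
}
```

The caller in `HolderSketch.store` never needs to know which storage format it is driving, which is the duplication this PR aims to remove.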
| } | ||
| } | ||
|
|
||
| if (keepUnrolling) { |
a little improvement
if (keepUnrolling) {
val builder = valuesHolder.getBuilder()
...
if (keepUnrolling) {
val entry = builder.build()
...
Right(entry.size)
} else {
...
logUnrollFailureMessage(blockId, builder.preciseSize)
Left(unrollMemoryUsedByThisBlock)
}
} else {
...
logUnrollFailureMessage(blockId, valuesHolder.estimatedSize)
Left(unrollMemoryUsedByThisBlock)
}
updated
| // We successfully unrolled the entirety of this block | ||
| serializationStream.close() | ||
|
|
||
| override val preciseSize: Long = bbos.size |
this can be a def?
| private trait ValuesHolder[T] { | ||
| def storeValue(value: T): Unit | ||
| def estimatedSize(): Long | ||
| def getBuilder(): ValuesBuilder[T] |
add a comment to say that, after getBuilder is called, this ValuesHolder becomes invalid.
|
Test build #86620 has finished for PR 19285 at commit
|
|
Test build #86619 has finished for PR 19285 at commit
|
|
Test build #86629 has finished for PR 19285 at commit
|
|
Test build #86630 has finished for PR 19285 at commit
|
|
retest this please |
| } | ||
|
|
||
| private trait ValuesBuilder[T] { | ||
| def preciseSize: Long |
Hey guys, why not name the trait as MemoryEntryBuilder? As I see from the code, it is used to build the MemoryEntry.
good idea
|
Test build #86634 has finished for PR 19285 at commit
|
|
Test build #86674 has finished for PR 19285 at commit
|
|
thanks, merging to master! |
|
thanks all. |
What changes were proposed in this pull request?
The code logic of MemoryStore.putIteratorAsValues and MemoryStore.putIteratorAsBytes is almost the same, so we should reduce the duplicate code between them.
How was this patch tested?
Existing UT.