@@ -394,7 +394,7 @@ private[spark] class MemoryStore(
394394 redirectableStream,
395395 unrollMemoryUsedByThisBlock,
396396 memoryMode,
397- bbos.toChunkedByteBuffer ,
397+ bbos,
398398 values,
399399 classTag))
400400 }
@@ -714,7 +714,8 @@ private class RedirectableOutputStream extends OutputStream {
714714 * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
715715 * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
716716 * @param memoryMode whether the unroll memory is on- or off-heap
717- * @param unrolled a byte buffer containing the partially-serialized values.
717+ * @param unrolledBbos a byte buffer containing the partially-serialized values.
718+ * [[redirectableOutputStream ]] initially points to this output stream.
718719 * @param rest the rest of the original iterator passed to
719720 * [[MemoryStore.putIteratorAsValues() ]].
720721 * @param classTag the [[ClassTag ]] for the block.
@@ -727,10 +728,12 @@ private[storage] class PartiallySerializedBlock[T](
727728 redirectableOutputStream : RedirectableOutputStream ,
728729 unrollMemory : Long ,
729730 memoryMode : MemoryMode ,
730- unrolled : ChunkedByteBuffer ,
731+ unrolledBbos : ChunkedByteBufferOutputStream ,
731732 rest : Iterator [T ],
732733 classTag : ClassTag [T ]) {
733734
735+ private lazy val unrolled : ChunkedByteBuffer = unrolledBbos.toChunkedByteBuffer
736+
734737 // If the task does not fully consume `valuesIterator` or otherwise fails to consume or dispose of
735738 // this PartiallySerializedBlock then we risk leaking of direct buffers, so we use a task
736739 // completion listener here in order to ensure that `unrolled.dispose()` is called at least once.
@@ -782,6 +785,9 @@ private[storage] class PartiallySerializedBlock[T](
782785 * `close()` on it to free its resources.
783786 */
784787 def valuesIterator : PartiallyUnrolledIterator [T ] = {
788+ // Close the serialization stream so that the serializer's internal buffers are freed and any
789+ // "end-of-stream" markers can be written out so that `unrolled` is a valid serialized stream.
790+ serializationStream.close()
785791 // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
786792 val unrolledIter = serializerManager.dataDeserializeStream(
787793 blockId, unrolled.toInputStream(dispose = true ))(classTag)
0 commit comments