Skip to content

Commit 21ea881

Browse files
committed
Coverting blockSize to Int during preparation of push request
1 parent 6aae02a commit 21ea881

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
241241
*/
242242
private def sliceReqBufferIntoBlockBuffers(
243243
reqBuffer: ManagedBuffer,
244-
blockSizes: Seq[Long]): Array[ManagedBuffer] = {
244+
blockSizes: Seq[Int]): Array[ManagedBuffer] = {
245245
if (blockSizes.size == 1) {
246246
Array(reqBuffer)
247247
} else {
@@ -256,7 +256,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
256256
case (offset, size) =>
257257
new NioManagedBuffer(inMemoryBuffer.duplicate()
258258
.position(offset)
259-
.limit(offset + size.toInt).asInstanceOf[ByteBuffer].slice())
259+
.limit(offset + size).asInstanceOf[ByteBuffer].slice())
260260
}.toArray
261261
}
262262
}
@@ -333,12 +333,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
333333
mergerLocs: Seq[BlockManagerId],
334334
transportConf: TransportConf): Seq[PushRequest] = {
335335
var offset = 0L
336-
var currentReqSize = 0L
336+
var currentReqSize = 0
337337
var currentReqOffset = 0L
338338
var currentMergerId = 0
339339
val numMergers = mergerLocs.length
340340
val requests = new ArrayBuffer[PushRequest]
341-
var blocks = new ArrayBuffer[(BlockId, Long)]
341+
var blocks = new ArrayBuffer[(BlockId, Int)]
342342
for (reduceId <- 0 until numPartitions) {
343343
val blockSize = partitionLengths(reduceId)
344344
logDebug(
@@ -357,13 +357,13 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
357357
&& blocks.size < maxBlocksInFlightPerAddress
358358
&& mergerId == currentMergerId && blockSize <= maxBlockSizeToPush) {
359359
// Add current block to current batch
360-
currentReqSize += blockSize
360+
currentReqSize += blockSize.toInt
361361
} else {
362362
if (blocks.nonEmpty) {
363363
// Convert the previous batch into a PushRequest
364364
requests += PushRequest(mergerLocs(currentMergerId), blocks.toSeq,
365365
createRequestBuffer(transportConf, dataFile, currentReqOffset, currentReqSize))
366-
blocks = new ArrayBuffer[(BlockId, Long)]
366+
blocks = new ArrayBuffer[(BlockId, Int)]
367367
}
368368
// Start a new batch
369369
currentReqSize = 0
@@ -374,13 +374,14 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
374374
}
375375
// Only push blocks under the size limit
376376
if (blockSize <= maxBlockSizeToPush) {
377-
blocks += ((ShufflePushBlockId(shuffleId, partitionId, reduceId), blockSize))
377+
val blockSizeInt = blockSize.toInt
378+
blocks += ((ShufflePushBlockId(shuffleId, partitionId, reduceId), blockSizeInt))
378379
// Only update currentReqOffset if the current block is the first in the request
379380
if (currentReqOffset == -1) {
380381
currentReqOffset = offset
381382
}
382383
if (currentReqSize == 0) {
383-
currentReqSize += blockSize
384+
currentReqSize += blockSizeInt
384385
}
385386
}
386387
}
@@ -415,7 +416,7 @@ private[spark] object ShuffleBlockPusher {
415416
*/
416417
private[spark] case class PushRequest(
417418
address: BlockManagerId,
418-
blocks: Seq[(BlockId, Long)],
419+
blocks: Seq[(BlockId, Int)],
419420
reqBuffer: ManagedBuffer) {
420421
val size = blocks.map(_._2).sum
421422
}

0 commit comments

Comments
 (0)