Commit bd1a813

Review Comments: Part 7
Minor changes to variable names and comments
Parent: e7d9894

File tree: 4 files changed, +17 −14 lines changed

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 8 additions & 6 deletions
@@ -584,7 +584,9 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private[storage] def throwFetchFailedException(
-      blockId: BlockId, address: BlockManagerId, e: Throwable) = {
+      blockId: BlockId,
+      address: BlockManagerId,
+      e: Throwable) = {
     blockId match {
       case ShuffleBlockId(shufId, mapId, reduceId) =>
         throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
@@ -605,15 +607,15 @@ private class BufferReleasingInputStream(
     private val iterator: ShuffleBlockFetcherIterator,
     private val blockId: BlockId,
     private val address: BlockManagerId,
-    private val streamCompressedOrEncrypted: Boolean)
+    private val detectCorruption: Boolean)
   extends InputStream {
   private[this] var closed = false
 
   override def read(): Int = {
     try {
       delegate.read()
     } catch {
-      case e: IOException if streamCompressedOrEncrypted =>
+      case e: IOException if detectCorruption =>
         IOUtils.closeQuietly(this)
         iterator.throwFetchFailedException(blockId, address, e)
     }
@@ -635,7 +637,7 @@ private class BufferReleasingInputStream(
     try {
       delegate.skip(n)
     } catch {
-      case e: IOException if streamCompressedOrEncrypted =>
+      case e: IOException if detectCorruption =>
         IOUtils.closeQuietly(this)
         iterator.throwFetchFailedException(blockId, address, e)
     }
@@ -647,7 +649,7 @@ private class BufferReleasingInputStream(
     try {
       delegate.read(b)
     } catch {
-      case e: IOException if streamCompressedOrEncrypted =>
+      case e: IOException if detectCorruption =>
         IOUtils.closeQuietly(this)
         iterator.throwFetchFailedException(blockId, address, e)
     }
@@ -657,7 +659,7 @@ private class BufferReleasingInputStream(
     try {
       delegate.read(b, off, len)
     } catch {
-      case e: IOException if streamCompressedOrEncrypted =>
+      case e: IOException if detectCorruption =>
         IOUtils.closeQuietly(this)
         iterator.throwFetchFailedException(blockId, address, e)
     }

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 4 additions & 4 deletions
@@ -347,7 +347,7 @@ private[spark] object Utils extends Logging {
   def copyStreamUpTo(in: InputStream, maxSize: Long): (Boolean, InputStream) = {
     var count = 0L
     val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
-    val streamCopied = tryWithSafeFinally {
+    val fullyCopied = tryWithSafeFinally {
       val bufSize = Math.min(8192L, maxSize)
       val buf = new Array[Byte](bufSize.toInt)
       var n = 0
@@ -368,10 +368,10 @@ private[spark] object Utils extends Logging {
         out.close()
       }
     }
-    if (streamCopied) {
-      (streamCopied, out.toChunkedByteBuffer.toInputStream(dispose = true))
+    if (fullyCopied) {
+      (fullyCopied, out.toChunkedByteBuffer.toInputStream(dispose = true))
     } else {
-      (streamCopied, new SequenceInputStream(
+      (fullyCopied, new SequenceInputStream(
         out.toChunkedByteBuffer.toInputStream(dispose = true), in))
     }
   }
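
The hunks above only rename streamCopied to fullyCopied, but the surrounding contract is worth spelling out. Below is a simplified, self-contained sketch of what copyStreamUpTo does, with a plain ByteArrayOutputStream standing in for Spark's ChunkedByteBufferOutputStream and the tryWithSafeFinally cleanup omitted; the helper name copyUpTo is illustrative.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, SequenceInputStream}

def copyUpTo(in: InputStream, maxSize: Long): (Boolean, InputStream) = {
  val out = new ByteArrayOutputStream()
  val buf = new Array[Byte](math.min(8192L, maxSize).toInt)
  var count = 0L
  var n = 0
  // Copy at most maxSize bytes of the source into memory.
  while (n != -1 && count < maxSize) {
    n = in.read(buf, 0, math.min(buf.length.toLong, maxSize - count).toInt)
    if (n != -1) {
      out.write(buf, 0, n)
      count += n
    }
  }
  // EOF before the limit means everything is in memory. A source of exactly
  // maxSize bytes stops the loop without observing EOF, so it is reported as
  // not fully copied; the chained stream below then just reads EOF from `in`.
  val fullyCopied = count < maxSize
  val inMemory = new ByteArrayInputStream(out.toByteArray)
  if (fullyCopied) {
    (fullyCopied, inMemory)
  } else {
    // Chain the in-memory prefix with the unread remainder of the source.
    (fullyCopied, new SequenceInputStream(inMemory, in))
  }
}

The SequenceInputStream branch is what the fetcher relies on: callers can keep reading past the copied prefix without ever knowing where memory ends and the network stream resumes.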

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -495,8 +495,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       true,
       taskContext.taskMetrics.createTempShuffleReadMetrics())
 
-    // Only one block should be returned which has corruption after maxBytesInFlight/3 because the
-    // other block will detect corruption on first fetch, and then get added to the queue again for
+    // We'll get back the block which has corruption after maxBytesInFlight/3 because the other
+    // block will detect corruption on first fetch, and then get added to the queue again for
     // a retry
     val (id, st) = iterator.next()
     assert(id === shuffleBlockId2)
@@ -507,7 +507,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     }
 
     // Following will succeed as it reads part of the stream which is not corrupt. This will read
-    // maxBytesInFlight/3 bytes from first stream and remaining from the second stream
+    // maxBytesInFlight/3 bytes from the portion copied into memory, and remaining from the
+    // underlying stream
     new DataInputStream(st).readFully(
       new Array[Byte](streamNotCorruptTill), 0, streamNotCorruptTill)
 
core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -220,7 +220,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
     val limit = 1000
     // testing for inputLength less than, equal to and greater than limit
-    List(998, 999, 1000, 1001, 1002).foreach { inputLength =>
+    (limit - 2 to limit + 2).foreach { inputLength =>
       val in = new ByteArrayInputStream(bytes.take(inputLength))
       val (fullyCopied: Boolean, mergedStream: InputStream) = Utils.copyStreamUpTo(in, limit)
       try {
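
As a usage sketch of what the rewritten boundary loop exercises (this would only compile inside an org.apache.spark package, since Utils is private[spark]): the merged stream always yields every input byte, and only an input that ends strictly before the limit is reported as fully copied. The exact-at-limit expectation is inferred from the copyUpTo sketch earlier, so treat it as an assumption rather than a documented guarantee.

import java.io.{ByteArrayInputStream, DataInputStream}
import org.apache.spark.util.Utils

val limit = 1000
val bytes = Array.tabulate[Byte](limit + 2)(_.toByte)
(limit - 2 to limit + 2).foreach { inputLength =>
  val in = new ByteArrayInputStream(bytes.take(inputLength))
  val (fullyCopied, merged) = Utils.copyStreamUpTo(in, limit)
  // Every input byte comes back, whether or not the copy hit the limit.
  val read = new Array[Byte](inputLength)
  new DataInputStream(merged).readFully(read)
  assert(read.sameElements(bytes.take(inputLength)))
  // Only EOF strictly before the limit counts as fully copied; an input of
  // exactly `limit` bytes stops the copy loop without observing EOF.
  assert(fullyCopied == (inputLength < limit))
}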
