Conversation

eyalfa commented Aug 5, 2017

What changes were proposed in this pull request?

Introduced DiskBlockData, a new implementation of BlockData representing a whole file.
This is somewhat related to SPARK-6236 as well.

This class follows the implementation of EncryptedBlockData, just without the encryption. Hence:

  • toInputStream is implemented using a FileInputStream (todo: the encrypted version actually uses Channels.newInputStream; not sure whether that is the right choice here)
  • toNetty is implemented in terms of io.netty.channel.DefaultFileRegion
  • toByteBuffer fails for files larger than 2GB (the same behavior as the original code, just postponed a bit); it also respects the same configuration keys defined by the original code to choose between memory mapping and a simple file read. A rough sketch of this shape follows the list.
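A minimal sketch of that shape, assuming simplified names and error handling; this is illustrative only, not the exact code in this PR:

// Illustrative sketch of the shape described above; not the exact patch code.
import java.io.{File, FileInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

import io.netty.channel.DefaultFileRegion

private class DiskBlockDataSketch(
    minMemoryMapBytes: Long,
    file: File,
    blockSize: Long) {

  def toInputStream(): InputStream = new FileInputStream(file)

  def toNetty(): AnyRef = new DefaultFileRegion(file, 0, blockSize)

  def toByteBuffer(): ByteBuffer = {
    // Fails explicitly for blocks that cannot fit in a single ByteBuffer (>= 2GB).
    require(blockSize < Int.MaxValue,
      s"can't create a byte buffer of size $blockSize since it exceeds Int.MaxValue")
    val channel = new FileInputStream(file).getChannel
    try {
      if (blockSize < minMemoryMapBytes) {
        // Small blocks: a plain read into a heap buffer.
        val buf = ByteBuffer.allocate(blockSize.toInt)
        while (buf.hasRemaining && channel.read(buf) >= 0) {}
        buf.flip()
        buf
      } else {
        // Larger blocks: memory-map the file region.
        channel.map(MapMode.READ_ONLY, 0, blockSize)
      }
    } finally {
      channel.close()
    }
  }
}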

How was this patch tested?

Added tests to DiskStoreSuite and MemoryManagerSuite.

eyalfa (Author) commented Aug 5, 2017

@rxin, @JoshRosen, @cloud-fan,
you seem to be the last ones to have touched this class; can you please review?

@eyalfa eyalfa changed the title [Spark 3151][Block Manager] DiskStore.getBytes fails for files larger than 2GB [Spark-3151][Block Manager] DiskStore.getBytes fails for files larger than 2GB Aug 5, 2017
@eyalfa eyalfa changed the title [Spark-3151][Block Manager] DiskStore.getBytes fails for files larger than 2GB [SPARK-3151][Block Manager] DiskStore.getBytes fails for files larger than 2GB Aug 6, 2017
}

override def toByteBuffer(): ByteBuffer = {
require( size < Int.MaxValue
kiszk (Member), Aug 6, 2017:

Is it better to check blockSize since this method refers to blockSize?

eyalfa (Author):

@kiszk, not sure I'm following your comment.
This requirement results in an explicit error when one tries to obtain a ByteBuffer larger than java.nio's limitations.
The original code used to fail in line 115 when calling ByteBuffer.allocate with a block size larger than 2GB; the newer code fails explicitly in this case.

...or do you mean referring to the blockSize val rather than the size method? I can't really see a difference in that case.

Member:

Sorry for confusing you; I meant the last line in your comment. I know the size method has the same value as blockSize, since I looked at the size method.
Other places in toByteBuffer use blockSize instead of size. For ease of reading, it would be good to use blockSize instead of size here as well.
What do you think?
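In other words, the suggestion is purely about readability, along these lines (illustrative):

// Illustrative: use blockSize here, as the rest of toByteBuffer already does.
require(blockSize < Int.MaxValue,
  s"can't create a byte buffer of size $blockSize since it exceeds Int.MaxValue")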


override def dispose(): Unit = {}

private def open() = new FileInputStream(file).getChannel
Member:

nit: remove 2 spaces for better indent.

eyalfa (Author):

will do 😎

val blockId = BlockId("rdd_1_2")
diskStore.put(blockId) { chan =>
val arr = new Array[Byte](mb)
for{
Member:

nit: for (

val arr = new Array[Byte](mb)
for{
_ <- 0 until 2048
}{
Member:

nit: } {
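Applying the style nits, the quoted loop would read roughly like this (illustrative; the loop body writing the 1MB buffer through the channel is an assumption, it is not quoted above):

val arr = new Array[Byte](mb)
for {
  _ <- 0 until 2048
} {
  // Assumed loop body: write the 1MB buffer fully through the channel.
  val buf = java.nio.ByteBuffer.wrap(arr)
  while (buf.hasRemaining) {
    chan.write(buf)
  }
}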

val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf))

val mb = 1024*1024
Member:

nit: 1024 * 1024

eyalfa (Author) commented Aug 6, 2017

@kiszk, fixed styling and readability according to your comments.
BTW, any idea why JIRA didn't associate this PR with SPARK-3151?

HyukjinKwon (Member) commented.

cloud-fan (Contributor) commented:

Looks reasonable. Can you explain which end-to-end cases this patch fixes, and which end-to-end cases remain problematic?

kiszk (Member) commented Aug 7, 2017

Sounds good to me except for BlockManagerSuite.scala; I am not familiar enough to tell whether this new test exercises the new code well or not.

eyalfa (Author) commented Aug 7, 2017

@HyukjinKwon, interesting reading, but I couldn't find a concrete reason for or solution to the issue there.
@cloud-fan, I encountered this bug when working with a disk-persisted RDD; it turned out some of our partitions exceeded the 2GB limit and failed in DiskStore.getBytes.

Since we're using a specific commercial distro I couldn't test this patch on my original use case (we got rid of the large partitions anyway by tuning the number of partitions and tweaking the DAG altogether 😎), so I did the next best thing: reproduce the issue in a test case. Please see the test "blocks larger than 2gb" in DiskStoreSuite in this PR; a condensed sketch of its shape follows.
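Roughly (illustrative, assembled from the test fragments quoted later in this conversation; not the exact suite code):

// Illustrative reproduction: write a block larger than 2GB in 1MB chunks,
// then read it back through DiskStore.getBytes.
import java.nio.ByteBuffer

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.storage.{BlockId, DiskBlockManager, DiskStore}

val conf = new SparkConf()
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf))

val mb = 1024 * 1024
val gb = 1024L * mb

val blockId = BlockId("rdd_1_2")
diskStore.put(blockId) { chan =>
  val arr = new Array[Byte](mb)
  for (_ <- 0 until 2048) {
    val buf = ByteBuffer.wrap(arr)
    while (buf.hasRemaining) {
      chan.write(buf)
    }
  }
}

// Before the fix, getBytes failed for blocks over 2GB.
val blockData = diskStore.getBytes(blockId)
assert(blockData.size == 2 * gb)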

}
}

override def toByteBuffer(): ByteBuffer = {
Contributor:

We will still hit the 2GB limitation here; I'm wondering which end-to-end use cases are affected by it.

eyalfa (Author), Aug 7, 2017:

Indeed. I chose to postpone the failure from DiskStore.getBytes to this place, as I believe it introduces no regression while still allowing the more common 'streaming'-like use case.

Furthermore, I think this plays well with the comment about the future deprecation of org.apache.spark.network.buffer.ManagedBuffer#nioByteBuffer, which seems to be the main reason for BlockData exposing the toByteBuffer method.

eyalfa (Author):

@cloud-fan,
it took me roughly 4 hours, but I looked both at the shuffle code path and at BlockManager.getRemoteBytes:
the former seems robust to large blocks because it uses Netty's streaming capabilities;
the latter seems to be broken, as it does not use Netty's streaming capabilities and actually tries to copy the result buffer into a heap-based buffer. I think this deserves its own JIRA/PR.
I think these two places plus the external shuffle server cover most of the relevant use cases (aside from local caching, which I believe this PR makes 2GB-proof).

}
}

val blockData = diskStore.getBytes(blockId)
eyalfa (Author):

@kiszk, this is the test case I was referring to.
I actually introduced it before (hopefully) fixing the bug in DiskStore.getBytes.

@eyalfa eyalfa changed the title [SPARK-3151][Block Manager] DiskStore.getBytes fails for files larger than 2GB [SPARK-3151] [Block Manager] DiskStore.getBytes fails for files larger than 2GB Aug 7, 2017
}

private class DiskBlockData(
conf: SparkConf,
Contributor:

we can pass in minMemoryMapBytes directly.
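That is, something like this (illustrative), so the class depends only on the value it actually needs rather than on the whole SparkConf:

// Illustrative: take the resolved limit as a plain constructor parameter
// instead of the whole SparkConf.
private class DiskBlockData(
    minMemoryMapBytes: Long,
    file: java.io.File,
    blockSize: Long) {
  // methods as before, reading minMemoryMapBytes directly
}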

}
}

def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
Contributor:

nit: storageLevel: StorageLevel

implicitly[ClassTag[Array[Byte]]],
mkBlobs _
)
withClue(res1) {
Contributor:

does res1 have a reasonable string representation?

eyalfa (Author):

I think it would print an Either whose left side is a case class with an iterator member (prints as an empty/non-empty iterator), an enum, and a number of bytes;
the right side is an iterator, which again prints as an empty/non-empty iterator.

withClue(res1) {
assert(res1.isLeft)
assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
case (a, b) =>
Contributor:

just a === b?

eyalfa (Author):

You can't compare Arrays directly; you get identity equality, which is usually not what you want. Hence the .seq that forces them to be wrapped in a Seq.

Contributor:

=== is a helper method in ScalaTest and should be able to compare arrays.

Contributor:

Even if === does not work, you have Arrays.equals, which is null-safe.
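For reference, two null-safe, element-wise ways to compare the byte arrays in this assertion (illustrative):

// Illustrative: Array equality in Scala is reference equality, so compare contents instead.
val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)

assert(java.util.Arrays.equals(a, b)) // element-wise and null-safe
assert(a.toSeq == b.toSeq)            // wrapping in a Seq also gives element-wise equality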

}
}

test("getOrElseUpdate > 2gb, storage level = disk only") {
Contributor:

shall we just write a test in DiskStoreSuite?

Contributor:

Oh, we already have; then why do we have these tests?

eyalfa (Author):

These tests cover more than just the DiskOnly storage level; they were crafted when I had bigger ambitions of solving the entire 2GB issue 😎, before I saw some ~100-file pull requests being abandoned or rejected.
Aside from that, these tests also exercise the entire orchestration done by BlockManager when an RDD requests a cached partition; notice that they intentionally make two calls to the BlockManager in order to simulate both code paths (cache hit and cache miss).

}

val blockData = diskStore.getBytes(blockId)
assert(blockData.size == 2 * gb)
Contributor:

test with 3gb to be more explicit that it's larger than 2gb?

eyalfa (Author):

Possible, will fix.
I guess I aimed for the lowest possible failing value.

eyalfa (Author) commented Aug 9, 2017 via email

eyalfa (Author) commented Aug 11, 2017

@cloud-fan, I think I found the sbt setting that controls the max heap size for forked tests; I've increased it from 3g to 6g.
cc: @srowen, @vanzin and @a-roberts, you seem to be the last ones to have updated this area of the sbt build, please review.

SparkQA commented Aug 11, 2017

Test build #80534 has finished for PR 18855 at commit 6cbe8d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.map { case (k,v) => s"-D$k=$v" }.toSeq,
javaOptions in Test += "-ea",
javaOptions in Test ++= "-Xmx3g -Xss4096k"
javaOptions in Test ++= "-Xmx6g -Xss4096k"
Contributor:

I'm a little worried about this change. Since the change to BlockManagerSuite is not closely related to this PR, can we revert it and revisit it in a follow-up PR? Then we can unblock this PR.

eyalfa (Author):

@cloud-fan, let's wait a few hours and see what the other guys CCed on this (the last ones to edit the build) have to say. If they are also worried, or do not comment, I'll revert this.

I must say I'm reluctant to revert these tests, as I personally believe the lack of such tests contributed to Spark's 2GB issues, including this one.

Member:

I am +1 for separating it if possible. Let's get the changes we are sure of into the code base first.

}

override def toByteBuffer(): ByteBuffer = {
require( blockSize < Int.MaxValue
Contributor:

no space after (; comma should be on this line.

withClue(res1) {
assert(res1.isLeft)
assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
case (a, b) =>
Contributor:

Even if === does not work, you have Arrays.equals, which is null-safe.

val iter = store
.serializerManager
.dataDeserializeStream(RDDBlockId(42, 0)
, inpStrm)(implicitly[ClassTag[Array[Byte]]])
Contributor:

Comma goes on the previous line. inpStrm is kind of an ugly variable name; pick one: is, in, inputStream.

}
}

def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
Contributor:

Have you measured how long these tests take? I've seen this tried before in other changes related to the 2GB limit, and this kind of test was always ridiculously slow.

You can avoid this kind of test by making the chunk size configurable, e.g. in this line you're adding above:

val chunkSize = math.min(remaining, Int.MaxValue)

Then your test can run fast and not use a lot of memory. You just need to add extra checks that the data is being chunked properly, instead of relying on the JVM not throwing errors at you.
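To illustrate the idea (the helper name and exact parameterization are hypothetical, not part of this PR): once the maximum chunk size is a parameter instead of a hard-coded Int.MaxValue, a test can pick a tiny limit and verify the chunk boundaries directly, without ever allocating gigabytes:

// Hypothetical helper: compute (offset, length) pairs for reading a block in chunks
// no larger than maxChunkSize.
def chunkOffsets(blockSize: Long, maxChunkSize: Int): Seq[(Long, Int)] = {
  val chunks = Seq.newBuilder[(Long, Int)]
  var offset = 0L
  while (offset < blockSize) {
    val chunkSize = math.min(blockSize - offset, maxChunkSize.toLong).toInt
    chunks += ((offset, chunkSize))
    offset += chunkSize
  }
  chunks.result()
}

// A fast test: a 25 KB "block" with a 10 KB limit must split into 10 KB + 10 KB + 5 KB.
assert(chunkOffsets(25 * 1024, 10 * 1024) ==
  Seq((0L, 10240), (10240L, 10240), (20480L, 5120)))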

eyalfa (Author):

@vanzin,
I've measured; test case times range from 7 to 25 seconds on my laptop.
Point well taken 😎

Contributor:

7-25 seconds is really a long time for a unit test...

eyalfa (Author):

@cloud-fan I know;
it even gets worse when using the === operator.

I'm currently exploring the second direction pointed out by @vanzin: introducing a test-only configuration key to control the max page size.

eyalfa (Author):

@cloud-fan, @vanzin,
taking the 'parameterized' approach, I'd remove most of the tests from BlockManagerSuite, as they'd require propagating this parameter to too many subsystems.
So I'm going to modify DiskStore and DiskStoreSuite to use such a parameter. I'm not sure about leaving a test case in BlockManagerSuite that tests DISK_ONLY persistence; what do you think?

Contributor:

Shall we do them together in a follow-up PR? I think the test case in DiskStoreSuite is enough.

eyalfa (Author):

Yes, currently working on:

  1. parameterizing DiskStore and DiskStoreSuite
  2. reverting the tests in BlockManagerSuite
  3. reverting the 6gb change in sbt

Contributor:

It would probably be easier to propagate the chunk size as a SparkConf entry that is not documented. But up to you guys.

eyalfa (Author) commented Aug 15, 2017 via email

SparkQA commented Aug 15, 2017

Test build #80697 has finished for PR 18855 at commit 8be899f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

vanzin (Contributor) left a comment:

Looks ok to me but I'll let Wenchen take a final look.

val chunkedByteBuffer = blockData.toChunkedByteBuffer(ByteBuffer.allocate)
val chunks = chunkedByteBuffer.chunks
assert(chunks.size === 2)
for( chunk <- chunks ) {
Contributor:

nit: for (chunk...) {

.set("spark.storage.memoryMapLimitForTests", "10k" )
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf))

Contributor:

nit: remove this empty line

blockData.toByteBuffer()
}

assert(e.getMessage ==
Contributor:

nit: ===

SparkQA commented Aug 16, 2017

Test build #80716 has finished for PR 18855 at commit 732073c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

eyalfa (Author) commented Aug 16, 2017

@cloud-fan, @vanzin,
any idea what happened to this build? It seems like an environment issue after a successful build (0 failed tests, 'Build step 'Execute shell' marked build as failure').
Can one of you kindly ask Jenkins to retest?

cloud-fan (Contributor) commented:

retest this please


private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
s"${Int.MaxValue}b")
Contributor:

nit: just Int.MaxValue.toString
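That is (illustrative of the nit applied to the quoted line above):

// Illustrative: the default needs no string interpolation or unit suffix.
private val maxMemoryMapBytes = conf.getSizeAsBytes(
  "spark.storage.memoryMapLimitForTests", Int.MaxValue.toString)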

}

private class DiskBlockData(
minMemoryMapBytes : Long,
cloud-fan (Contributor), Aug 16, 2017:

nit: no space before :

// I chose to leave the original error message here
// since users are unfamiliar with the configuration key
// controlling maxMemoryMapBytes for tests
require(blockSize < maxMemoryMapBytes,
Contributor:

Why do we need this? I think we would see the original error message if we didn't have this check and went to the memory-map code path.

Contributor:

Oh, this is to verify the bug fix.

// controlling maxMemoryMapBytes for tests
require(blockSize < maxMemoryMapBytes,
s"can't create a byte buffer of size $blockSize" +
s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
Contributor:

since it exceeds $maxMemoryMapBytes is more accurate.

Contributor:

or call Utils.bytesToString to make it more readable.
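Combining both suggestions, the message could read roughly like this (illustrative):

// Illustrative: reference the configured limit and render it human-readably.
import org.apache.spark.util.Utils

require(blockSize < maxMemoryMapBytes,
  s"can't create a byte buffer of size $blockSize" +
  s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")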

cloud-fan (Contributor) commented:

LGTM except some minor comments, thanks for working on it!

SparkQA commented Aug 16, 2017

Test build #80728 has finished for PR 18855 at commit 732073c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 16, 2017

Test build #80747 has finished for PR 18855 at commit d0c98a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor) commented:

thanks, merging to master!

asfgit closed this in b8ffb51 on Aug 17, 2017.