Skip to content

Conversation

@xkrogen
Copy link
Contributor

@xkrogen xkrogen commented Apr 28, 2021

What changes were proposed in this pull request?

Introduce new shared methods to ShuffleBlockFetcherIteratorSuite to replace copy-pasted code. Use modern, Scala-like Mockito Answer syntax.

Why are the changes needed?

ShuffleFetcherBlockIteratorSuite has tons of duplicate code, like

val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,
blockManager,
blocksByAddress,
(_, in) => in,
48 * 1024 * 1024,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
true,
false,
metrics,
false)
. It's challenging to tell what the interesting parts are vs. what is just being set to some default/unused value.

Similarly but not as bad, there are many calls like the following

verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer ...

These changes result in about 10% reduction in both lines and characters in the file:

# Before
> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
    1063    3950   43201 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

# After
> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
     928    3609   39053 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

It also helps readability, e.g.:

    val iterator = createShuffleBlockIteratorWithDefaults(
      transfer,
      blocksByAddress,
      maxBytesInFlight = 1000L
    )

Now I can clearly tell that maxBytesInFlight is the main parameter we're interested in here.

Does this PR introduce any user-facing change?

No, test only. There aren't even any behavior changes, just refactoring.

How was this patch tested?

Unit tests pass.

@github-actions github-actions bot added the CORE label Apr 28, 2021
@SparkQA
Copy link

SparkQA commented Apr 28, 2021

Test build #138054 has finished for PR 32389 at commit 322593f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xkrogen xkrogen force-pushed the xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite branch from 322593f to c96a16e Compare April 28, 2021 20:50
@SparkQA
Copy link

SparkQA commented Apr 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42573/

@SparkQA
Copy link

SparkQA commented Apr 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42573/

@SparkQA
Copy link

SparkQA commented Apr 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42574/

@SparkQA
Copy link

SparkQA commented Apr 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42574/

@SparkQA
Copy link

SparkQA commented Apr 28, 2021

Test build #138055 has finished for PR 32389 at commit c96a16e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

cc @Ngone51 @mridulm FYI

@Ngone51
Copy link
Member

Ngone51 commented Apr 29, 2021

This's a great refactor. I left some minor comments. Overall, looks good to me.

@xkrogen xkrogen force-pushed the xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite branch from 59137e2 to 596c3dd Compare April 29, 2021 16:21
@xkrogen
Copy link
Contributor Author

xkrogen commented Apr 29, 2021

Thanks @Ngone51 for the suggestions!

I put up two more commits. The first pretty directly answers your comments and has some other small fixes I noticed.

The second one is a bit larger and was inspired by your comment about moving the helper methods from the object to the class. All of the tests make use of val transfer = mock(classOf[BlockTransferService]) and then pass this around to the helper methods. I moved this to be a field instead and then we can remove transfer from being passed in the method signatures. I think it's cleaner but open to feedback on that part, I am happy to revert if you/others don't find it useful.

It does reduce the size by another ~40 lines:

> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
     894    3509   37951 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42607/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42607/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Test build #138087 has finished for PR 32389 at commit 596c3dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xkrogen
Copy link
Contributor Author

xkrogen commented May 4, 2021

@Ngone51 do you have any more comments here? Thanks a lot for your comments so far!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this could be extended to support the case of multiple blockmanagers, e.g.,

val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
  (localBmId, localBlocks.keys.map(blockId => (blockId, 1L, 0)).toSeq),
  (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 1L, 1)).toSeq),
  (hostLocalBmId, hostLocalBlocks.keys.map(blockId => (blockId, 1L, 1)).toSeq)
).toIterator

We can pass in a Map[BlockManagerId, (blocks, size, mapIndex)] instead.

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I focused on the single-BM case because it was simpler, but you're right that there was still a lot of common logic to be reduced. Actually, your suggestion made me realize that we only ever use this method (and the new multi-BM method I created) to create an iterator which is then passed to getShuffleIteratorWithDefaults, so I just made getShuffleIteratorWithDefaults directly accept a Map. I think it's very clean now, thank you for the suggestion!

@Ngone51
Copy link
Member

Ngone51 commented May 8, 2021

cc @mridulm @tgravescs @attilapiros

@xkrogen xkrogen force-pushed the xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite branch from 596c3dd to 196cb06 Compare May 10, 2021 16:21
@xkrogen
Copy link
Contributor Author

xkrogen commented May 10, 2021

Great comments @Ngone51 ! Pushed up a new set of commits addressing your comments.

@SparkQA
Copy link

SparkQA commented May 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42860/

@SparkQA
Copy link

SparkQA commented May 10, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42860/

@SparkQA
Copy link

SparkQA commented May 10, 2021

Test build #138338 has finished for PR 32389 at commit 196cb06.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@otterc
Copy link
Contributor

otterc commented May 11, 2021

@xkrogen @Ngone51 @mridulm Gentle ping folks to check if this PR can be merged. To me it looks like it is ready.
I need to rework the tests in my PR once this is merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I was oscillating between this being a Map vs Seq ... currently, a Map is fine based on how ShuffleBlockFetcherIterator is used ... but might be something we revisit in future.

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some minor nits, really nice work @xkrogen !

@xkrogen xkrogen force-pushed the xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite branch from b2abb87 to 63bf0c4 Compare May 17, 2021 18:04
@SparkQA
Copy link

SparkQA commented May 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43161/

@SparkQA
Copy link

SparkQA commented May 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43161/

@SparkQA
Copy link

SparkQA commented May 17, 2021

Test build #138641 has finished for PR 32389 at commit 4fcc6be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, thanks for working on this @xkrogen !
+CC @Ngone51, @otterc PTAL

Copy link
Contributor

@otterc otterc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Just a minor nit

xkrogen added 11 commits May 18, 2021 10:10
…especially instantiating the ShuffleBlockFetcherIterator
…ito syntax and pull out common calls to when(transfer.fetchBlocks) and verify(transfer, ...).fetchBlocks
…BlocksCount to verifyFetchBlocksInvocationCount. Restore helpful comments. Remove one redundant parameter override. Fix upa few minor issues such as unnecessary specification of very long types.
@xkrogen xkrogen force-pushed the xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite branch from 4fcc6be to eea80f5 Compare May 18, 2021 17:14
@SparkQA
Copy link

SparkQA commented May 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43211/

@SparkQA
Copy link

SparkQA commented May 18, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43211/

@SparkQA
Copy link

SparkQA commented May 18, 2021

Test build #138690 has finished for PR 32389 at commit eea80f5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@asfgit asfgit closed this in 186477c May 19, 2021
@mridulm
Copy link
Contributor

mridulm commented May 19, 2021

Merging to master, thanks @xkrogen.
Thanks for the reviews @Ngone51 , @otterc !

@xkrogen xkrogen deleted the xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite branch May 19, 2021 16:44
@xkrogen
Copy link
Contributor Author

xkrogen commented May 19, 2021

Thanks for the reviews @mridulm , @otterc, and @Ngone51 ! Much appreciated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants