
Conversation

@fidato13
Contributor

@fidato13 fidato13 commented Oct 2, 2016

What changes were proposed in this pull request?

This pull request contains the changes for the critical bug SPARK-16575. It fixes the partition calculation in BinaryFileRDD: when an RDD is created with sc.binaryFiles, the resulting RDD always consists of only two partitions.

How was this patch tested?

The original issue, i.e. getNumPartitions on a binary-files RDD always returning two partitions, was first reproduced and then re-tested with the changes applied. The existing unit tests have also been run and pass.

This contribution is my original work and I license the work to the project under the project's open source license.

@srowen @hvanhovell @rxin @vanzin @skyluc @kmader @zsxwing @datafarmer Please have a look.

@rxin
Contributor

rxin commented Oct 2, 2016

I don't actually think this is a bug, because it is intended to do some coalescing. If there is an issue, the issue would be that we don't take the cost of individual files into account in this code path. The Spark SQL automatic coalescing code path does take that into account.

@fidato13
Contributor Author

fidato13 commented Oct 2, 2016

Hi @rxin ,
Yes, I agree, and I believe Spark SQL does take it into account. Should we still proceed with this pull request? It gives the correct number of partitions for other components, which may be helpful, and fixes the mismatch introduced in 1.6.1. Please advise.

@rxin
Contributor

rxin commented Oct 2, 2016

The problem is that it does not make sense to create one partition per file, when files are small. This was actually a bug with the previous implementation.

@fidato13
Contributor Author

fidato13 commented Oct 2, 2016

@rxin Yes, it makes perfect sense to not create a partition per file.
Looking at the code in PortableDataStream.setMinPartitions:
val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong

Unless the user explicitly specifies minPartitions, the default that gets picked will almost always produce just two partitions, irrespective of the size and number of the files.
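
To make the effect concrete, here is a minimal, self-contained sketch of that formula (the object name and file sizes below are made up for illustration):

object TwoPartitionDefault {
  // Mirrors the formula quoted above. sc.defaultMinPartitions is math.min(defaultParallelism, 2),
  // so minPartitions is normally 2 when the caller does not override it.
  def maxSplitSize(totalLen: Long, minPartitions: Int): Long =
    math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong

  def main(args: Array[String]): Unit = {
    val totalLen = 1000L * 128L * 1024 * 1024   // pretend input: 1000 files of 128 MB each
    val split = maxSplitSize(totalLen, minPartitions = 2)
    // Each split may hold half of the total input, so the combine input format
    // packs everything into roughly two partitions, whatever the file count.
    println(s"maxSplitSize = $split bytes -> ~${totalLen / split} partitions")
  }
}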

The change in this pull request makes the partition count consistent with other RDD types; for example, textFile and binaryFiles would then report the same number of partitions.

Your concern, avoiding one partition per small file, should certainly stay the top priority; could that be taken up as an improvement in the near future?

@rxin
Contributor

rxin commented Oct 4, 2016

Somehow github didn't email me at all. I think we can follow something like what Spark SQL does, i.e. two settings: one for the size of each partition, and another for the cost of opening a file.

PS: Would it make more sense to just add binary file support to Spark SQL, and then call it a day?

@fidato13
Contributor Author

fidato13 commented Oct 4, 2016

@rxin, thanks for this. I think we can certainly go with creating one partition per block (default block size 128 MB). Could you point me to how the second setting (the cost of opening a file) should be implemented, please?

@kmader
Contributor

kmader commented Oct 4, 2016

@rxin on the PS, how would you foresee the SQL implementation of binary support? Is there a standard method of going from byte streams to DataFrames?

@fidato13
Contributor Author

@rxin, support for openCostInBytes, similar to Spark SQL, has now been added for the affected binaryFiles case. May I ask for your review and suggestions on taking this forward and merging this pull request? Thanks for the great suggestion!

@srowen
Member

srowen commented Oct 14, 2016

If this mirrors some code in Spark SQL, can we refactor the commonality? I don't know this code well but given the discussion I was expecting more sharing. What is the corresponding config code that this is following?

@fidato13
Contributor Author

Hi @srowen

The Spark SQL logic that the binaryFiles partition calculation now follows is the openCostInBytes setting (filesOpenCostInBytes in org.apache.spark.sql.internal.SQLConf), which org.apache.spark.sql.execution.FileSourceScanExec uses inside createNonBucketedReadRDD.

As the original issue states, sc.binaryFiles always creates an RDD with two partitions, irrespective of the number and size of the files: whether there are 1000 files or gigabytes of data, it still creates only two partitions.

To resolve this, Reynold suggested implementing something like what Spark SQL does, i.e. taking the cost of opening a file into account (more detail in the JIRA issue) so that a large number of small files does not produce a large number of partitions.
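
For illustration, here is a rough, self-contained sketch of that idea (the helper name and the default values are assumptions for this example, not the exact patch):

object BinaryFilesSplitSizeSketch {
  // Hypothetical helper, not the actual PR code: compute the maximum split size
  // for binaryFiles-style packing under an open-cost model.
  def maxSplitSize(
      fileLengths: Seq[Long],
      defaultParallelism: Int,
      defaultMaxSplitBytes: Long = 128L * 1024 * 1024,   // assumed default, analogous to maxPartitionBytes
      openCostInBytes: Long = 4L * 1024 * 1024): Long = { // assumed default, analogous to openCostInBytes
    val totalBytes = fileLengths.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / math.max(defaultParallelism, 1)
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // 1000 files of 64 KB each: the open cost groups the small files together,
    // but the result is no longer a hard-coded two partitions.
    println(maxSplitSize(Seq.fill(1000)(64L * 1024), defaultParallelism = 8))
  }
}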

I have verified that the change now produces an optimized number of partitions; please let me know your suggestions. Also, since both properties are used for their respective configs, and in Spark core they are used only once for this issue, please advise whether it is okay to keep them, or how you would like to proceed.

Thanks

@fidato13
Contributor Author

/**
 * Create an RDD for non-bucketed reads.
 * The bucketed variant of this function is [[createBucketedReadRDD]].
 *
 * @param readFile a function to read each (part of a) file.
 * @param selectedPartitions Hive-style partition that are part of the read.
 * @param fsRelation [[HadoopFsRelation]] associated with the read.
 */
private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Seq[Partition],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

This is the calculation currently done in Spark SQL: it takes openCostInBytes into account to avoid creating a large number of partitions.
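
As a quick numeric trace of the formula above (can be pasted into the Scala REPL; the file count and parallelism are made-up values, while the 128 MB and 4 MB figures are the stock SQL defaults):

val openCostInBytes      = 4L * 1024 * 1024             // spark.sql.files.openCostInBytes default
val defaultMaxSplitBytes = 128L * 1024 * 1024           // spark.sql.files.maxPartitionBytes default
val defaultParallelism   = 16
val totalBytes    = 10000L * (1L * 1024 * 1024 + openCostInBytes)   // 10000 files of 1 MB, padded by the open cost
val bytesPerCore  = totalBytes / defaultParallelism
val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
// maxSplitBytes comes out at 128 MB, giving on the order of 400 partitions
// instead of one per file (10000) or a fixed two.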

@fidato13 fidato13 changed the title [SPARK-16575] [spark core] partition calculation mismatch with sc.binaryFiles [SPARK-16575][CORE] partition calculation mismatch with sc.binaryFiles Oct 14, 2016
@fidato13
Contributor Author

@srowen @rxin @zsxwing Can you please have a look and advise?
Thanks

@rxin
Contributor

rxin commented Nov 2, 2016

Jenkins, retest this please.

@SparkQA

SparkQA commented Nov 2, 2016

Test build #3394 has finished for PR 15327 at commit 4ebf501.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fidato13
Contributor Author

fidato13 commented Nov 3, 2016

ping @rxin @srowen @zsxwing Can you please have a look and ask Jenkins to retest? The latest commit fixes the "SPARK-12527, it is discarded unused. You may specify targets with meta-annotations" warning, which caused the previous build to fail.

@rxin
Contributor

rxin commented Nov 3, 2016

triggered.

@SparkQA

SparkQA commented Nov 3, 2016

Test build #3406 has finished for PR 15327 at commit 18a8554.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fidato13
Contributor Author

fidato13 commented Nov 4, 2016

ping @rxin @srowen @zsxwing The build and tests have passed. Please have a look and merge when you get time. Thanks!

@srowen
Member

srowen commented Nov 7, 2016

@fidato13 I've gotten 5 pings in the past week. Please address the person you're working with for the review and give people time to reply.

@fidato13
Contributor Author

fidato13 commented Nov 7, 2016

Sure. I had tried to involve everyone with previous commits on the file (as mentioned in the contribution guideline). I will remove all but Reynold from the list. Thanks.

@rxin
Contributor

rxin commented Nov 8, 2016

Merging in master/branch-2.1. Thanks.

asfgit pushed a commit that referenced this pull request Nov 8, 2016
Author: fidato <[email protected]>

Closes #15327 from fidato13/SPARK-16575.

(cherry picked from commit 6f36971)
Signed-off-by: Reynold Xin <[email protected]>
@asfgit asfgit closed this in 6f36971 Nov 8, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: fidato <[email protected]>

Closes apache#15327 from fidato13/SPARK-16575.