Skip to content

Conversation

@bogdanrdc
Copy link
Contributor

What changes were proposed in this pull request?

Weigher.weigh needs to return Int but it is possible for an Array[FileStatus] to have size > Int.maxValue. To avoid this, the size is scaled down by a factor of 32. The maximumWeight of the cache is also scaled down by the same factor.

How was this patch tested?

New test in FileIndexSuite

(SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
if (estimate > Int.MaxValue) {
throw new IllegalStateException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be an error. It's just a weight. Capping at Int.MaxValue is no problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's capped, the cache might use more memory than configured with spark.sql.hive.filesourcePartitionFileCacheSize.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, though, this can only happen if the filesourcePartitionFileCacheSize is at least 64GB and some object is at least 64GB. The effect is to possibly cache things longer than they should, which seems better than failing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I guess it's better to fail later than sooner. I made it a warning instead.

* than the size of one [[FileStatus]]).
* so it will support objects up to 64GB in size.
*/
private val weightScale = 32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than make it a member variable, just a local variable in the initializer for cache?

@SparkQA
Copy link

SparkQA commented Apr 10, 2017

Test build #75660 has finished for PR 17591 at commit f4ae52a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 10, 2017

Test build #75663 has finished for PR 17591 at commit ef8e9e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
val fileStatusCache = FileStatusCache.getOrCreate(spark)
fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
// scalastyle:off
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets remove this comment block, the JIRA should be used for tracking these things.

.removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
/* [[Weigher]].weigh returns Int so we could only cache objects < 2GB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Could you use java style comments // here?

}
}
})
.removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kinda hard to read. Can we just initialize the weighter and the listener in separate variables?

@hvanhovell
Copy link
Contributor

hvanhovell commented Apr 10, 2017

LGTM - pending jenkins

@SparkQA
Copy link

SparkQA commented Apr 10, 2017

Test build #75665 has finished for PR 17591 at commit e02fc4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor

hvanhovell commented Apr 10, 2017

Merging to master/2.1. Thanks!

asfgit pushed a commit that referenced this pull request Apr 10, 2017
## What changes were proposed in this pull request?

Weigher.weigh needs to return Int but it is possible for an Array[FileStatus] to have size > Int.maxValue. To avoid this, the size is scaled down by a factor of 32. The maximumWeight of the cache is also scaled down by the same factor.

## How was this patch tested?
New test in FileIndexSuite

Author: Bogdan Raducanu <[email protected]>

Closes #17591 from bogdanrdc/SPARK-20280.

(cherry picked from commit f6dd8e0)
Signed-off-by: Herman van Hovell <[email protected]>
@asfgit asfgit closed this in f6dd8e0 Apr 10, 2017
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
## What changes were proposed in this pull request?

Weigher.weigh needs to return Int but it is possible for an Array[FileStatus] to have size > Int.maxValue. To avoid this, the size is scaled down by a factor of 32. The maximumWeight of the cache is also scaled down by the same factor.

## How was this patch tested?
New test in FileIndexSuite

Author: Bogdan Raducanu <[email protected]>

Closes apache#17591 from bogdanrdc/SPARK-20280.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants