Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ class NewHadoopRDD[K, V](

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.getConstructor().newInstance()
// setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
// traversing a large number of directories and files. Parallelize it.
_conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
Runtime.getRuntime.availableProcessors().toString)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we shall revamp the value for k8s?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you refer to the existing code if we have done this before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,

conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
Runtime.getRuntime.availableProcessors().toString)

conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
Runtime.getRuntime.availableProcessors().toString)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this availableProcessors get the actual number of available CPU cores in container instances, VM instances and physical machine instances?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if using Runtime.getRuntime.availableProcessors() is really reasonable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, as I mentioned #34792 (comment), maybe some magic number 8/16 works better for all scenarios if we choose to hardcode it

inputFormat match {
case configurable: Configurable =>
configurable.setConf(_conf)
Expand Down