22 changes: 22 additions & 0 deletions docs/sql-performance-tuning.md
@@ -114,6 +114,28 @@ that these options will be deprecated in future release as more optimizations ar
</td>
<td>1.1.0</td>
</tr>
<tr>
<td><code>spark.sql.sources.parallelPartitionDiscovery.threshold</code></td>
<td>32</td>
<td>
Configures the threshold to enable parallel listing for job input paths. If the number of
input paths is larger than this threshold, Spark lists the files using a distributed job.
Otherwise, it falls back to sequential listing. This configuration is only effective when
using file-based data sources such as Parquet, ORC and JSON.
</td>
<td>1.5.0</td>
</tr>
<tr>
<td><code>spark.sql.sources.parallelPartitionDiscovery.parallelism</code></td>
<td>10000</td>
<td>
Configures the maximum listing parallelism for job input paths. If the number of input
paths is larger than this value, the parallelism is throttled down to this value. As above,
this configuration is only effective when using file-based data sources such as Parquet, ORC
and JSON.
</td>
<td>2.1.1</td>
</tr>
</table>
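
For illustration, a minimal sketch of setting the two options above when building a `SparkSession`. The application name, option values, and S3 path are placeholders, not recommendations; the defaults are often sufficient.

```scala
// A minimal sketch, not the documented defaults: the app name, values, and path are
// placeholders. Lowering the threshold (or raising the parallelism cap) mainly helps
// jobs whose input spans a very large number of directories.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallel-partition-discovery-example")
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "64")
  .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "10000")
  .getOrCreate()

// Reading a heavily partitioned dataset triggers input path listing; once the number
// of paths exceeds the threshold, listing runs as a distributed Spark job.
val events = spark.read.parquet("s3a://my-bucket/events/")
```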

## Join Strategy Hints for SQL Queries
11 changes: 11 additions & 0 deletions docs/tuning.md
@@ -264,6 +264,17 @@ parent RDD's number of partitions. You can pass the level of parallelism as a se
or set the config property `spark.default.parallelism` to change the default.
In general, we recommend 2-3 tasks per CPU core in your cluster.

## Parallel Listing on Input Paths

Sometimes you may also need to increase directory listing parallelism when the job input has a large number of directories;
otherwise the listing can take a very long time, especially when running against an object store like S3.
Member:

What about remote HDFS? This looks like a general issue for remote storage access. Especially in disaggregated clusters where remote storage (HDFS/S3) is used, can we generalize more, like the following?

- especially when against object store like S3
- especially when against remote HDFS or S3 or in the disaggregated clusters

Member Author (@sunchao, Aug 21, 2020):

It depends on how "remote" the storage is. For HDFS, depending on the use case, the compute and storage can still be deployed within the same region or even zone, so the network/metadata cost is much cheaper than with S3.

Therefore, I think we can stick with the S3 case as it is more characteristic. Let me know if you think otherwise.

If your job works on an RDD with Hadoop input formats (e.g., via `SparkContext.sequenceFile`), the parallelism is
controlled via [`spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads`](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml), which currently defaults to 1.
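
As a sketch (the path, thread count, and app name are hypothetical), the Hadoop listing property can be forwarded through the `spark.hadoop.` prefix on the Spark configuration; `textFile` is used below only for brevity, since any Hadoop-input-format based RDD goes through the same listing code.

```scala
// A minimal sketch; the path and thread count are placeholders. The "spark.hadoop."
// prefix copies the property into the Hadoop Configuration used for input listing.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallel-input-listing-example")
  .set("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", "20")
val sc = new SparkContext(conf)

// textFile is used here for brevity; sequenceFile and other Hadoop input formats
// list their input with the same thread pool.
val lines = sc.textFile("s3a://my-bucket/many-input-dirs/*/")
println(lines.count())
```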

For Spark SQL with file-based data sources, you can tune `spark.sql.sources.parallelPartitionDiscovery.threshold` and
`spark.sql.sources.parallelPartitionDiscovery.parallelism` to improve listing parallelism. Please
refer to the [Spark SQL performance tuning guide](sql-performance-tuning.html) for more details.
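
For example, a minimal sketch of adjusting these options at runtime on an existing session (the session variable, option values, and S3 path are hypothetical; the options can equally be set at session build time or via `--conf` on submit):

```scala
// A minimal sketch assuming an existing SparkSession named `spark`; the values and
// the S3 path are placeholders, not recommendations.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "10000")

// Once the number of input paths exceeds the threshold, the file listing for this
// read is performed by a distributed Spark job instead of sequentially on the driver.
val df = spark.read.json("s3a://my-bucket/logs/date=*/")
```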

## Memory Usage of Reduce Tasks

Sometimes, you will get an OutOfMemoryError not because your RDDs don't fit in memory, but because the
Expand Down