
Conversation

@pan3793 (Member) commented Apr 30, 2025

What changes were proposed in this pull request?

On a busy Hadoop cluster, GetFileInfo and GetBlockLocations contribute the most RPC load to the HDFS NameNode. After investigating the Spark Parquet vectorized reader, I believe 3 of the 4 RPCs issued per file can be eliminated.

(screenshot: NameNode RPC metrics by method)

Currently, the Parquet vectorized reader issues 4 NameNode RPCs when reading each file (or split):

  1. Read the footer - one GetFileInfo and one GetBlockLocations
  2. Read the data (row groups) - one GetFileInfo and one GetBlockLocations

The key ideas of this PR are:

  1. The driver already knows the FileStatus of each Parquet file during the planning phase. We can ship that FileStatus from the driver to the executors via PartitionedFile, so a task doesn't need to ask the NameNode again; this saves two GetFileInfo RPCs.
  2. Reuse the SeekableInputStream between reading the footer and the row groups; this saves one GetBlockLocations RPC.

This PR first requires some changes on the Parquet side (already included in Parquet 1.16.0).
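A minimal sketch of idea (1) in Scala, assuming an abbreviated PartitionedFile (the class name here is hypothetical, and the real PartitionedFile carries more fields, e.g. partition values and block locations):

  import org.apache.hadoop.fs.FileStatus
  import org.apache.spark.paths.SparkPath

  // Ship the FileStatus obtained during planning to the executors so tasks
  // never re-issue GetFileInfo for the same file.
  case class PartitionedFileSketch(
      fileStatus: FileStatus,
      start: Long,
      length: Long) {
    // Derived from the status on demand; @transient so it is not serialized.
    @transient lazy val filePath: SparkPath = SparkPath.fromFileStatus(fileStatus)
  }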

Why are the changes needed?

Reduce unnecessary NameNode RPCs to improve performance and stability for large Hadoop clusters.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Passes existing UTs; a few UTs were tuned to adapt to the change.

Manually tested on a small Hadoop cluster using TPC-H Q4 against sf3000 Parquet tables.

HDFS NameNode metrics (master vs. this PR)

(metrics screenshots omitted)

Production Verification

The patch has also been deployed to a production cluster for 4 months, where 95% of workloads are Spark jobs.

Before: (NameNode metrics screenshot)

After: (NameNode metrics screenshot)

Was this patch authored or co-authored using generative AI tooling?

No.

Member Author

This constructor internally calls HadoopInputFile.fromPath(file, configuration), which issues an unnecessary GetFileInfo RPC:

  public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    // fs.getFileStatus is a GetFileInfo RPC to the NameNode
    return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
  }
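By contrast, the fromStatus factory, which this PR switches to, reuses a FileStatus the caller already has and issues no NameNode RPC. A minimal usage sketch (the open helper is illustrative):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileStatus
  import org.apache.parquet.hadoop.util.HadoopInputFile

  // The status was already fetched on the driver, so opening the file here
  // costs no GetFileInfo RPC.
  def open(fileStatus: FileStatus, conf: Configuration): HadoopInputFile =
    HadoopInputFile.fromStatus(fileStatus, conf)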

@pan3793 (Member Author) commented May 6, 2025

cc @sunchao @wangyum @wgtmac I marked this PR as draft because it requires changes on the Parquet side first. It would be great if you could take a look at this idea. Thank you in advance.

@pan3793 pan3793 marked this pull request as draft May 6, 2025 02:13
@pan3793 pan3793 changed the title [WIP] Reduce HDFS NameNode RPC on vectorized Parquet reader [WIP][SPARK-52011][SQL] Reduce HDFS NameNode RPC on vectorized Parquet reader May 6, 2025
@wangyum (Member) commented May 6, 2025

also cc @turboFei

@pan3793 pan3793 changed the title [WIP][SPARK-52011][SQL] Reduce HDFS NameNode RPC on vectorized Parquet reader [SPARK-52011][SQL] Reduce HDFS NameNode RPC on vectorized Parquet reader Sep 4, 2025
@pan3793 pan3793 marked this pull request as ready for review September 4, 2025 13:38
@github-actions github-actions bot removed the BUILD label Sep 4, 2025
@pan3793 (Member Author) commented Sep 5, 2025

ping @wangyum @sunchao @LuciferYang @yaooqinn

This PR is ready for review. Looking forward to your feedback!

@pan3793 (Member Author) commented Sep 8, 2025

cc @viirya @peter-toth @cloud-fan

filePath: SparkPath,
start: Long,
length: Long,
fileStatus: FileStatus,
Contributor

Similarly, due to the addition of fileStatus, the constructor of PartitionedFile can also be further simplified, right?

Contributor

In addition, since fileStatus will hold more state and also participate in serialization, will this lead to additional memory overhead and serialization pressure?

@pan3793 (Member Author) Sep 9, 2025

The fileStatus should occupy a little more memory, but I haven't seen OOM issues during the rollout of this change to the online cluster.

Contributor

@cloud-fan Are there also risks of breaking internal APIs with modifications similar to those made here and in FileFormat.createMetadataInternalRow?

try {
vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))
vectorizedReader.initialize(
split, hadoopAttemptContext, Some(inputFile), Some(inputStream), Some(fileFooter))
Contributor

nit: Although fileFooter should not be null, it's still advisable to use Option(fileFooter) just to be safe

Member Author

addressed

Member Author

@LuciferYang On second thought, I went back to using Some.

Generally, if something goes wrong, we should fail immediately rather than let the illegal state propagate. Here, we should avoid NULL propagation as much as possible.

fileSize: Long = 0L,
otherConstantMetadataColumnValues: Map[String, Any] = Map.empty) {

@transient lazy val filePath: SparkPath = SparkPath.fromFileStatus(fileStatus)
Member Author

If SparkPath.fromFileStatus is cheap enough, we don't need to materialize filePath, which saves some memory.

.withMetadataFilter(metadataFilter).build

val inputFile = HadoopInputFile.fromStatus(file.fileStatus, sharedConf)
val inputStream = inputFile.newStream()
Member

Do we need to ensure this is properly closed if something goes wrong in the following code?

Member Author

The risk is low but still possible before ownership of the inputStream is transferred to the vectorizedReader, so I wrapped the code block in a try-finally to ensure the inputStream won't leak.

This causes an indentation change; please view the diff with "Hide whitespace".
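A generic sketch of this guard (the handOff helper and its parameters are hypothetical):

  import java.io.Closeable

  // Close the stream in `finally` unless ownership was successfully handed
  // over, e.g. via vectorizedReader.initialize(..., Some(stream), ...).
  def handOff[S <: Closeable](open: () => S)(initReader: S => Unit): Unit = {
    val stream = open()
    var transferred = false
    try {
      initReader(stream)  // may throw before the reader takes ownership
      transferred = true  // from here on, the reader is responsible for closing
    } finally {
      if (!transferred) stream.close()  // close only on failure
    }
  }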

(None, None, footer)
} else {
ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
// When there are vectorized reads, we can avoid
Member

Can we extract this into a new util method in ParquetFooterReader that returns both the footer and the input stream? We could then avoid duplicating this code in two places.

@pan3793 (Member Author) Sep 11, 2025

Addressed, please check the updated ParquetFooterReader.

BTW, I don't see a special reason to keep this file in Java; since I'm going to use Scala data structures (Tuple, Option) in this class, I converted it to Scala.
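For reference, an approximate sketch of the helper's shape, inferred from its call sites in this PR ("(None, None, footer)" on the plain path vs. handing back the open stream on the vectorized path); the readFooter parameter stands in for the stream-accepting footer API added on the Parquet side, so the merged signatures may differ:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileStatus
  import org.apache.parquet.hadoop.metadata.ParquetMetadata
  import org.apache.parquet.hadoop.util.HadoopInputFile
  import org.apache.parquet.io.{InputFile, SeekableInputStream}

  def openFileAndReadFooter(
      fileStatus: FileStatus,
      conf: Configuration,
      detachFileInputStream: Boolean,
      readFooter: (InputFile, SeekableInputStream, Boolean) => ParquetMetadata)
      : (Option[InputFile], Option[SeekableInputStream], ParquetMetadata) = {
    val inputFile = HadoopInputFile.fromStatus(fileStatus, conf) // no GetFileInfo
    val stream = inputFile.newStream()
    if (detachFileInputStream) {
      // Vectorized path: read the footer with row groups and hand the open
      // stream back so the reader reuses it for the row groups.
      val footer = readFooter(inputFile, stream, /* skipRowGroups = */ false)
      (Some(inputFile), Some(stream), footer)
    } else {
      // Non-vectorized path: skip row groups and close the stream right away.
      try (None, None, readFooter(inputFile, stream, /* skipRowGroups = */ true))
      finally stream.close()
    }
  }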

.getMetadataFilter();
}
return readFooter(configuration, file.toPath(), filter);
return readFooter(HadoopInputFile.fromStatus(file.fileStatus(), configuration), filter);
Member

I think we can remove SKIP_ROW_GROUPS and WITH_ROW_GROUPS now, as they are no longer used.

Member Author

SKIP_ROW_GROUPS is now unused, but WITH_ROW_GROUPS is still used by the aggregate push-down case in ParquetPartitionReaderFactory.openFileAndReadFooter. I removed both constants and replaced WITH_ROW_GROUPS with the literal false.

private var hasNext = true
private lazy val row: InternalRow = {
val footer = getFooter(file)
val (_, _, footer) = openFileAndReadFooter(file)
Member

If I read it correctly, previously for the non-vectorized case, getFooter skipped row groups (SKIP_ROW_GROUPS). But now openFileAndReadFooter always reads the footer with row groups?

Member Author

For the non-vectorized case, detachFileInputStream is false, so it still skips row groups (SKIP_ROW_GROUPS).

(None, None, footer)
} else {
ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
// When there are vectorized reads, we can avoid
Member

The comment said "When there are vectorized reads..." because getFooter had already checked enableVectorizedReader. I think this else block now handles both the non-vectorized and vectorized cases?

Member Author

Yes, the logic is controlled by enableVectorizedReader; the comment only describes the vectorized-reading optimization. If you find the current comment confusing, I can update it to also mention the behavior of the non-vectorized reading path.

@pan3793 pan3793 requested review from sunchao and viirya September 11, 2025 16:38
@pan3793 (Member Author) commented Sep 15, 2025

kindly ping @sunchao and @viirya, could you take another look when you have time?

filePath: SparkPath,
start: Long,
length: Long,
fileStatus: FileStatus,
Contributor

What's the cost of serializing the file status?

@pan3793 (Member Author) Sep 15, 2025

@cloud-fan I think the path contributes the majority of the size.

public class FileStatus implements Writable, Comparable<Object>,
    Serializable, ObjectInputValidation {
  ...
  private Path path;           // backed by URI
  private long length;
  private Boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;
  private Path symlink;        // likely be NULL
  private Set<AttrFlags> attr; // AttrFlags is enum
  ...
}

public class FsPermission implements Writable, Serializable,
    ObjectInputValidation {
  ...
  private FsAction useraction = null;  // FsAction is enum
  private FsAction groupaction = null;
  private FsAction otheraction = null;
  private Boolean stickyBit = false;
  ...
}

https://github.com/apache/hadoop/blob/branch-3.4.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java

https://github.com/apache/hadoop/blob/branch-3.4.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
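If one wants to check the actual cost, here is a quick Java-serialization measurement sketch (the path is made up for illustration; real HDFS statuses also carry owner/group/permission strings):

  import java.io.{ByteArrayOutputStream, ObjectOutputStream}
  import org.apache.hadoop.fs.{FileStatus, Path}

  def serializedSize(obj: AnyRef): Int = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    buf.size()
  }

  val status = new FileStatus(1024L, false, 3, 128L * 1024 * 1024, 0L,
    new Path("hdfs://ns1/warehouse/db/tbl/part-00000.parquet"))
  println(serializedSize(status)) // dominated by the Path's URI string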

Contributor

Is it possible to have a custom serde for it that only sends the path string, along the lines of the sketch below? This reminds me of SerializableConfiguration, as these Hadoop classes are usually not optimized for serialization and transport.
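A hypothetical wrapper in the spirit of SerializableConfiguration (not part of this PR), writing only the fields Spark needs:

  import org.apache.hadoop.fs.{FileStatus, Path}

  class SerializableFileStatus(@transient var status: FileStatus)
      extends Serializable {
    private def writeObject(out: java.io.ObjectOutputStream): Unit = {
      out.writeUTF(status.getPath.toString)
      out.writeLong(status.getLen)
      out.writeLong(status.getModificationTime)
    }
    private def readObject(in: java.io.ObjectInputStream): Unit = {
      val path = new Path(in.readUTF())
      val len = in.readLong()
      val mtime = in.readLong()
      // Rebuilds a plain FileStatus; subclass-specific state
      // (e.g. S3AFileStatus) is lost, which is the concern raised below.
      status = new FileStatus(len, false, 0, 0, mtime, path)
    }
  }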

Member Author

That seems infeasible, because FileStatus has many subclasses (e.g. S3AFileStatus, ViewFsFileStatus) whose extra state a plain rebuilt FileStatus would lose.

Member Author

@cloud-fan The change basically moves the RPC cost from executor => storage service to driver => executors. In my env (HDFS with RBF), the latter is much cheaper than the former. I don't have a cloud env, so I can't give numbers for object storage services like S3.

Contributor

Hmm, then this may cause a regression for short queries?

Member

Hmm, not sure how much difference this will make in terms of driver memory usage. Is it easy to make the FileStatus optional in PartitionedFile and control it via a flag?

It seems in Parquet Java the file status is only used in one case: https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java#L109-L132

Mostly we just need the file path and length, but yeah, this one use case seems critical for avoiding a duplicated NN call to fetch the file status again.

@pan3793 (Member Author) Sep 18, 2025

@sunchao Thanks for your suggestion. After an offline discussion with @cloud-fan, I understand his concerns about the overhead of FileStatus. Let me summarize the conclusion and my thoughts:

  1. There are many different Hadoop FileSystem implementations; getFileStatus might be cheap or backed by an executor-side cache in some of them, but in our case (HDFS with RBF) it's relatively heavy.
  2. There is an upcoming optimization to replace FileStatusCache with PathCache (carrying only the necessary metadata) on the driver side to reduce driver memory.
  3. @cloud-fan suggests constructing the FileStatus directly on the executor side.

So, I'm going to split this PR into two parts:

  1. I will experiment with (3), though I can only do it for HDFS cases (w/ and w/o RBF, w/ and w/o EC).
  2. Spin off the rest of the executor-side changes into a dedicated PR.

@pan3793 pan3793 marked this pull request as draft September 23, 2025 05:35
dongjoon-hyun pushed a commit that referenced this pull request Sep 23, 2025
### What changes were proposed in this pull request?

Reuse the InputStream in the vectorized Parquet reader between reading the footer and the row groups, on the executor side.

This PR is part of SPARK-52011; you can find more details at #50765.

### Why are the changes needed?

Reduce unnecessary NameNode RPCs to improve performance and stability for large Hadoop clusters.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

See #50765

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52384 from pan3793/SPARK-53633.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>