
Conversation

@steveloughran
Contributor

What changes were proposed in this pull request?

Follow on to SPARK-40034.

Dynamic partitioning through the PathOutputCommitProtocol needs to add the directories to the superclass's partition list, otherwise the partition delete does not
take place.

Fix:

  • add an addPartition() method subclasses can use
  • add a getPartitions method to return an immutable
    copy of the list for testing.
  • add tests to verify all of this.
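The fix above can be sketched roughly like this; the names mirror the addPartition()/getPartitions() pair described in the bullets, but the class and its details are a hypothetical illustration, not the actual Spark code:

```scala
import scala.collection.mutable

// Illustrative sketch of the partition-tracking pattern: subclasses
// register each partition dir they write, and tests read back an
// immutable copy to assert the list is complete.
class PartitionTracker {
  private val partitions = mutable.Set[String]()

  // called by subclasses as new partition dirs are created
  def addPartition(dir: String): Unit = partitions += dir

  // immutable copy of the set, for test assertions only
  def getPartitions: Set[String] = partitions.toSet
}
```

With the partitions registered here, the superclass's dynamic-overwrite logic knows which destination dirs to delete before rename.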

Also fix newTaskTempFileAbsPath to return a path, irrespective of committer type.

In dynamic mode, because the parent dir of an absolute path is deleted, there's a safety check to reject any request for a file under such a parent dir. This
check could be pulled up to HadoopMapReduceCommitProtocol, which needs the same check, if the risk is considered realistic.
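A minimal sketch of that safety check, with illustrative names (the real check lives inside the commit protocol and works on Hadoop Path objects rather than strings):

```scala
// Illustrative sketch: in dynamic overwrite mode, an absolute-path file
// whose parent dir is one of the dirs to be deleted would be lost, so
// such requests are rejected up front.
def checkAbsPathAllowed(dynamicOverwrite: Boolean,
                        dirsToDelete: Set[String],
                        absDir: String): Unit = {
  if (dynamicOverwrite && dirsToDelete.exists(p => absDir.startsWith(p))) {
    throw new IllegalStateException(
      s"Absolute path $absDir is under a directory deleted by dynamic overwrite")
  }
}
```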

The patch also downgrades the behaviour when a committer doesn't declare support for dynamic partitioning: instead of failing, it now prints a warning. Why?
Because dynamic partition overwrite does work through the s3a committers; it's just O(data). If someone wants to run INSERT OVERWRITE, they should be allowed
to, just warned about it. The outcome will be correct except in one case: if the driver fails partway through the dir rename, only some of the files will be there.

Google GCS has the same non-atomic rename issue. But even on an FS with atomic dir rename, any job which fails partway through the overwrite process
is going to leave the fs in an inconsistent state, such as

  • some parts with new data, some parts not yet overwritten
  • a directory deleted and the new data not instantiated

So it's not that much worse.

The patch tries to update the protocol spec in HadoopMapReduceCommitProtocol to cover both newFileAbsPath() semantics/commit and failure modes of
dynamic partition commit.

Why are the changes needed?

  • dynamic partition jobs aren't actually incorrect through the s3 committers, just slow; if someone really wants them, they should be free to run them.
  • the code in SPARK-40034 was incomplete. The new tests show that it works now.
  • the newFileAbsPath() code is required of all committers, even though (thankfully) it is not used much.

This is an attempt to do a minimal commit for complete functionality, with no attempt to improve performance with parallel dir setup, rename, etc.
That'd really be damage limitation: if you want performance in cloud, use a manifest format.

Does this PR introduce any user-facing change?

Updates the cloud docs to say that dynamic partition overwrite does work
everywhere, just may be really slow.

How was this patch tested?

New unit tests; run through Spark against Hadoop 3.3.5 RC0.

One test area which is not covered is "does the absolute path mapping work store the correct paths and marshal them correctly?"

I've just realised that I can test this without adding an accessor for the map, just by calling commitTask() and parsing the TaskCommitMessage which comes back.
Shall I add that?

@github-actions github-actions bot added the CORE label Dec 22, 2022
@steveloughran steveloughran changed the title SPARK-41551. Dynamic/absolute path support in PathOutputCommitters [SPARK-41551][SQL]. Dynamic/absolute path support in PathOutputCommitters Dec 23, 2022
@steveloughran steveloughran changed the title [SPARK-41551][SQL]. Dynamic/absolute path support in PathOutputCommitters [SPARK-41551][SQL] Dynamic/absolute path support in PathOutputCommitters Dec 23, 2022
@steveloughran steveloughran force-pushed the SPARK-41551-path-output-dynamic branch from faa30f3 to d19510a Compare December 23, 2022 14:30
@github-actions github-actions bot added the DOCS label Dec 23, 2022
@steveloughran
Contributor Author

  1. I think I should add a test which verifies the abs path temp file is always under .spark-staging and includes the task attempt ID in it.
  2. jobs created through RDDs don't get unique IDs across processes; they just get the RDD counter. So more than one job writing to the same dest may really ruin the lives of others, even when updating different partitions. I can fix that here or file a separate JIRA.
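For (2), the usual fix is to mix a random UUID into the job ID so two driver processes can never collide on the same counter value; a hypothetical sketch:

```scala
import java.util.UUID

// Illustrative sketch: an RDD counter alone is not unique across driver
// processes (two drivers can both be running "job 1"), so appending a
// random UUID makes the job ID globally unique.
def uniqueJobId(rddCounter: Int): String =
  s"job_${rddCounter}_${UUID.randomUUID()}"
```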

@steveloughran steveloughran marked this pull request as draft January 1, 2023 20:56
@steveloughran steveloughran force-pushed the SPARK-41551-path-output-dynamic branch from d19510a to 8ac2521 Compare February 3, 2023 16:59
@steveloughran
Contributor Author

The latest commit adds parallel file/dir preparation and rename, for performance on slow/very slow cloud stores.

As a consequence, it now needs better tests. FileOutputCommitter can be used for the work, though it doesn't do the other feature: IOStatistics collection.
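The parallel file/dir preparation and rename can be sketched as a bounded thread pool driving the per-directory operations; renameFn here stands in for FileSystem.rename and everything else is illustrative, not the patch's actual code:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Illustrative sketch: submit each dir rename to a bounded thread pool
// rather than iterating serially, so the per-call latency of a slow
// cloud store is paid in parallel.
def renameAll(dirs: Seq[String],
              renameFn: String => Boolean,
              threads: Int = 4): Seq[Boolean] = {
  val pool = Executors.newFixedThreadPool(threads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    Await.result(Future.sequence(dirs.map(d => Future(renameFn(d)))), 5.minutes)
  } finally {
    pool.shutdown()
  }
}
```

Note this only parallelises the rename calls themselves; on S3 each "rename" is still an O(data) copy underneath.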

@github-actions github-actions bot added the BUILD label Feb 6, 2023
@steveloughran
Contributor Author

steveloughran commented Feb 6, 2023

The latest PR will save to a report dir the aggregate IOStatistics collected from the task attempts, in addition to any _SUCCESS reports created by the inner committer. This is to align later with the collection of context IOStatistics, where all stream read/write stats are collected.

For now, a JSON summary of the stats collected/reported by the committers is saved, for example (omitting all min/max/mean values where no results were collected):

{
  "counters" : {
    "committer_commit_job" : 1,
    "op_msync" : 1,
    "op_msync.failures" : 0,
    "job_stage_optional_validate_output" : 0,
    "job_stage_create_target_dirs" : 1,
    "op_load_manifest.failures" : 0,
    "op_rename.failures" : 0,
    "op_load_all_manifests" : 1,
    "job_stage_load_manifests" : 1,
    "committer_commit_job.failures" : 0,
    "task_stage_save_task_manifest" : 3,
    "job_stage_rename_files" : 1,
    "committer_bytes_committed" : 9,
    "op_create_one_directory.failures" : 0,
    "op_mkdirs.failures" : 0,
    "committer_files_committed" : 1,
    "store_io_rate_limited.failures" : 0,
    "op_load_all_manifests.failures" : 0,
    "committer_task_manifest_file_size" : 0,
    "task_stage_save_manifest" : 2,
    "task_stage_scan_directory" : 3,
    "task_stage_setup.failures" : 0,
    "op_is_file.failures" : 0,
    "op_create_directories.failures" : 0,
    "op_create_one_directory" : 0,
    "committer_task_file_count" : 3,
    "task_stage_save_manifest.failures" : 0,
    "committer_task_directory_depth" : 6,
    "job_stage_setup.failures" : 0,
    "job_stage_load_manifests.failures" : 0,
    "op_directory_scan" : 0,
    "op_rename" : 3,
    "job_stage_abort" : 0,
    "op_get_file_status" : 9,
    "commit_file_rename_recovered" : 0,
    "store_io_rate_limited" : 0,
    "op_delete_file_under_destination.failures" : 0,
    "op_prepare_dir_ancestors" : 0,
    "job_stage_abort.failures" : 0,
    "op_is_directory.failures" : 0,
    "job_stage_optional_validate_output.failures" : 0,
    "task_stage_setup" : 3,
    "commit_file_rename.failures" : 0,
    "op_delete" : 6,
    "op_delete_file_under_destination" : 0,
    "job_stage_save_success_marker" : 1,
    "commit_file_rename" : 1,
    "op_mkdir_returned_false" : 0,
    "op_directory_scan.failures" : 0,
    "object_list_request.failures" : 0,
    "object_continue_list_request.failures" : 0,
    "op_load_manifest" : 1,
    "committer_tasks_completed" : 2,
    "job_stage_rename_files.failures" : 0,
    "op_get_file_status.failures" : 8,
    "task_stage_scan_directory.failures" : 0,
    "op_delete.failures" : 0,
    "object_list_request" : 0,
    "op_list_status" : 8,
    "task_stage_abort_task.failures" : 0,
    "job_stage_setup" : 1,
    "task_stage_commit" : 3,
    "committer_tasks_failed" : 0,
    "task_stage_commit.failures" : 0,
    "task_stage_abort_task" : 0,
    "committer_task_file_size" : 27,
    "job_stage_create_target_dirs.failures" : 0,
    "committer_task_directory_count" : 3,
    "object_continue_list_request" : 0,
    "job_stage_save_success_marker.failures" : 0,
    "op_create_directories" : 1,
    "op_mkdir_returned_false.failures" : 0,
    "task_stage_save_task_manifest.failures" : 0,
    "job_stage_cleanup.failures" : 0,
    "job_stage_cleanup" : 1,
    "op_mkdirs" : 6,
    "op_is_directory" : 0,
    "op_prepare_dir_ancestors.failures" : 0,
    "op_is_file" : 0,
    "op_list_status.failures" : 0
  },
  "gauges" : { },
  "minimums" : {
    "task_stage_commit.min" : 1,
    "task_stage_save_task_manifest.min" : 21,
    "job_stage_cleanup.min" : 14,
    "op_list_status.min" : 0,
    "job_stage_rename_files.min" : 13,
    "op_load_all_manifests.min" : 12,
    "task_stage_setup.min" : 29,
    "job_stage_create_target_dirs.min" : 14,
    "job_stage_save_success_marker.min" : 28,
    "committer_commit_job.min" : 83,
    "op_delete.min" : 0,
    "job_stage_setup.min" : 74,
    "committer_task_directory_depth" : 2,
    "op_get_file_status.min" : 0,
    "job_stage_load_manifests.min" : 13,
    "op_get_file_status.failures.min" : 0,
    "op_load_manifest.min" : 2,
    "op_mkdirs.min" : 13,
    "op_msync.min" : 0,
    "commit_file_rename.min" : 0,
    "committer_task_file_size" : 9,
    "committer_task_directory_count" : 1,
    "op_create_directories.min" : 13,
    "op_is_directory.min" : -1,
    "task_stage_scan_directory.min" : 1,
    "task_stage_save_manifest.min" : 21,
    "op_rename.min" : 0
  },
  "maximums" : {
    "job_stage_cleanup.max" : 14,
    "job_stage_load_manifests.max" : 13,
    "op_msync.max" : 0,
    "job_stage_save_success_marker.max" : 28,
    "task_stage_save_task_manifest.max" : 28,
    "job_stage_setup.max" : 74,
    "op_mkdirs.max" : 51,
    "op_get_file_status.failures.max" : 0,
    "commit_file_rename.max" : 0,
    "committer_task_file_count" : 1,
    "committer_task_directory_depth" : 2,
    "op_delete.max" : 2,
    "task_stage_setup.max" : 29,
    "op_get_file_status.max" : 0,
    "committer_commit_job.max" : 83,
    "task_stage_save_manifest.max" : 21,
    "job_stage_create_target_dirs.max" : 14,
    "op_load_manifest.max" : 2,
    "op_create_directories.max" : 13,
    "op_list_status.max" : 1,
    "committer_task_file_size" : 9,
    "task_stage_scan_directory.max" : 1,
    "job_stage_rename_files.max" : 13,
    "op_rename.max" : 0,
    "committer_task_directory_count" : 1,
    "op_load_all_manifests.max" : 12,
    "task_stage_commit.max" : 22
  },
  "meanstatistics" : {
    "task_stage_save_manifest.mean" : {
      "samples" : 2,
      "sum" : 42
    },
    "op_rename.mean" : {
      "samples" : 3,
      "sum" : 0
    },
    "commit_file_rename.mean" : {
      "samples" : 1,
      "sum" : 0
    },
    "job_stage_load_manifests.mean" : {
      "samples" : 1,
      "sum" : 13
    },
    "task_stage_save_task_manifest.mean" : {
      "samples" : 3,
      "sum" : 70
    },
    "op_load_manifest.mean" : {
      "samples" : 1,
      "sum" : 2
    },
    "committer_task_manifest_file_size" : {
      "samples" : 1,
      "sum" : 18835
    },
    "committer_task_file_count" : {
      "samples" : 3,
      "sum" : 3
    },
    "committer_task_directory_depth" : {
      "samples" : 3,
      "sum" : 6
    },
    "job_stage_setup.mean" : {
      "samples" : 1,
      "sum" : 74
    },
    "task_stage_commit.mean" : {
      "samples" : 5,
      "sum" : 47
    },
    "job_stage_cleanup.mean" : {
      "samples" : 1,
      "sum" : 14
    },
    "op_get_file_status.mean" : {
      "samples" : 1,
      "sum" : 0
    },
    "job_stage_create_target_dirs.mean" : {
      "samples" : 1,
      "sum" : 14
    },
    "op_load_all_manifests.mean" : {
      "samples" : 1,
      "sum" : 12
    },
    "job_stage_save_success_marker.mean" : {
      "samples" : 1,
      "sum" : 28
    },
    "committer_commit_job.mean" : {
      "samples" : 1,
      "sum" : 83
    },
    "op_mkdirs.mean" : {
      "samples" : 6,
      "sum" : 174
    },
    "op_list_status.mean" : {
      "samples" : 8,
      "sum" : 4
    },
    "committer_task_file_size" : {
      "samples" : 3,
      "sum" : 27
    },
    "committer_task_directory_count" : {
      "samples" : 3,
      "sum" : 3
    },
    "op_create_directories.mean" : {
      "samples" : 1,
      "sum" : 13
    },
    "task_stage_scan_directory.mean" : {
      "samples" : 3,
      "sum" : 3
    },
    "job_stage_rename_files.mean" : {
      "samples" : 1,
      "sum" : 13
    },
    "op_msync.mean" : {
      "samples" : 1,
      "sum" : 0
    }
  }
}
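Counter maps like the "counters" section above can be aggregated across task attempts by summing values key-by-key; a minimal sketch (not the actual IOStatistics aggregation code):

```scala
// Illustrative sketch: merge per-task counter maps on the driver by
// grouping entries by key and summing the values, producing the
// job-level totals reported in the JSON summary.
def mergeCounters(reports: Seq[Map[String, Long]]): Map[String, Long] =
  reports.flatten.groupMapReduce(_._1)(_._2)(_ + _)
```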

Clarify that abs path files aren't saved in the TA dir,
but in the staging dir, which will be created even in a
classic non-dynamic job just for these files.

Change-Id: I86de8fb190a44bfc8c6e33ede163eebc1939e332
Moving PathOutputCommitProtocol off being a subclass of
HadoopMapReduceCommitProtocol makes it possible to
support dynamic overwrite without making changes to
HadoopMapReduceCommitProtocol, and to add parallel
file/dir commit to compensate for file/dir rename performance.

* Copy HadoopMapReduceCommitProtocol methods into PathOutputCommitProtocol
* Make PathOutputCommitProtocol extend FileCommitProtocol directly

Change-Id: Ic8ee1b7917538da0f99434768df0aae8bdc12f01
Parallelize abs and dynamic dir delete/create and rename

This addresses scale issues with Google GCS.
For S3 it marginally improves performance, as directories
will be copied in parallel, but the time to copy the objects
in them is O(data). There's a parallel copy in the S3A
code too, but it is limited and not configurable.

That is: this change does not speed up INSERT OVERWRITE on
S3 to the point where it is fast, instead simply "less slow".

Change-Id: I4396d3fe2562c753d5a54a9ecdb9be2877bd81b0
Tests to verify the dynamic overwrite are wired up.
Done by subclassing PartitionedWriteSuite and reconfiguring
it.
It's hard to verify the committers are being picked up, which
is addressed by
* during dev: triggering errors
* soon: looking for json iostats reports.

Note we could be *very* clever in this test: if the ManifestCommitterFactory
is on the classpath, we could use it as the committer.

Change-Id: I47716c43f1d34f226f34bfbe330e862f101b73d2
If the hadoop version used in the tests contains the
manifest committer, it is switched to.
This will collect and report IO statistics and other
summary information into the directory target/reports/summaries.

The same information is also collected and aggregated through
the task commit messages.
As/when thread context IOStats are collected (especially read
IO information), that will only be reported via the spark
protocol.

Change-Id: I60c0fcfe0a538c147207349fd15aa991b2f2a0f0
All stat reports use spark.sql.sources.writeJobUUID in
filename if set.

Change-Id: Ie32033dae6da9c6bd24c1927b15bab8d3b49458b
Some extra diligence: test ParquetV2QuerySuite through
the protocol and on hadoop 3.3.5+, the manifest committer.

Change-Id: Ia9b869c65dd97463b7af02990cef8582e7680046
@steveloughran steveloughran force-pushed the SPARK-41551-path-output-dynamic branch from 56c77ef to f4a3352 Compare February 20, 2023 11:09
@steveloughran
Copy link
Contributor Author

I've created a more minimal PR with the earlier production code changes and the newer tests: #40221; if all is happy there, it can go in while this new protocol variant is considered/stabilised.

Once Spark switches to 3.3.5 as its Hadoop 3 version, testing all this becomes easier, as the ManifestCommitter will always be there, and tests can code for it.

@github-actions

github-actions bot commented Jun 9, 2023

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 9, 2023
@github-actions github-actions bot closed this Jun 10, 2023
@wgqcd88

wgqcd88 commented Jul 3, 2025

Hi, after applying the patch to Spark 3.5.5 with Hadoop 3.3.6, I encountered an issue where using the Manifest committer for INSERT OVERWRITE with dynamic partitions leads to data being deleted during the cleanup phase, right after the rename step. Could this be a misuse on my part, or is the Manifest committer not supported by this patch?

