
Conversation

@wForget (Member) commented Jul 30, 2022

What changes were proposed in this pull request?

Add a forceUseStagingDir config to force the use of a staging directory when writing.

When forceUseStagingDir is set to true, InsertIntoHadoopFsRelationCommand sets committerOutputPath to the staging dir, and in the HadoopMapReduceCommitProtocol.newTaskTempFile method I calculate the absolute output dir and call newTaskTempFileAbsPath.
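A minimal sketch of what that could look like (hypothetical: the class name and forceUseStagingDir flag are illustrative names taken from this PR's description, not the merged code, and the method signatures follow older Spark releases, so treat them as approximate):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Illustrative sketch only, not the actual patch: when the (assumed)
// forceUseStagingDir flag is on, every task file is registered with its
// absolute final directory, so commitJob renames it out of the staging
// dir into the real partition directory.
class ForceStagingCommitProtocol(
    jobId: String,
    path: String,
    forceUseStagingDir: Boolean)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override def newTaskTempFile(
      taskContext: TaskAttemptContext,
      dir: Option[String],
      ext: String): String = {
    if (forceUseStagingDir) {
      // Resolve the final partition directory under the table location.
      val absoluteDir = dir.map(d => new Path(path, d).toString).getOrElse(path)
      newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
    } else {
      super.newTaskTempFile(taskContext, dir, ext)
    }
  }
}
```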

Why are the changes needed?

As discussed in SPARK-37210, errors or data loss may occur in some concurrent write scenarios.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added test case in InsertSuite.

@wForget (Member Author) commented Jul 30, 2022

Hi @dongjoon-hyun, could you please help review this?

@AmplabJenkins

Can one of the admins verify this patch?

@dongjoon-hyun changed the title from "[SPARK-37210] Allow forced use of staging directory" to "[SPARK-37210][CORE][SQL] Allow forced use of staging directory" on Aug 1, 2022
@dongjoon-hyun (Member) commented

Thank you for making a PR, @wForget .

To @viirya and @sunchao: this issue has a reproducible example in the JIRA.

@viirya (Member) left a comment

Why is this an issue particular to InsertIntoHadoopFsRelationCommand?

@wForget (Member Author) commented Aug 2, 2022

Why is this an issue particular to InsertIntoHadoopFsRelationCommand?

InsertIntoHiveTable always uses the Hive staging dir:

val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)

InsertIntoHadoopFsRelationCommand, on the other hand, only uses the Spark staging dir in dynamic partition overwrite mode; otherwise it writes through table_location/_temporary, which leads to concurrency conflicts.
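In condensed form, the behavior described above is roughly the following (a paraphrase for illustration, not the exact Spark source):

```scala
import org.apache.hadoop.fs.Path

// Paraphrase of the current behavior: only dynamic partition overwrite
// gets a job-unique staging dir; every other mode commits through the
// table location, where FileOutputCommitter keeps its task output under
// a <tableLocation>/_temporary directory shared by all running jobs.
def committerOutputPath(
    tableLocation: Path,
    jobId: String,
    dynamicPartitionOverwrite: Boolean): Path =
  if (dynamicPartitionOverwrite) {
    new Path(tableLocation, s".spark-staging-$jobId") // unique per job
  } else {
    tableLocation // tasks write under <tableLocation>/_temporary
  }
```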

@viirya (Member) left a comment

The use case looks suspicious to me. Is it a valid one? I'm not sure that InsertIntoHadoopFsRelationCommand guarantees concurrent writes to the same table.

@wForget (Member Author) commented Aug 3, 2022

The use case looks suspicious to me. Is it a valid one? I'm not sure that InsertIntoHadoopFsRelationCommand guarantees concurrent writes to the same table.

It seems like a reasonable requirement to concurrently write to different partitions of the same table. Are there any blocking issues?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions bot added the Stale label on Nov 12, 2022
@github-actions bot closed this on Nov 13, 2022
@zhengchenyu (Contributor) commented

@viirya @wForget @dongjoon-hyun
I ran into the same phenomenon. Here is the audit log:

# first rename op
cmd=rename	src=/user/testuser/testdb.db/test_table/_temporary/0/task_xxx/pt=20250908000000
dst=/user/testuser/testdb.db/test_table/pt=20250908000000	
# second delete op
cmd=delete	src=/user/testuser/testdb.db/test_table/_temporary

Any application writing a partition will delete /user/testuser/testdb.db/test_table/_temporary. When multiple applications for different partitions are running at the same time, data loss may occur. We can solve this problem by replacing /user/testuser/testdb.db/test_table/_temporary with a unique directory.

How about reopening this PR? And I don't think the use case is suspicious. For example, if I want to recalculate the partition data for the last month, I will run multiple applications in parallel.
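A sketch of that scenario (hypothetical table and source names; each INSERT runs in its own concurrently launched application with its own spark session, matching the audit log above):

```scala
// Application A: overwrite one partition of the shared table.
spark.sql(
  """INSERT OVERWRITE TABLE testdb.test_table PARTITION (pt='20250908000000')
    |SELECT * FROM source_a""".stripMargin)

// Application B, launched at the same time: a different partition.
spark.sql(
  """INSERT OVERWRITE TABLE testdb.test_table PARTITION (pt='20250909000000')
    |SELECT * FROM source_b""".stripMargin)

// Both applications stage task output under the shared
// /user/testuser/testdb.db/test_table/_temporary directory, so whichever
// job commits first deletes that directory and can wipe out the other
// application's still-uncommitted task files.
```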

@zhengchenyu (Contributor) commented

@viirya @dongjoon-hyun @wForget

After some research, I discovered that the .spark-staging-xxx directory is only used for custom partition paths (introduced in #15814) and dynamic partition overwrite (introduced in #18714, with appropriate modifications in #29000). I suspect .spark-staging-xxx was introduced to avoid conflicts, for example to prevent data contamination in dynamic partition overwrite scenarios.

I believe the issue of running multiple partition applications in parallel is similar to the two cases above. Could we make writing to .spark-staging-xxx the default behavior? That would not only solve this problem but also make the code structure cleaner.
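For reference, the job-unique staging path those two features use is derived from the job ID, roughly like this (a sketch of what FileCommitProtocol.getStagingDir does as of #29000; treat the details as approximate):

```scala
import org.apache.hadoop.fs.Path

// A per-job ".spark-staging-<jobId>" directory under the output path,
// so concurrent jobs never share a temporary directory.
def getStagingDir(path: String, jobId: String): Path =
  new Path(path, ".spark-staging-" + jobId)

// e.g. getStagingDir("/user/testuser/testdb.db/test_table", "job1")
//   => /user/testuser/testdb.db/test_table/.spark-staging-job1
```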

new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}
if (forceUseStagingDir && !dynamicPartitionOverwrite) {
Contributor left a comment on the diff above:

When spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is false, the Hive SerDe is used. We also call newTaskTempFileAbsPath in that case, which triggers a rename here. I suspect this conflicts with the Hive SerDe logic.
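For example (an illustrative snippet showing the configuration being referred to):

```scala
// With the conversion disabled, inserts into a Parquet-backed Hive table
// go through the Hive SerDe path (InsertIntoHiveTable) instead of
// InsertIntoHadoopFsRelationCommand.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
```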

