[SPARK-27042][SS] Invalidate cached Kafka producer in case of task retry #26470
I know this is just matching what the consumer code does, but I'm a little unsure that it's correct (in both cases).
Can't you re-use the consumer in different jobs? If so, you can have a retry in one job invalidating an instance that another job's task is still using.
So I'm wondering if instead the task itself shouldn't be invalidating the producer when it detects an error.
Strictly speaking, we should discard the instance (not all instances) when it throws an error, but then we may need to wrap the code to catch the exception and discard the instance wherever we use it. A bit verbose, but we already have to return the instance to the pool even when it succeeds, so maybe not impossible.
I'm not aware whether the Kafka consumer or producer is self-healing - if they provided such functionality, we wouldn't need to discard them at all.
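As a rough illustration of the catch-and-discard pattern described in the last two comments, here is a minimal sketch; `ProducerPool` and its `acquire`/`release`/`invalidate` methods are hypothetical names, not the actual Spark Kafka connector API:

```scala
import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical pool interface, for illustration only.
trait ProducerPool {
  def acquire(params: Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]]
  def release(producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit
  def invalidate(producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit
}

object ProducerUsage {
  // Wrap every use of a pooled producer: hand the instance back on success,
  // discard only that instance on failure.
  def withProducer[T](pool: ProducerPool, params: Map[String, Object])
                     (body: KafkaProducer[Array[Byte], Array[Byte]] => T): T = {
    val producer = pool.acquire(params)
    try {
      val result = body(producer)
      pool.release(producer)        // success: return the instance to the pool
      result
    } catch {
      case e: Throwable =>
        pool.invalidate(producer)   // failure: discard this instance only
        throw e
    }
  }
}
```

The verbosity mentioned above comes from having to route every producer access through a wrapper like this.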
To answer question 1: yes, job 2 can use producers created by job 1 in the same JVM, and in such a case another task retry is needed to recover.
If I consider the use-case, this would mean 2 jobs must write the same TopicPartition at the same time. Until now I haven't seen use-cases where the exact same topic is written by 2 different jobs (of course this doesn't mean it's not done).
To answer question 2: Kafka consumers and producers don't provide any kind of self-healing.
I think I have to take some time to check the other approach, because I have a fear similar to what @HeartSaVioR mentioned (horribly complicated code with catch blocks in all the places where the producer is touched). The general fear I have with exception-driven invalidation is the following: if the code leaks and fails to catch even a single exception, then the task may not heal itself.
Why at the same time? All you need to hit my case is two tasks on the first job using different producers, and can't that happen e.g. if you have executors with more than one core?
It seems like Spark would eventually self-heal at the cost of some failed tasks, but can't we do better? As Jungtaek said we already have to return the producer to the pool. It's just a matter of, instead of returning it, invalidating it if an error is detected.
IIUC, this is trying to close a shared producer because of a random failure (including user code errors)? IMO, we should not do this for the producer. It can be shared by multiple Spark jobs as long as they are using the same Kafka parameters. Kafka actually suggests sharing the producer in its docs (the KafkaProducer javadoc notes that the producer is thread-safe and that sharing a single instance across threads will generally be faster than having multiple instances).
Hence I would assume it can self-heal. The Kafka consumer is a different story. It's not thread-safe and cannot be shared by multiple tasks at the same time. That's why we can close the old one: we are pretty sure it's not used by another task in the same JVM.
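To make the sharing model concrete, here is a minimal sketch of a JVM-wide producer cache keyed by the Kafka parameters; the `SharedProducers` object is illustrative only, not the actual Spark code:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.KafkaProducer

// One producer per distinct set of Kafka parameters, reused by every task
// in the JVM that shares those parameters (params must include the usual
// bootstrap.servers and serializer settings).
object SharedProducers {
  private val cache =
    new ConcurrentHashMap[Map[String, Object], KafkaProducer[Array[Byte], Array[Byte]]]()

  def getOrCreate(params: Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] =
    cache.computeIfAbsent(
      params,
      p => new KafkaProducer[Array[Byte], Array[Byte]](p.asJava))
}
```

Because KafkaProducer is thread-safe, any number of concurrently running tasks can call send() on the same cached instance.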
We changed the producer pool to follow the same approach as the Kafka consumer pool - #25853 - to resolve the long-standing "producer close on idle" issue. #19096 shows that closing an idle producer while it can be shared by multiple tasks is very complicated - we would have to track "in-use" state and then deal with thread-safety. One producer per task is much simpler, and we didn't observe a performance issue with it.
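For reference, a sketch of what the "in-use" tracking alluded to above could look like; the names and the idle check are hypothetical, not the code from #25853 or #19096:

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.KafkaProducer

// Every borrow/return of a shared producer has to be counted before it can be
// safely closed on idle.
class RefCountedProducer(val producer: KafkaProducer[Array[Byte], Array[Byte]]) {
  private val inUse = new AtomicInteger(0)
  @volatile private var lastReturned: Long = System.currentTimeMillis()

  def borrow(): KafkaProducer[Array[Byte], Array[Byte]] = {
    inUse.incrementAndGet()
    producer
  }

  def giveBack(): Unit = {
    lastReturned = System.currentTimeMillis()
    inUse.decrementAndGet()
  }

  // Only safe to close when no task holds a reference and the producer has
  // been idle long enough; even this check can race with a concurrent
  // borrow(), which is exactly the kind of subtlety the comment above warns
  // about.
  def closeIfIdle(idleTimeoutMs: Long): Boolean = {
    if (inUse.get() == 0 && System.currentTimeMillis() - lastReturned > idleTimeoutMs) {
      producer.close()
      true
    } else {
      false
    }
  }
}
```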
I thought job 1 and job 2 were different queries. A single query + multiple cores could end up in the same situation.
I would assume it can't heal if the programming failure is in the producer code itself. On the other hand, I see your general concern about this. What is your view considering #25853?
That's true, software has bugs. But I have never heard of any Kafka producer issues reported when using Spark. A task failure is most likely a user error, a transient network error, or something unrelated, and a task failure like this should be isolated and should not impact other tasks in the same executor. This seems like overkill.
Thanks for pointing me to this PR. I posted my concerns there.
Since #25853 has been rolled back, this PR just doesn't make sense anymore; closing...