[SPARK-27042][SS] Invalidate cached Kafka producer in case of task retry #26470
Conversation
```scala
if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
  val cacheKey = new CacheKey(topicPartition, kafkaParams)

  logDebug(s"Task re-attempt detected, invalidating consumers with key $cacheKey")
```
This is not related, but I thought it would be good to have symmetric logs.
Test build #113593 has finished for PR 26470 at commit
cc @vanzin @zsxwing @HeartSaVioR since you have context.
HeartSaVioR left a comment
LGTM except for something very minor.
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
Test build #113704 has finished for PR 26470 at commit
```scala
    .setAuthenticationConfigIfNeeded()
    .build()
  val key = toCacheKey(updatedKafkaParams)
  if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
```
I know this is just matching what the consumer code does, but I'm a little unsure that it's correct (in both cases).
Can't you re-use the producer in different jobs? If so, you can have:
- job 1 task 1 attempt 0 runs with producer 1 and fails
- job 1 task 1 attempt 1 runs with producer 2 and succeeds
- job 2 task 1 attempt 0 runs with producer 1 and re-uses the same instance that should have been closed?
So I'm wondering whether the task itself shouldn't instead be invalidating the producer when it detects an error.
Strictly speaking, we should discard the instance (not all instances) when the instance throws some error, but then we would need to wrap the code to catch the exception and discard the instance wherever the instance is used. A bit verbose, but we already have to return the instance to the pool even when it succeeds, so maybe not impossible.
I'm not aware whether the Kafka consumer or Kafka producer is self-healing - if they provided such functionality, we wouldn't need to discard them at all.
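To make the "wrap and discard" idea above concrete, here is a minimal sketch; `ProducerPool` and its `acquire`/`release`/`invalidate` methods are hypothetical placeholders, not the actual Spark pool API.

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical pool interface: release keeps the instance cached for re-use,
// invalidate closes it and drops it from the cache.
trait ProducerPool {
  def acquire(params: java.util.Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]]
  def release(producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit
  def invalidate(producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit
}

// Every use site would carry this boilerplate, which is the verbosity concern above.
def writeRecords(
    pool: ProducerPool,
    params: java.util.Map[String, Object],
    records: Iterator[ProducerRecord[Array[Byte], Array[Byte]]]): Unit = {
  val producer = pool.acquire(params)
  try {
    records.foreach(r => producer.send(r))
    producer.flush()
    pool.release(producer)      // succeeded: hand the instance back for re-use
  } catch {
    case e: Throwable =>
      pool.invalidate(producer) // failed: discard this instance only
      throw e
  }
}
```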
To answer question 1: yes, job 2 can use producers created by job 1 in the same JVM. In such a case another task retry is needed to recover, in the following way:
- job 2 task 1 attempt 0 runs with producer 1 and fails
- job 2 task 1 attempt 1 runs with producer 3 (this would invalidate producers 1 and 2 if they are not in use)
Considering the use-case, this would mean 2 jobs must write the same TopicPartition at the same time. Until now I haven't seen use-cases where the exact same topic is written by 2 different jobs (which of course doesn't mean it's not done).
To answer question 2: Kafka consumers and producers don't provide any kind of self-healing.
I think I have to take some time to check the other way, because I have a similar fear to what @HeartSaVioR mentioned (horribly complicated code with catch blocks everywhere the producer is touched). The general fear I have with invalidation from the exception side is this: if the code leaks and fails to catch even a single exception, the task may not heal itself.
> 2 jobs must write the same TopicPartition at the same time

Why at the same time? All you need to hit my case is two tasks in the first job using different producers, and can't that happen, e.g., if you have executors with more than one core?
It seems like Spark would eventually self-heal at the cost of some failed tasks, but can't we do better? As Jungtaek said, we already have to return the producer to the pool. It's just a matter of invalidating it instead of returning it when an error is detected.
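That suggestion could be folded into a single loan-style helper so individual use sites don't need their own catch blocks. A sketch under the same assumptions as above (it reuses the hypothetical `ProducerPool` trait from the previous sketch and is not the actual Spark code):

```scala
import org.apache.kafka.clients.producer.KafkaProducer

// Loan pattern: the "return vs. invalidate" decision lives in one place.
def withProducer[T](
    pool: ProducerPool,
    params: java.util.Map[String, Object])(
    body: KafkaProducer[Array[Byte], Array[Byte]] => T): T = {
  val producer = pool.acquire(params)
  try {
    val result = body(producer)
    pool.release(producer)      // no error observed: keep it cached
    result
  } catch {
    case e: Throwable =>
      pool.invalidate(producer) // error observed: close and evict this instance
      throw e
  }
}

// Use sites stay free of error-handling boilerplate:
// withProducer(pool, kafkaParams) { producer =>
//   records.foreach(r => producer.send(r))
//   producer.flush()
// }
```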
IIUC, this is trying to close a shared producer because of a random failure (including user code errors)? IMO, we should not do this for the producer. It can be shared by multiple Spark jobs as long as they use the same Kafka parameters. Kafka actually suggests sharing the producer in its docs:

> The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

Hence I would assume it can self-heal. The Kafka consumer is a different story: it's not thread-safe and cannot be shared by multiple tasks at the same time. That's why we can close the old one, since we are pretty sure it's not being used by another task in the same JVM.
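As an illustration of why sharing is natural for producers, a single thread-safe instance per unique configuration can serve all tasks in an executor JVM. This is only a sketch; `SharedProducers` and `getOrCreate` are hypothetical names, not Spark's actual cache.

```scala
import java.{util => ju}
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.clients.producer.KafkaProducer

object SharedProducers {
  // Keyed by the Kafka parameters; the params are expected to include the
  // key/value serializer settings.
  private val cache =
    new ConcurrentHashMap[Map[String, Object], KafkaProducer[Array[Byte], Array[Byte]]]()

  // All tasks (and jobs) using identical parameters get the same instance,
  // relying on the producer's documented thread safety.
  def getOrCreate(params: Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] =
    cache.computeIfAbsent(params, p => {
      val juParams = new ju.HashMap[String, Object]()
      p.foreach { case (k, v) => juParams.put(k, v) }
      new KafkaProducer[Array[Byte], Array[Byte]](juParams)
    })
}
```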
> IIUC, this is trying to close a shared producer because of a random failure (including user code errors)? IMO, we should not do this for the producer. It can be shared by multiple Spark jobs as long as they use the same Kafka parameters.

We changed the producer pool to follow the same approach as the Kafka consumer pool - #25853 - to resolve the long-standing "producer close on idle" issue. #19096 shows that closing an idle producer while it can be shared by multiple tasks is very complicated - we would have to track "in-use" state and then deal with thread-safety. One task, one producer is much simpler, and we didn't observe any performance issue with it.
> Why at the same time?

I thought job 1 and job 2 were different queries. A single query + multiple cores could end up in the same situation.

> Hence I would assume it can self-heal.

I would assume it can't heal if the programming failure is in the producer code itself. On the other hand, I see your general concern about this. What is your point considering #25853?
> I would assume it can't heal if the programming failure is in the producer code itself.

That's true, software has bugs. But I have never heard of any Kafka producer issue reported when using Spark. A task failure like this most likely comes from a user error, a transient network error, or something unrelated, and it should be isolated and not impact other tasks in the same executor. This seems like overkill.
> What is your point considering #25853?
Thanks for pointing me to this PR. I posted my concerns there.
Since #25853 was rolled back, this PR no longer makes sense; closing...
#25853 was rolled back, so this doesn't make sense.
What changes were proposed in this pull request?
If a task fails due to a corrupt cached Kafka producer and is retried on the same executor, it may get the same producer over and over again. After several retries the query may stop.
In this PR I'm invalidating the old cached producers for a specific key and opening a new one. This reduces the possibility of re-using a faulty producer. It must be mentioned that if a producer under the key is being used by another task, it won't be closed. The functionality is similar to the consumer side.
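A condensed sketch of the proposed behaviour (function and parameter names here are illustrative, not the exact code in this PR): when a task re-attempt is detected via `TaskContext`, cached producers under the same key are evicted before a fresh one is created.

```scala
import org.apache.spark.TaskContext

// Generic over the cache key and producer type to keep the sketch self-contained.
def acquireForTask[K, P](
    cacheKey: K,
    evictIdle: K => Unit,   // hypothetical: closes cached producers for this key that are not in use
    create: K => P): P = {
  val ctx = TaskContext.get
  if (ctx != null && ctx.attemptNumber >= 1) {
    // A previous attempt of this task failed on this executor, so the cached
    // producer under this key may be the corrupt one; drop idle instances
    // before handing out a new producer.
    evictIdle(cacheKey)
  }
  create(cacheKey)
}
```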
Why are the changes needed?
Increase producer side availability.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing + additional unit tests.