[SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking. #19096
Conversation
Hi @zsxwing, are you okay with the changes?
@zsxwing, please take another look.
This looks like it was abandoned simply because it was not reviewed in time, but the patch still seems needed, so I'd like to bump it.
@HeartSaVioR Thanks for your interest, let me reopen and take some time to update it.
@ScrapCodes, just wondering which scenario ends up in multi-threaded access to the same producer?
As a general comment, +1 on solving this issue. If a task uses a producer for longer than the configured eviction time, the producer will be closed.
gaborgsomogyi left a comment:
LGTM. @ScrapCodes thanks for your hard work and @HeartSaVioR for your review effort!
Hi @zsxwing, can you please take a look?
I am not sure why the "Changes requested" status did not clear up, in spite of making changes on the PR. Do I need to force push?
@zsxwing requested the changes, so I think he has to update the review status himself.
@zsxwing Ping!
@ScrapCodes could you resolve the conflict?
@ScrapCodes may I ask you to resolve the conflicts? @vanzin, there has not been much movement here, but this blocks finishing delegation token support (additionally, several users are suffering with long-running batch queries). #23956 is required to pick up the latest token, but this PR is needed to finish it. I would like to ask you to have a look, please.
@gaborgsomogyi Yes, will do it soon.
gaborgsomogyi left a comment:
Had a look at the test results and it seems the failure is relevant.
jose-torres left a comment:
The JIRA says "Right now a cached Kafka producer may be closed if a large task uses it for more than 10 minutes.", and I'm not sure this change is the right way to handle that. If we think it should be longer than 10 minutes, let's increase it (and maybe we'll have to make some code changes to do it, I don't know where the 10 minutes comes from). To say that we will never release a resource until a task says it's okay is inherently dangerous in a distributed system, and it looks like we'd have to do some risky changes to make it happen too.
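For context, the failure mode under discussion can be illustrated with a self-contained sketch (all names here are hypothetical, not Spark's actual code): a cache that expires entries a fixed interval after their last access will evict and close a producer even while a long-running task still holds a reference to it.

```scala
// Hypothetical stand-in for a Kafka producer, so the sketch is self-contained.
class FakeProducer(val id: Int) {
  @volatile var closed = false
  def close(): Unit = closed = true
}

// A single-entry stand-in for an expire-after-access cache (like Guava's).
class ExpiringCache(timeoutMs: Long) {
  private var entry: Option[(FakeProducer, Long)] = None
  private var nextId = 0

  def get(nowMs: Long): FakeProducer = synchronized {
    entry match {
      case Some((p, lastAccess)) if nowMs - lastAccess < timeoutMs =>
        entry = Some((p, nowMs)) // refresh last-access time
        p
      case stale =>
        // Eviction closes the old producer unconditionally -- the dangerous part:
        // a task that borrowed it earlier may still be writing with it.
        stale.foreach { case (p, _) => p.close() }
        nextId += 1
        val fresh = new FakeProducer(nextId)
        entry = Some((fresh, nowMs))
        fresh
    }
  }
}
```

A task that obtains the producer at t=0 and keeps writing past the timeout finds it closed under its feet as soon as any later access triggers eviction.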
```diff
   topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) {
   // used to synchronize with Kafka callbacks
-  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+  protected val producer: CachedKafkaProducer =
```
This is a change in lifecycle for the producer. Are we sure that's safe?
```diff
   val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedAuthConfigIfNeeded(kafkaParamsMap))
   try {
-    guavaCache.get(paramsSeq)
+    val producer = this.synchronized {
```
Is this required? It's risky to add new global locks to things.
```diff
+  /* Release a kafka producer back to the kafka cache. We simply decrement its inuse count. */
+  private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = {
+    this.synchronized {
+      // It should be ok to call release multiple times on the same producer object.
```
But it's not really okay, right? If task A calls release multiple times, the producer might have its inUseCount decremented to 0 even though task B is using it.
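One way to address this concern (a sketch with hypothetical names, not what the PR actually does) is to make release idempotent per acquired *handle* rather than per producer, so a task that releases twice cannot decrement a count belonging to another task:

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// The shared, ref-counted producer wrapper.
class RefCountedProducer {
  val inUseCount = new AtomicInteger(0)
  def acquire(): ProducerHandle = {
    inUseCount.incrementAndGet()
    new ProducerHandle(this)
  }
}

// Each task gets its own handle; only the handle's FIRST release()
// decrements the shared count, so double release is a harmless no-op.
class ProducerHandle(owner: RefCountedProducer) {
  private val released = new AtomicBoolean(false)
  def release(): Unit =
    if (released.compareAndSet(false, true)) owner.inUseCount.decrementAndGet()
}
```

With this shape, task A's repeated release calls cannot push the count to zero while task B still holds its own unreleased handle.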
That's also the way consumer instances are cached (that's why missing …)
I don't think it's realistic to expect the user to measure the average time of a batch query and set the timeout accordingly.
There is a config named …
True! I'm not saying it's the safest solution, but on the consumer side we have a similar approach already. Catching committer attention is super hard, and I thought it would be less risky from a committer perspective to add ref counting than to introduce a completely new lib (like Commons Pool). Thinking about alternatives, I see mainly 2 (pretty sure there are others):
#22138 has been merged, which changed my view on how to solve this issue here (until now I was not sure committers had enough confidence to merge that new technology). Proposal: use Apache Commons Pool.
If you agree, I'm happy to give a helping hand during review. @ScrapCodes, if you don't have time to invest, then I'm happy to do the coding part. My PR #23956 has been depending on this for a long time and I would like to push this forward (delegation tokens will not work in all cases). Guys, please share your thoughts.
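The borrow/return discipline that Commons Pool provides (via `KeyedObjectPool.borrowObject`/`returnObject`; a real implementation would use `org.apache.commons.pool2.impl.GenericKeyedObjectPool`) can be illustrated with a minimal pure-Scala stand-in. Everything below is a simplified sketch, not the Commons Pool API itself:

```scala
import scala.collection.mutable

// Minimal stand-in for a keyed object pool: a borrowed object is, by construction,
// invisible to eviction until it is returned to the idle set.
class KeyedPool[K, V](create: K => V, destroy: V => Unit) {
  private val idle = mutable.Map.empty[K, mutable.Queue[V]]

  def borrowObject(key: K): V = synchronized {
    idle.get(key).flatMap(q => if (q.nonEmpty) Some(q.dequeue()) else None)
      .getOrElse(create(key))
  }

  def returnObject(key: K, obj: V): Unit = synchronized {
    idle.getOrElseUpdate(key, mutable.Queue.empty[V]) += obj
  }

  // Eviction can only ever destroy idle objects; borrowed ones are untouchable.
  def clear(key: K): Unit = synchronized {
    idle.remove(key).foreach(_.foreach(destroy))
  }
}
```

The point of the design: there is no in-use count to get wrong, because the pool structurally cannot destroy an object that a task has borrowed and not yet returned.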
I second this, as it's in line with what I proposed before. Either I can help reviewing or even work on this. We still have some considerations regarding performance and the number of connections (as we currently allow multi-threaded access, whereas Apache Commons Pool will prevent this), but even the javadoc of the Kafka producer just says …
@gaborgsomogyi Interestingly, I also wanted to make a fresh start on this. This is important for me as well, and thanks for pointing in a new direction; I will start working on it. @HeartSaVioR Thanks for your continued interest and encouragement.
What changes were proposed in this pull request?
We track the producer by maintaining an `inuse` count. If the producer is `inuse` and we get eviction orders from Guava, we move such producers to a queue (`closeQueue`) and periodically check their `inuse` status, eventually closing them. This way a producer will not be closed while being used, and an evicted producer will also not get assigned to a new task. We had to do this because Guava has a limitation: it does not allow a custom eviction strategy. google/guava#3013
How was this patch tested?
Updated existing tests and added an appropriate test case.
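The scheme described above can be sketched as follows (simplified, with hypothetical names — the real PR integrates this with the Guava cache's removal listener): producers evicted while still in use are parked in a `closeQueue`, and a periodic sweep closes only those whose `inuse` count has dropped to zero.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// A producer wrapper carrying its own in-use count.
class TrackedProducer {
  val inUse = new AtomicInteger(0)
  @volatile var closed = false
  def close(): Unit = closed = true
}

object ProducerCacheSketch {
  // Producers evicted from the cache while still in use wait here.
  private val closeQueue = mutable.Queue.empty[TrackedProducer]

  def acquire(p: TrackedProducer): TrackedProducer = { p.inUse.incrementAndGet(); p }
  def release(p: TrackedProducer): Unit = p.inUse.decrementAndGet()

  // Would be invoked from the cache's removal listener on eviction:
  // close immediately only if no task holds the producer.
  def onEviction(p: TrackedProducer): Unit = synchronized {
    if (p.inUse.get == 0) p.close() else closeQueue += p
  }

  // Run periodically: close parked producers whose last user has released them.
  def sweep(): Unit = synchronized {
    val (idle, busy) = closeQueue.partition(_.inUse.get == 0)
    idle.foreach(_.close())
    closeQueue.clear()
    closeQueue ++= busy
  }
}
```

An evicted producer thus outlives the cache entry exactly as long as some task is still using it, and no longer.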