[SPARK-21869][SS] Revise Kafka producer pool to implement 'expire' correctly #26845
Conversation
cc. @zsxwing @vanzin @gaborgsomogyi (Gabor may not be available this month, but cc'ing so he can catch up sometime next month)

Similarly, I feel it would be ideal to move the files regarding the consumer pool to a separate package as well. What do you think? If it makes sense I'll craft another PR for this. Thanks!

Test build #115144 has finished for PR 26845 at commit

Could you fix the failure?

Ah yes, I have been investigating the issue. Thanks for the reminder! Will update soon.

Test build #115153 has finished for PR 26845 at commit

retest this, please

Test build #115161 has finished for PR 26845 at commit
```scala
  producer.foreach(_.producer.flush())
  checkForErrors()
  KafkaDataWriterCommitMessage
} finally {
```
Resource cleanup can be simplified once SPARK-30227 (#26855) is introduced.
Force-pushed from 13fd4f0 to a7aac78
Test build #115297 has finished for PR 26845 at commit

Test build #115301 has finished for PR 26845 at commit
vanzin left a comment
A few mostly minor things.
```scala
import org.apache.spark.internal.Logging

private[kafka010] class CachedKafkaProducer(
  val cacheKey: Seq[(String, Object)],
```
nit: indent more
```scala
private def evictExpired(): Unit = {
  val producers = new mutable.ArrayBuffer[CachedProducerEntry]()
  synchronized {
    cache.filter { case (_, v) => v.expired }.foreach { case (k, v) =>
```
You can use cache.retain instead of filter + foreach.
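For illustration, a rough sketch of that suggestion as the body of evictExpired() (it assumes the `producers` buffer and `CachedProducerEntry` shown in the diff above; not the final code):

```scala
// Sketch only: keep non-expired entries in one pass, collecting the expired ones
// so their producers can be closed outside the synchronized block.
val producers = new mutable.ArrayBuffer[CachedProducerEntry]()
synchronized {
  cache.retain { case (_, v) =>
    if (v.expired) {
      producers += v
      false  // drop the expired entry from the cache
    } else {
      true   // keep it
    }
  }
}
producers.foreach(_.producer.close())
```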
```scala
  }
}

def expired: Boolean = _refCount <= 0 && _expireAt < clock.getTimeMillis()
```
Really should use nanoTime() when checking for expiration. Also, if you pass the current time to handleReturned() and expired(), you don't even need this class to know about clocks at all. (That also avoids calling clock.blah() for each entry in the cache when checking for expiration, which is a pretty small but cheap optimization.)
(The ref count check is also redundant, but that's ok for clarity.)
It feels a bit odd to pass the current time to handleReturned() and expired(), but then passing a Clock felt a bit odd as well, so good to change. And yes, the ref count check is placed there intentionally, for clarity.
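As a rough illustration of the suggested shape (field and parameter names follow the diff snippets in this thread; this is a sketch, not the merged code), the entry can take the sampled time as an argument instead of holding a Clock:

```scala
// Sketch only: the caller samples System.nanoTime() once per release / eviction pass
// and passes it in, so this class no longer needs to know about clocks.
private[producer] class CachedProducerEntry(
    val producer: CachedKafkaProducer,
    cacheExpireTimeoutNs: Long) {
  private var _refCount: Long = 0L
  private var _expireAt: Long = Long.MaxValue

  def handleBorrowed(): Unit = {
    _refCount += 1
    _expireAt = Long.MaxValue  // a borrowed producer never expires
  }

  def handleReturned(curTimeNs: Long): Unit = {
    _refCount -= 1
    if (_refCount <= 0) {
      _expireAt = curTimeNs + cacheExpireTimeoutNs
    }
  }

  def expired(curTimeNs: Long): Boolean = _refCount <= 0 && _expireAt < curTimeNs
}
```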
```scala
def handleReturned(): Unit = {
  _refCount -= 1
  if (_refCount <= 0) {
```
Hmm, shouldn't you assert that ref counts never go below 0? Or at least warn loudly with a stack trace so that if it happens, the stack trace helps with debugging.
```scala
val kafkaParams = getTestKafkaParams()
val producer = pool.acquire(kafkaParams)
val producer2 = pool.acquire(kafkaParams)
assert(producer === producer2)
```
You want them to be the same instance, right? So probably eq is more correct than ===.
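For reference, `eq` asserts reference identity while `===` goes through `equals()`, so the identity check states the "same pooled instance" intent directly. A minimal sketch:

```scala
// Sketch: assert the pool handed back the exact same instance, not merely an equal one.
assert(producer eq producer2)
```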
```scala
val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1)
paramsSeq
```
single statement?
```scala
cache.foreach { case (k, v) =>
  cache.remove(k)
```
Is foreach + remove safe in Scala? (In Java maps you'd get a "concurrent modification exception" at some point.)
I'd just use foreach + a separate call to clear.
It seems to work for Scala's mutable HashMap, but yes, that's clearer and avoids the wondering.
```scala
  cache.remove(k)
  v.producer.close()
}
scheduled = startEvictorThread()
```
Hmm, not sure why you need to restart this task?
Because reset is meant to bring the producer pool back to its initial state: cancel the evict task so it isn't running, close and clear all cached producers, then restart the evict task.
Ideally we would simply recreate the pool, but in tests we want to reset the pool held in the object rather than recreate a class instance. It's only used from testing.
Well, if you want to be absolutely sure that everything is being reset then, you need to use cancel(false), and wait for any pending run of the task to finish. Otherwise you may interrupt the task in the middle of closing an expired producer. Or maybe tests don't run into that because [insert reason here].
It just seems safer to me to not bother messing with the task. Leaving it alone will not break anything, and will reset the pool just as well.
> It just seems safer to me to not bother messing with the task. Leaving it alone will not break anything, and will reset the pool just as well.

Agreed. Not touching the evict task while resetting seems to work as well. Will change.
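A minimal sketch of what reset() might look like after that change (it assumes the same cache and entry names used in the diffs above; illustrative only):

```scala
// Sketch only: close and clear all cached producers, leaving the evictor task alone.
def reset(): Unit = synchronized {
  cache.values.foreach(_.producer.close())
  cache.clear()
}
```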
```scala
}

private[producer] def shutdown(): Unit = {
  ThreadUtils.shutdown(executorService)
```
You need to cancel the scheduler task before calling shutdown.
Test build #115408 has finished for PR 26845 at commit

Test build #115410 has finished for PR 26845 at commit
vanzin left a comment
A few minor things.
```scala
/** exposed for testing */
private[producer] val cacheExpireTimeoutMillis: Long = conf.get(PRODUCER_CACHE_TIMEOUT)

private val evictorThreadRunIntervalMillis = conf.get(PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL)
```
Only used in one place, doesn't need to be a field.
```scala
}

private[producer] def release(producer: CachedKafkaProducer): Unit = {
  def closeProducerNotInCache(producer: CachedKafkaProducer): Unit = {
```
Only called in one place, inline.
```scala
}

private[producer] def shutdown(): Unit = {
  scheduled.foreach(_.cancel(true))
```
false is safer
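That would make the shutdown path look roughly like this (sketch only; `cancel(false)` lets an in-flight eviction pass finish instead of interrupting it mid-close):

```scala
// Sketch only: stop scheduling new eviction runs without interrupting a running one,
// then shut the executor down.
private[producer] def shutdown(): Unit = {
  scheduled.foreach(_.cancel(false))
  ThreadUtils.shutdown(executorService)
}
```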
```scala
}

def handleReturned(curTimeNs: Long): Unit = {
  _refCount -= 1
```
To be pedantic, you should check the state before modifying it.
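Combining this with the earlier suggestion to warn loudly when the ref count would go negative, one possible shape (a sketch assuming the class mixes in Spark's Logging; not the merged code):

```scala
// Sketch only: inspect the state before mutating it, and surface a likely
// double-release with a stack trace to help debugging.
def handleReturned(curTimeNs: Long): Unit = {
  if (_refCount <= 0) {
    logWarning(s"Returning a producer that is not borrowed: refCount = ${_refCount}",
      new IllegalStateException("producer returned more times than borrowed"))
  }
  _refCount -= 1
  if (_refCount <= 0) {
    _expireAt = curTimeNs + cacheExpireTimeoutNs
  }
}
```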
Test build #115473 has finished for PR 26845 at commit

retest this please

Test build #115658 has finished for PR 26845 at commit
vanzin left a comment
Looks good. You could avoid creating the thread pool if the config is set to not expire, but not worth the trouble.
Merging to master (I'll fix the comment during merge).
```scala
  }
}

private var scheduled = startEvictorThread()
```
This can be a val now.
Thanks for reviewing and merging!
… package

### What changes were proposed in this pull request?
There are too many classes placed in the single package "org.apache.spark.sql.kafka010" which can be grouped by purpose. As a part of the change in SPARK-21869 (#26845), we moved producer related classes to "org.apache.spark.sql.kafka010.producer" and only exposed the necessary classes/methods to the outside of the package. This patch applies the same to consumer related classes.

### Why are the changes needed?
Described above.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #26991 from HeartSaVioR/SPARK-30336.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Thanks, guys, for taking care of this with a better approach. I've only had a quick look and it looks good. I'm going to take a deeper look to catch up...
Thanks for the suggestion! Submitted #27146 to address the documentation. Actually the producer pool is very simple from the end user's point of view, so the doc mostly just introduces the available configurations.
…uration

### What changes were proposed in this pull request?
This patch documents the configuration for the Kafka producer pool, newly revised via SPARK-21869 (#26845).

### Why are the changes needed?
The explanation of the new Kafka producer pool configuration is missing, whereas the doc has the Kafka consumer pool configuration.

### Does this PR introduce any user-facing change?
Yes. This is a documentation change.

### How was this patch tested?
N/A

Closes #27146 from HeartSaVioR/SPARK-21869-FOLLOWUP.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…rrectly

Closes apache#26845 from HeartSaVioR/SPARK-21869-revised.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
What changes were proposed in this pull request?
This patch revises the Kafka producer pool (cache) to implement 'expire' correctly.
The current implementation of the Kafka producer cache leverages Guava cache, which decides a cached producer instance should be expired if the instance is not "accessed" from the cache. That behavior defines the expiration time as "last accessed time + timeout", which is incorrect because a task may use the instance for longer than the timeout. There is also no concept of "returning" in Guava cache, so this cannot be fixed with Guava cache.
This patch introduces a new pool implementation which tracks the "reference count" of each cached instance, and defines the expiration time for an instance as "last returned time + timeout" once the reference count reaches 0, and Long.MaxValue (effectively no expiration) otherwise. Expiring instances is done explicitly by an evictor thread, instead of as part of handling acquire. (It might bring more overhead, but it ensures expired instances are cleared even when the pool is idle.)
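As an illustration of the flow described above, here is a rough sketch of the pool (the class name, the `createProducer()` helper, and the exact synchronization are assumptions for illustration, not the merged code):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable

// Illustrative sketch of a reference-counting producer pool keyed by sorted Kafka params.
private[producer] class InternalKafkaProducerPool(expireTimeoutNs: Long) {
  private val cache = mutable.HashMap.empty[Seq[(String, Object)], CachedProducerEntry]

  def acquire(kafkaParams: java.util.Map[String, Object]): CachedKafkaProducer = synchronized {
    val key = kafkaParams.asScala.toSeq.sortBy(_._1)
    val entry = cache.getOrElseUpdate(key,
      new CachedProducerEntry(createProducer(key), expireTimeoutNs))
    entry.handleBorrowed()  // a borrowed producer never expires
    entry.producer
  }

  def release(producer: CachedKafkaProducer): Unit = synchronized {
    // once the ref count reaches 0, expiration becomes "last returned time + timeout"
    cache.get(producer.cacheKey).foreach(_.handleReturned(System.nanoTime()))
  }

  // invoked periodically by the evictor thread, so expired producers are removed
  // even when the pool is idle
  private def evictExpired(): Unit = {
    val expired = synchronized {
      val curTimeNs = System.nanoTime()
      val dead = cache.filter { case (_, v) => v.expired(curTimeNs) }.toSeq
      dead.foreach { case (k, _) => cache.remove(k) }
      dead.map(_._2)
    }
    expired.foreach(_.producer.close())  // close outside the lock
  }

  // placeholder for creating and wrapping the underlying KafkaProducer
  private def createProducer(key: Seq[(String, Object)]): CachedKafkaProducer = ???
}
```

A write task would call acquire() before sending records and always release() in a finally block, so the reference count stays balanced and the entry can eventually expire.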
This patch also creates a new package `producer` under `kafka010`, to hide the details from the `kafka010` package. From the `kafka010` package's point of view, only acquire()/release()/reset() are available on the pool, and even for CachedKafkaProducer the package cannot close the producer directly.
Why are the changes needed?
Explained above.
Does this PR introduce any user-facing change?
Yes, but only in how cached instances are expired. (The difference is described above.) Each executor leveraging spark-sql-kafka will have one eviction thread.
How was this patch tested?
New and existing UTs.