Skip to content

Commit d39fee9

Browse files
committed
New constructor for NonFateSharingCache
1 parent 6697725 commit d39fee9

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.util
1919

20-
import java.util.concurrent.Callable
20+
import java.util.concurrent.{Callable, TimeUnit}
2121

2222
import com.google.common.cache.{Cache, CacheBuilder, CacheLoader, LoadingCache}
2323

@@ -68,6 +68,20 @@ private[spark] object NonFateSharingCache {
6868
override def load(k: K): V = loadingFunc.apply(k)
6969
}))
7070
}
71+
72+
def apply[K, V](
73+
maximumSize: Long,
74+
expireAfterAccessTime: Long,
75+
expireAfterAccessTimeUnit: TimeUnit): NonFateSharingCache[K, V] = {
76+
val builder = CacheBuilder.newBuilder().asInstanceOf[CacheBuilder[K, V]]
77+
if (maximumSize > 0L) {
78+
builder.maximumSize(maximumSize)
79+
}
80+
if(expireAfterAccessTime > 0) {
81+
builder.expireAfterAccess(expireAfterAccessTime, expireAfterAccessTimeUnit)
82+
}
83+
new NonFateSharingCache(builder.build[K, V]())
84+
}
7185
}
7286

7387
private[spark] class NonFateSharingCache[K, V](protected val cache: Cache[K, V]) {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2323

2424
import scala.util.control.NonFatal
2525

26-
import com.google.common.cache.CacheBuilder
2726
import org.apache.hadoop.conf.Configuration
2827
import org.apache.hadoop.fs.Path
2928

@@ -613,15 +612,15 @@ object RocksDBStateStoreProvider {
613612
val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
614613

615614
private val MAX_AVRO_ENCODERS_IN_CACHE = 1000
616-
// Add the cache at companion object level so it persists across provider instances
617-
private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] = {
618-
val guavaCache = CacheBuilder.newBuilder()
619-
.maximumSize(MAX_AVRO_ENCODERS_IN_CACHE) // Adjust size based on your needs
620-
.expireAfterAccess(1, TimeUnit.HOURS) // Optional: Add expiration if needed
621-
.build[String, AvroEncoder]()
615+
private val AVRO_ENCODER_LIFETIME_HOURS = 1L
622616

623-
new NonFateSharingCache(guavaCache)
624-
}
617+
// Add the cache at companion object level so it persists across provider instances
618+
private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] =
619+
NonFateSharingCache(
620+
maximumSize = MAX_AVRO_ENCODERS_IN_CACHE,
621+
expireAfterAccessTime = AVRO_ENCODER_LIFETIME_HOURS,
622+
expireAfterAccessTimeUnit = TimeUnit.HOURS
623+
)
625624

626625
def getAvroEnc(
627626
stateStoreEncoding: String,

0 commit comments

Comments
 (0)