Commit be1abfa

Address the comments
1 parent 56a00ae commit be1abfa

2 files changed: +12 -13 lines changed

2 files changed

+12
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
   protected def deserializeData(encodedString: String): T
 
   /**
-   * Filter out the obsolote logs.
+   * Filter out the obsolete logs.
    */
   def compactLogs(logs: Seq[T]): Seq[T]
 

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala

Lines changed: 11 additions & 12 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
 import scala.collection.mutable
 
 import org.json4s.NoTypeHints
@@ -49,17 +52,13 @@ class FileStreamSourceLog(
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  // A fixed size log cache to cache the file entries belong to the compaction batch. It is used
-  // to avoid scanning the compacted log file to retrieve it's own batch data.
+  // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
+  // used to avoid scanning the compacted log file to retrieve it's own batch data.
   private val cacheSize = compactInterval
-  private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]]
-
-  private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = {
-    if (fileEntryCache.size >= cacheSize) {
-      fileEntryCache.drop(1)
+  private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+    override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): Boolean = {
+      size() > cacheSize
     }
-
-    fileEntryCache.put(batchId, logs)
   }
 
   protected override def serializeData(data: FileEntry): String = {
@@ -76,7 +75,7 @@ class FileStreamSourceLog(
 
   override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
     if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) {
-      updateCache(batchId, logs)
+      fileEntryCache.put(batchId, logs)
       true
     } else if (!isCompactionBatch(batchId, compactInterval)) {
       true
@@ -90,8 +89,8 @@
     val endBatchId = getLatest().map(_._1).getOrElse(0L)
 
     val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id =>
-      if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) {
-        (id, Some(fileEntryCache(id)))
+      if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
+        (id, Some(fileEntryCache.get(id)))
       } else {
         val logs = super.get(id).map(_.filter(_.batchId == id))
         (id, logs)

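The new cache relies on java.util.LinkedHashMap's built-in eviction hook: after every put the map calls removeEldestEntry, and returning true once size() exceeds the cap makes the map drop its oldest (insertion-order) entry automatically, so the hand-rolled updateCache helper is no longer needed. Below is a minimal standalone sketch of the same pattern; the BoundedCacheExample object and its capacity of 3 are made up for illustration (FileStreamSourceLog uses compactInterval as the cap and stores Array[FileEntry] values).

import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

object BoundedCacheExample {

  // Hypothetical capacity for the sketch.
  private val maxEntries = 3

  // Once a put pushes the size above maxEntries, removeEldestEntry returns
  // true and the map silently discards its oldest (insertion-order) entry.
  private val cache = new JLinkedHashMap[Long, String] {
    override def removeEldestEntry(eldest: Entry[Long, String]): Boolean = {
      size() > maxEntries
    }
  }

  def main(args: Array[String]): Unit = {
    (1L to 5L).foreach(id => cache.put(id, s"batch-$id"))
    println(cache.keySet())  // prints [3, 4, 5]: entries 1 and 2 were evicted
  }
}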