Skip to content

Commit be9d57f

Browse files
petermaxlee authored and rxin committed
[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
## What changes were proposed in this pull request?

This PR modifies StreamExecution so that it discards metadata for batches that have already been fully processed, using the purge method that was added as part of SPARK-17235. It is based on work by frreiss in #15067, but fixes the test case along with some typos.

## How was this patch tested?

A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request.

Author: petermaxlee <[email protected]>
Author: frreiss <[email protected]>

Closes #15126 from petermaxlee/SPARK-17513.
1 parent 26145a5 commit be9d57f

File tree

3 files changed

+32
-0
lines changed

3 files changed

+32
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming
2424
* - Allow the user to query the latest batch id.
2525
* - Allow the user to query the metadata object of a specified batch id.
2626
* - Allow the user to query metadata objects in a range of batch ids.
27+
* - Allow the user to remove obsolete metadata.
2728
*/
2829
trait MetadataLog[T] {
2930

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,13 @@ class StreamExecution(
290290
assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
291291
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
292292
logInfo(s"Committed offsets for batch $currentBatchId.")
293+
294+
// Now that we have logged the new batch, no further processing will happen for
295+
// the previous batch, and it is safe to discard the old metadata.
296+
// Note that purge is exclusive, i.e. it purges everything before currentBatchId.
297+
// NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
298+
// flight at the same time), this cleanup logic will need to change.
299+
offsetLog.purge(currentBatchId)
293300
} else {
294301
awaitBatchLock.lock()
295302
try {

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
125125
)
126126
}
127127

128+
testQuietly("StreamExecution metadata garbage collection") {
129+
val inputData = MemoryStream[Int]
130+
val mapped = inputData.toDS().map(6 / _)
131+
132+
// Run 3 batches, and then assert that only 1 metadata file is left at the end
133+
// since the first 2 should have been purged.
134+
testStream(mapped)(
135+
AddData(inputData, 1, 2),
136+
CheckAnswer(6, 3),
137+
AddData(inputData, 1, 2),
138+
CheckAnswer(6, 3, 6, 3),
139+
AddData(inputData, 4, 6),
140+
CheckAnswer(6, 3, 6, 3, 1, 1),
141+
142+
AssertOnQuery("metadata log should contain only one file") { q =>
143+
val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
144+
val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
145+
val toTest = logFileNames // Workaround for SPARK-17475
146+
assert(toTest.size == 1 && toTest.head == "2")
147+
true
148+
}
149+
)
150+
}
151+
128152
/**
129153
* A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
130154
*

0 commit comments

Comments
 (0)