Skip to content

Commit dfdfc30

Browse files
petermaxlee authored and rxin committed
[SPARK-17235][SQL] Support purging of old logs in MetadataLog
## What changes were proposed in this pull request? This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required in a production structured streaming job that runs for a long period of time. ## How was this patch tested? Added a unit test case in HDFSMetadataLogSuite. Author: petermaxlee <[email protected]> Closes #14802 from petermaxlee/SPARK-17235. (cherry picked from commit f64a1dd) Signed-off-by: Reynold Xin <[email protected]>
1 parent 52feb3f commit dfdfc30

File tree

3 files changed

+43
-4
lines changed

3 files changed

+43
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
227227
None
228228
}
229229

230+
/**
231+
* Removes all the log entries earlier than thresholdBatchId (exclusive).
232+
*/
233+
override def purge(thresholdBatchId: Long): Unit = {
234+
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
235+
.map(f => pathToBatchId(f.getPath))
236+
237+
for (batchId <- batchIds if batchId < thresholdBatchId) {
238+
val path = batchIdToPath(batchId)
239+
fileManager.delete(path)
240+
logTrace(s"Removed metadata log file: $path")
241+
}
242+
}
243+
230244
private def createFileManager(): FileManager = {
231245
val hadoopConf = sparkSession.sessionState.newHadoopConf()
232246
try {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,4 +48,10 @@ trait MetadataLog[T] {
4848
* Return the latest batch Id and its metadata if exist.
4949
*/
5050
def getLatest(): Option[(Long, T)]
51+
52+
/**
53+
* Removes all the log entries earlier than thresholdBatchId (exclusive).
54+
* This operation should be idempotent.
55+
*/
56+
def purge(thresholdBatchId: Long): Unit
5157
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
4646
test("FileManager: FileContextManager") {
4747
withTempDir { temp =>
4848
val path = new Path(temp.getAbsolutePath)
49-
testManager(path, new FileContextManager(path, new Configuration))
49+
testFileManager(path, new FileContextManager(path, new Configuration))
5050
}
5151
}
5252

5353
test("FileManager: FileSystemManager") {
5454
withTempDir { temp =>
5555
val path = new Path(temp.getAbsolutePath)
56-
testManager(path, new FileSystemManager(path, new Configuration))
56+
testFileManager(path, new FileSystemManager(path, new Configuration))
5757
}
5858
}
5959

@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
103103
}
104104
}
105105

106+
testWithUninterruptibleThread("HDFSMetadataLog: purge") {
107+
withTempDir { temp =>
108+
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
109+
assert(metadataLog.add(0, "batch0"))
110+
assert(metadataLog.add(1, "batch1"))
111+
assert(metadataLog.add(2, "batch2"))
112+
assert(metadataLog.get(0).isDefined)
113+
assert(metadataLog.get(1).isDefined)
114+
assert(metadataLog.get(2).isDefined)
115+
assert(metadataLog.getLatest().get._1 == 2)
116+
117+
metadataLog.purge(2)
118+
assert(metadataLog.get(0).isEmpty)
119+
assert(metadataLog.get(1).isEmpty)
120+
assert(metadataLog.get(2).isDefined)
121+
assert(metadataLog.getLatest().get._1 == 2)
122+
}
123+
}
124+
106125
testWithUninterruptibleThread("HDFSMetadataLog: restart") {
107126
withTempDir { temp =>
108127
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
155174
}
156175
}
157176

158-
159-
def testManager(basePath: Path, fm: FileManager): Unit = {
177+
/** Basic test case for [[FileManager]] implementation. */
178+
private def testFileManager(basePath: Path, fm: FileManager): Unit = {
160179
// Mkdirs
161180
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
162181
assert(!fm.exists(dir))

0 commit comments

Comments (0)