Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
None
}

/**
* Removes all the log entry earlier than thresholdBatchId (exclusive).
*/
/**
 * Removes all log entries for batches earlier than `thresholdBatchId` (exclusive);
 * the batch at `thresholdBatchId` itself is kept.
 */
override def purge(thresholdBatchId: Long): Unit = {
  // List every batch file currently in the metadata directory, keep only the
  // ones below the threshold, and delete each of them.
  fileManager.list(metadataPath, batchFilesFilter)
    .map(file => pathToBatchId(file.getPath))
    .filter(_ < thresholdBatchId)
    .foreach { batchId =>
      val batchPath = batchIdToPath(batchId)
      fileManager.delete(batchPath)
      logTrace(s"Removed metadata log file: $batchPath")
    }
}

private def createFileManager(): FileManager = {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ trait MetadataLog[T] {
* Return the latest batch Id and its metadata if exist.
*/
def getLatest(): Option[(Long, T)]

/**
 * Removes all log entries earlier than thresholdBatchId (exclusive).
 * This operation should be idempotent.
 */
def purge(thresholdBatchId: Long): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
test("FileManager: FileContextManager") {
  withTempDir { temp =>
    val path = new Path(temp.getAbsolutePath)
    // Exercise the FileContext-based implementation through the shared
    // FileManager test routine. (The stale duplicate call to the pre-rename
    // `testManager` was diff residue and is removed.)
    testFileManager(path, new FileContextManager(path, new Configuration))
  }
}

test("FileManager: FileSystemManager") {
  withTempDir { temp =>
    val path = new Path(temp.getAbsolutePath)
    // Exercise the FileSystem-based implementation through the shared
    // FileManager test routine. (The stale duplicate call to the pre-rename
    // `testManager` was diff residue and is removed.)
    testFileManager(path, new FileSystemManager(path, new Configuration))
  }
}

Expand Down Expand Up @@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("HDFSMetadataLog: purge") {
  withTempDir { temp =>
    val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)

    // Write batches 0..2, then verify each one is readable and that the
    // latest batch id is 2.
    Seq(0L, 1L, 2L).foreach { id =>
      assert(metadataLog.add(id, s"batch$id"))
    }
    Seq(0L, 1L, 2L).foreach { id =>
      assert(metadataLog.get(id).isDefined)
    }
    assert(metadataLog.getLatest().get._1 == 2)

    // Purging below threshold 2 must drop batches 0 and 1 but keep batch 2
    // (the threshold is exclusive).
    metadataLog.purge(2)
    assert(metadataLog.get(0).isEmpty)
    assert(metadataLog.get(1).isEmpty)
    assert(metadataLog.get(2).isDefined)
    assert(metadataLog.getLatest().get._1 == 2)
  }
}

testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
Expand Down Expand Up @@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}


def testManager(basePath: Path, fm: FileManager): Unit = {
/** Basic test case for [[FileManager]] implementation. */
private def testFileManager(basePath: Path, fm: FileManager): Unit = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this because initially I thought it was a noun meaning "manager for testing", rather than a verb phrase meaning "to test the file manager".

// Mkdirs
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
assert(!fm.exists(dir))
Expand Down