FileStreamSink.scala
@@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
         val hdfsPath = new Path(singlePath)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         if (fs.isDirectory(hdfsPath)) {
-          val metadataPath = new Path(hdfsPath, metadataDir)
-          checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+          val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
           fs.exists(metadataPath)
         } else {
           false
@@ -55,6 +54,12 @@ object FileStreamSink extends Logging {
     }
   }

+  def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
+    val metadataDir = new Path(path, FileStreamSink.metadataDir)
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
Contributor (reviewer):
This is new code. So does this PR fix a bug rather than just fixing a test?

@HeartSaVioR (Contributor, author), Jun 29, 2020:
Oh sorry, I should have explained this earlier. It's to avoid having the test copy the same logic that FileStreamSink uses, so that we don't break the test if we change that logic later. So this is a refactor; it doesn't mean there's a bug in FileStreamSink.

+    metadataDir
+  }

   def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
     if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(metadataPath)) {
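
To illustrate the reply above, here is a minimal, hypothetical sketch of how test code can reuse the extracted helper rather than re-deriving the _spark_metadata path on its own. The MetadataPathTestHelper object and resolveSinkMetadataPath method are illustrative names only, not part of this PR; the actual test change is in the FileStreamSinkSuite hunk further down.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSink
import org.apache.spark.sql.internal.SQLConf

object MetadataPathTestHelper {
  // Resolve the sink's metadata log directory the same way FileStreamSink does,
  // instead of hard-coding the directory name in the test. getMetadataLogPath also
  // runs the escaped-path check, so test and production behavior stay in sync.
  def resolveSinkMetadataPath(spark: SparkSession, outputDir: String): Path = {
    val outputPath = new Path(outputDir)
    val fs = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    FileStreamSink.getMetadataLogPath(fs, outputPath, SQLConf.get)
  }
}

SQLConf.get here stands in for the active session's SQLConf; inside FileStreamSinkSuite the harness-provided conf is used instead, as the test hunk below shows.
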
@@ -125,14 +130,12 @@ class FileStreamSink(
     partitionColumnNames: Seq[String],
     options: Map[String, String]) extends Sink with Logging {

+  import FileStreamSink._
+
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = {
-    val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
-    val fs = metadataDir.getFileSystem(hadoopConf)
-    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
-    metadataDir
-  }
+  private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
+    sparkSession.sessionState.conf)
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)

FileStreamSinkSuite.scala
@@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
         }
       }

-      val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
-        spark.sessionState.newHadoopConf())
-      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
-        outputDir.getCanonicalPath)
+      val outputDirPath = new Path(outputDir.getCanonicalPath)
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      val fs = outputDirPath.getFileSystem(hadoopConf)
+      val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
+
+      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)

       val allFiles = sinkLog.allFiles()
       // only files from non-empty partition should be logged