diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 32245470d8f5..ecaf4f8160a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -45,8 +45,7 @@ object FileStreamSink extends Logging { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) if (fs.isDirectory(hdfsPath)) { - val metadataPath = new Path(hdfsPath, metadataDir) - checkEscapedMetadataPath(fs, metadataPath, sqlConf) + val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf) fs.exists(metadataPath) } else { false @@ -55,6 +54,12 @@ object FileStreamSink extends Logging { } } + def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = { + val metadataDir = new Path(path, FileStreamSink.metadataDir) + FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf) + metadataDir + } + def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = { if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED) && StreamExecution.containsSpecialCharsInPath(metadataPath)) { @@ -125,14 +130,12 @@ class FileStreamSink( partitionColumnNames: Seq[String], options: Map[String, String]) extends Sink with Logging { + import FileStreamSink._ + private val hadoopConf = sparkSession.sessionState.newHadoopConf() private val basePath = new Path(path) - private val logPath = { - val metadataDir = new Path(basePath, FileStreamSink.metadataDir) - val fs = metadataDir.getFileSystem(hadoopConf) - FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf) - metadataDir - } + private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath, + sparkSession.sessionState.conf) private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index a25451bef62f..4ccab58d24fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest { } } - val fs = new Path(outputDir.getCanonicalPath).getFileSystem( - spark.sessionState.newHadoopConf()) - val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, - outputDir.getCanonicalPath) + val outputDirPath = new Path(outputDir.getCanonicalPath) + val hadoopConf = spark.sessionState.newHadoopConf() + val fs = outputDirPath.getFileSystem(hadoopConf) + val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf) + + val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString) val allFiles = sinkLog.allFiles() // only files from non-empty partition should be logged