From f7e6b4ebfd83fcdd18734abd15b9deca95ed6f52 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 26 Jun 2020 16:09:31 +0900
Subject: [PATCH 1/2] [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual
 metadata log directory

---
 .../execution/streaming/FileStreamSink.scala  | 19 +++++++++++--------
 .../sql/streaming/FileStreamSinkSuite.scala   | 10 ++++++----
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 32245470d8f5..1440313eb53d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
         val hdfsPath = new Path(singlePath)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         if (fs.isDirectory(hdfsPath)) {
-          val metadataPath = new Path(hdfsPath, metadataDir)
-          checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+          val metadataPath = getMetadataLogPath(hdfsPath, hadoopConf, sqlConf)
           fs.exists(metadataPath)
         } else {
           false
@@ -55,6 +54,13 @@
     }
   }
 
+  def getMetadataLogPath(path: Path, hadoopConf: Configuration, sqlConf: SQLConf): Path = {
+    val metadataDir = new Path(path, FileStreamSink.metadataDir)
+    val fs = metadataDir.getFileSystem(hadoopConf)
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
+    metadataDir
+  }
+
   def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
     if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(metadataPath)) {
@@ -125,14 +131,11 @@ class FileStreamSink(
     partitionColumnNames: Seq[String],
     options: Map[String, String]) extends Sink with Logging {
 
+  import FileStreamSink._
+
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = {
-    val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
-    val fs = metadataDir.getFileSystem(hadoopConf)
-    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
-    metadataDir
-  }
+  private val logPath = getMetadataLogPath(basePath, hadoopConf, sparkSession.sessionState.conf)
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index a25451bef62f..5f584c3721bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
 
-    val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
-      spark.sessionState.newHadoopConf())
-    val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
-      outputDir.getCanonicalPath)
+    val outputDirPath = new Path(outputDir.getCanonicalPath)
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val logPath = FileStreamSink.getMetadataLogPath(outputDirPath, hadoopConf, conf)
+    val fs = logPath.getFileSystem(hadoopConf)
+
+    val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
     val allFiles = sinkLog.allFiles()
     // only files from non-empty partition should be logged

From 8f949a1c615c2c8824dd027e52cdb7f26d085c3f Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 29 Jun 2020 14:54:07 +0900
Subject: [PATCH 2/2] Reflect review comment

---
 .../spark/sql/execution/streaming/FileStreamSink.scala   | 8 ++++----
 .../apache/spark/sql/streaming/FileStreamSinkSuite.scala | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 1440313eb53d..ecaf4f8160a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,7 +45,7 @@ object FileStreamSink extends Logging {
         val hdfsPath = new Path(singlePath)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         if (fs.isDirectory(hdfsPath)) {
-          val metadataPath = getMetadataLogPath(hdfsPath, hadoopConf, sqlConf)
+          val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
           fs.exists(metadataPath)
         } else {
           false
@@ -54,9 +54,8 @@
     }
   }
 
-  def getMetadataLogPath(path: Path, hadoopConf: Configuration, sqlConf: SQLConf): Path = {
+  def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
     val metadataDir = new Path(path, FileStreamSink.metadataDir)
-    val fs = metadataDir.getFileSystem(hadoopConf)
     FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
     metadataDir
   }
@@ -135,7 +134,8 @@ class FileStreamSink(
 
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = getMetadataLogPath(basePath, hadoopConf, sparkSession.sessionState.conf)
+  private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
+    sparkSession.sessionState.conf)
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 5f584c3721bc..4ccab58d24fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -557,8 +557,8 @@ abstract class FileStreamSinkSuite extends StreamTest {
 
     val outputDirPath = new Path(outputDir.getCanonicalPath)
     val hadoopConf = spark.sessionState.newHadoopConf()
-    val logPath = FileStreamSink.getMetadataLogPath(outputDirPath, hadoopConf, conf)
-    val fs = logPath.getFileSystem(hadoopConf)
+    val fs = outputDirPath.getFileSystem(hadoopConf)
+    val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
 
     val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
     val allFiles = sinkLog.allFiles()
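
Note: after [PATCH 2/2], FileStreamSink.getMetadataLogPath takes the
FileSystem from the caller instead of resolving it from a Hadoop
Configuration internally. Below is a minimal sketch of the resulting
calling pattern, mirroring what the updated test does; it assumes an
active SparkSession and uses Spark-internal APIs, and the output
directory is purely illustrative.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{FileStreamSink, FileStreamSinkLog}

val spark: SparkSession = SparkSession.builder().getOrCreate()
val hadoopConf = spark.sessionState.newHadoopConf()

// Illustrative sink directory; any directory written by a file stream sink works.
val outputPath = new Path("/tmp/stream-output")

// The caller now resolves the FileSystem once and passes it in.
val fs: FileSystem = outputPath.getFileSystem(hadoopConf)

// Appends the "_spark_metadata" subdirectory and runs the escaped-path check
// before returning the metadata log location.
val logPath = FileStreamSink.getMetadataLogPath(fs, outputPath, spark.sessionState.conf)

// Read back which files the sink has committed.
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
val committedFiles = sinkLog.allFiles()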