From 9b7e2de8d9b6a80095300b9e9f64bc620aa50023 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 16 Sep 2016 16:00:26 -0700 Subject: [PATCH 1/4] make StructuredStreaming fileSource batch generation faster --- .../spark/sql/execution/datasources/DataSource.scala | 10 ++++++++-- .../sql/execution/streaming/FileStreamSource.scala | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 93154bd2ca69c..413976a7ef244 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -316,8 +316,14 @@ case class DataSource( /** * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this * [[DataSource]] + * + * @param checkFilesExist Whether to confirm that the files exist when generating the + * non-streaming file based datasource. StructuredStreaming jobs already + * list file existence, and when generating incremental jobs, the batch + * is considered as a non-streaming file based data source. Since we know + * that files already exist, we don't need to check them again. */ - def resolveRelation(): BaseRelation = { + def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = { val caseInsensitiveOptions = new CaseInsensitiveMap(options) val relation = (providingClass.newInstance(), userSpecifiedSchema) match { // TODO: Throw when too much is given. @@ -368,7 +374,7 @@ case class DataSource( throw new AnalysisException(s"Path does not exist: $qualified") } // Sufficient to check head of the globPath seq for non-glob scenario - if (!fs.exists(globPath.head)) { + if (checkFilesExist && !fs.exists(globPath.head)) { throw new AnalysisException(s"Path does not exist: ${globPath.head}") } globPath diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 42fb454c2d158..9abca90317d7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -136,7 +136,8 @@ class FileStreamSource( userSpecifiedSchema = Some(schema), className = fileFormatClassName, options = sourceOptions.optionMapWithoutPath) - Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation())) + Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation( + checkFilesExist = false))) } /** From 0cf9c0882ed99bc4b4cffafb730e28d7d0cdb69e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 19 Sep 2016 10:36:33 -0700 Subject: [PATCH 2/4] Add test --- .../streaming/FileStreamSourceSuite.scala | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index c6db2fd3f908e..12a0252d3dc46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -17,9 +17,22 @@ package org.apache.spark.sql.execution.streaming +import java.io.File +import java.net.URI + +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.scalatest.mock.MockitoSugar + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType -class FileStreamSourceSuite extends SparkFunSuite { +class FileStreamSourceSuite extends SparkFunSuite + with SharedSQLContext + with MockitoSugar { import FileStreamSource._ @@ -73,4 +86,41 @@ class FileStreamSourceSuite extends SparkFunSuite { assert(map.isNewFile(FileEntry("b", 10))) } + testWithUninterruptibleThread("do not recheck that files exist during getBatch") { + withTempDir { temp => + spark.conf.set( + s"fs.$scheme.impl", + classOf[ExistsThrowsExceptionFileSystem].getName) + // add the metadata entries as a pre-req + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = new HDFSMetadataLog[Array[FileEntry]](spark, dir.getAbsolutePath) + assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L)))) + + val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), + dir.getAbsolutePath, Map.empty) + // this method should throw an exception if `fs.exists` is called during resolveRelation + newSource.getBatch(None, LongOffset(1)) + } + } +} + +/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ +class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { + override def getUri: URI = { + URI.create(s"$scheme:///") + } + + override def exists(f: Path): Boolean = { + throw new IllegalArgumentException("Exists shouldn't have been called!") + } + + override def listStatus(file: Path): Array[FileStatus] = { + val emptyFile = new FileStatus() + emptyFile.setPath(file) + Array(emptyFile) + } +} + +object ExistsThrowsExceptionFileSystem { + val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs" } From 6221b37ebc648d91e934f0425a6f60885215d78a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 19 Sep 2016 10:39:05 -0700 Subject: [PATCH 3/4] clean up --- .../execution/streaming/FileStreamSourceSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index 12a0252d3dc46..2e85eff2253fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -23,16 +23,13 @@ import java.net.URI import scala.util.Random import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} -import org.scalatest.mock.MockitoSugar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType -class FileStreamSourceSuite extends SparkFunSuite - with SharedSQLContext - with MockitoSugar { +class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { import FileStreamSource._ @@ -104,7 +101,9 @@ class FileStreamSourceSuite extends SparkFunSuite } } -/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ +/** Fake FileSystem to test whether the method `fs.exists` is called during + * `DataSource.resolveRelation`. + */ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { override def getUri: URI = { URI.create(s"$scheme:///") @@ -114,6 +113,7 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { throw new IllegalArgumentException("Exists shouldn't have been called!") } + /** Simply return an empty file for now. */ override def listStatus(file: Path): Array[FileStatus] = { val emptyFile = new FileStatus() emptyFile.setPath(file) From 666c2c50114349eeae5bf0d532e03a13de09c434 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 21 Sep 2016 14:57:16 -0700 Subject: [PATCH 4/4] address --- .../spark/sql/execution/streaming/FileStreamSourceSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index 2e85eff2253fa..e8fa6a59c57ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -90,7 +90,8 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { classOf[ExistsThrowsExceptionFileSystem].getName) // add the metadata entries as a pre-req val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir - val metadataLog = new HDFSMetadataLog[Array[FileEntry]](spark, dir.getAbsolutePath) + val metadataLog = + new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath) assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L)))) val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),