Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class FileStreamSource(
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkPathExist = false)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ class FileCatalogSuite extends SharedSQLContext {
}
}

test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
withTempDir { dir =>
val deletedFolder = new File(dir, "deleted")
assert(!deletedFolder.exists())
val catalog1 = new ListingFileCatalog(
spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None,
ignoreFileNotFound = true)
// doesn't throw an exception
assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
}
}

test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
class MockCatalog(
override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@

package org.apache.spark.sql.execution.streaming

import java.io.{File, FileNotFoundException}
import java.net.URI

import scala.util.Random

import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType

class FileStreamSourceSuite extends SparkFunSuite {
class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {

import FileStreamSource._

Expand Down Expand Up @@ -73,4 +83,43 @@ class FileStreamSourceSuite extends SparkFunSuite {
assert(map.isNewFile(FileEntry("b", 10)))
}

testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
withTempDir { temp =>
spark.conf.set(
s"fs.$scheme.impl",
classOf[ExistsThrowsExceptionFileSystem].getName)
// add the metadata entries as a pre-req
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))

val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
dir.getAbsolutePath, Map.empty)
// this method should throw an exception if `fs.exists` is called during resolveRelation
newSource.getBatch(None, LongOffset(1))
}
}
}

/** Fake FileSystem to test whether the method `fs.exists` is called during
* `DataSource.resolveRelation`.
*/
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
override def getUri: URI = {
URI.create(s"$scheme:///")
}

override def exists(f: Path): Boolean = {
throw new IllegalArgumentException("Exists shouldn't have been called!")
}

/** Simply return an empty file for now. */
override def listStatus(file: Path): Array[FileStatus] = {
throw new FileNotFoundException("Folder was suddenly deleted but this should not make it fail!")
}
}

object ExistsThrowsExceptionFileSystem {
val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
}