Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.nio.file.FileSystems
import java.util.Date

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -246,7 +247,27 @@ case class LoadDataCommand(
val loadPath =
if (isLocal) {
val uri = Utils.resolveURI(path)
if (!new File(uri.getPath()).exists()) {
val filePath = uri.getPath()
val exists = if (filePath.contains("*")) {
val fileSystem = FileSystems.getDefault
val pathPattern = fileSystem.getPath(filePath)
val dir = pathPattern.getParent.toString
if (dir.contains("*")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the `*` appears in a grandparent directory, e.g. `dir*/subdir/fileName`?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will result in an AnalysisException and that seems like the intended behavior. Only a * in the file itself is supported.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @cloud-fan and @srowen. Yes, that is the intended behavior of this PR.

throw new AnalysisException(
s"LOAD DATA input path allows only filename wildcard: $path")
}

val files = new File(dir).listFiles()
if (files == null) {
false
} else {
val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
}
} else {
new File(filePath).exists()
}
if (!exists) {
throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
}
uri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package org.apache.spark.sql.hive.execution

import java.io.{File, PrintWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

import com.google.common.io.Files
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
Expand Down Expand Up @@ -1886,6 +1889,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
  withTempDir { dir =>
    // Create two families of single-value files in the temp dir:
    // part-r-0000{1,2,3} and part-s-0000{5,6,7}. Each file contains its own index.
    (1 to 3).foreach { i =>
      Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8)
    }
    (5 to 7).foreach { i =>
      Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8)
    }

    withTable("load_t") {
      sql("CREATE TABLE load_t (a STRING)")

      // A wildcard in the file name is accepted; only the part-r files should be loaded.
      sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
      val expectedRows = Seq("1", "2", "3").map(Row(_))
      checkAnswer(sql("SELECT * FROM load_t"), expectedRows)

      // A wildcard pattern under a directory that does not exist is reported as a missing path.
      val missingDirError = intercept[AnalysisException] {
        sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t")
      }
      assert(missingDirError.getMessage.contains("LOAD DATA input path does not exist"))

      // A wildcard in the directory portion of the path is rejected.
      val dirWildcardError = intercept[AnalysisException] {
        sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
      }
      assert(
        dirWildcardError.getMessage.contains("LOAD DATA input path allows only filename wildcard"))
    }
  }
}

def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0
Expand Down