5 changes: 2 additions & 3 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2659,7 +2659,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
# It makes sure that we can omit path argument in write.df API and then it calls
# DataFrameWriter.save() without path.
expect_error(write.df(df, source = "csv"),
"Error in save : illegal argument - Expected exactly one path to be specified")
"Error in save : illegal argument - 'path' is not specified")
expect_error(write.json(df, jsonPath),
"Error in json : analysis error - path file:.*already exists")
expect_error(write.text(df, jsonPath),
@@ -2684,8 +2684,7 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
# It makes sure that we can omit path argument in read.df API and then it calls
# DataFrameWriter.load() without path.
expect_error(read.df(source = "json"),
paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
"It must be specified manually"))
paste("Error in loadDF : illegal argument - 'path' is not specified"))
Member:
I recall this test is intentionally testing without the path argument?
cc @HyukjinKwon

@HyukjinKwon (Member), Nov 14, 2016:
Thanks for cc'ing me. Yes, I did. The changes seem reasonable, since this check applies only to the data sources that need a path.
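
For context, a minimal spark-shell sketch of the behavior this thread discusses, assuming an active SparkSession named spark; the before/after messages are taken from this PR's updated tests:

// Before this PR, loading a path-based source with no path failed later,
// during schema inference, with an AnalysisException ("Unable to infer
// schema for JSON ..."). After this PR, the path check fails fast:
try {
  spark.read.format("json").load()
} catch {
  case e: IllegalArgumentException =>
    assert(e.getMessage.contains("'path' is not specified"))
}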

expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
@@ -322,6 +323,9 @@ case class DataSource(
val equality = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
}.orElse {
if (allPaths.isEmpty && !format.isInstanceOf[TextFileFormat]) {
@HyukjinKwon (Member), Nov 14, 2016:
Hi @gatorsmile, would it be better to explain here that the text data source is excluded because it always uses a schema consisting of a single string field when the schema is not explicitly given?

BTW, should we maybe change text.TextFileFormat to TextFileFormat at https://github.com/gatorsmile/spark/blob/45110370fb1889f244a6750ef2a18dbc9f1ba9c2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L139 ?
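
As a small spark-shell illustration of that point (assuming an active SparkSession named spark): text's inferred schema is a fixed single string column, so nothing needs to be inferred from a path:

import org.apache.spark.sql.types.{StringType, StructType}

// The text source's inferred schema does not depend on any input files.
val textSchema = new StructType().add("value", StringType)

// Hence spark.read.text() with no paths can return an empty DataFrame with
// that schema (see the new TextSuite test below), while JSON, CSV, Parquet,
// etc. must fail because their schema inference needs at least one path.
val df = spark.read.text()
assert(df.schema == textSchema && df.count() == 0)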

throw new IllegalArgumentException("'path' is not specified")
}
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
@@ -369,6 +373,8 @@
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else if (allPaths.length < 1) {
throw new IllegalArgumentException("'path' is not specified")
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
ParquetOptions.scala
@@ -40,7 +40,7 @@ private[parquet] class ParquetOptions(
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
s"is not available. Known codecs are ${availableCodecs.mkString(", ")}.")
Contributor:
Why this change?

Member (Author):
Just to make it consistent with the output of the other cases. See the code:

case e: ClassNotFoundException =>
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")

@HyukjinKwon (Member), Nov 14, 2016:
"Available" was intentional: Parquet supports only snappy, gzip, or lzo, whereas "Known" was used for the text-based sources (please see #10805 (comment)), since those also accept other compression codecs but the message can only list the ones it knows.
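
To make that distinction concrete, a hedged sketch of the text-side codec lookup that produces the "Known codecs" wording; the method name and shape are assumed from the snippet quoted above, not copied from Spark:

def getCodecClassName(name: String, shortNames: Map[String, String]): String =
  shortNames.getOrElse(name.toLowerCase,
    try {
      // Text-based sources also accept any fully qualified Hadoop codec
      // class, so the error below can only enumerate the short names it
      // knows about.
      Class.forName(name).getName
    } catch {
      case _: ClassNotFoundException =>
        throw new IllegalArgumentException(s"Codec [$name] is not available. " +
          s"Known codecs are ${shortNames.keys.mkString(", ")}.")
    })

Parquet, by contrast, has a closed codec map, so every codec it can name is actually available.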

}
shortParquetCompressionCodecNames(codecName).name()
}
CSVSuite.scala
@@ -345,6 +345,32 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("save API - empty path or illegal path") {
var e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.format("csv").save()
}.getMessage
assert(e.contains("'path' is not specified"))

e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.csv("")
}.getMessage
assert(e.contains("Can not create a Path from an empty string"))
}

test("load API - empty path") {
val expectedErrorMsg = "'path' is not specified"
var e = intercept[IllegalArgumentException] {
spark.read.csv()
}.getMessage
assert(e.contains(expectedErrorMsg))

e = intercept[IllegalArgumentException] {
spark.read.format("csv").load()
}.getMessage
assert(e.contains(expectedErrorMsg))
}


test("save csv with quote") {
withTempDir { dir =>
val csvDir = new File(dir, "csv").getCanonicalPath
JsonSuite.scala
@@ -1693,6 +1693,42 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
df.collect()
}

test("save API - empty path or illegal path") {
var e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.format("json").save()
}.getMessage
assert(e.contains("'path' is not specified"))

e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.json("")
}.getMessage
assert(e.contains("Can not create a Path from an empty string"))
}

test("load API - empty path") {
val expectedErrorMsg = "'path' is not specified"
var e = intercept[IllegalArgumentException] {
spark.read.json()
}.getMessage
assert(e.contains(expectedErrorMsg))

e = intercept[IllegalArgumentException] {
spark.read.format("json").load()
}.getMessage
assert(e.contains(expectedErrorMsg))
}

test("illegal compression") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val df = spark.range(0, 10)
val e = intercept[IllegalArgumentException] {
df.write.option("compression", "illegal").mode("overwrite").format("json").save(path)
}.getMessage
assert(e.contains("Codec [illegal] is not available. Known codecs are"))
}
}

test("Write dates correctly with dateFormat option") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
withTempDir { dir =>
ParquetIOSuite.scala
@@ -426,6 +426,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("save API - empty path or illegal path") {
var e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.format("parquet").save()
}.getMessage
assert(e.contains("'path' is not specified"))

e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.parquet("")
}.getMessage
assert(e.contains("Can not create a Path from an empty string"))
}

test("load API - empty path") {
val expectedErrorMsg = "'path' is not specified"
var e = intercept[IllegalArgumentException] {
spark.read.parquet()
}.getMessage
assert(e.contains(expectedErrorMsg))

e = intercept[IllegalArgumentException] {
spark.read.format("parquet").load()
}.getMessage
assert(e.contains(expectedErrorMsg))
}

test("illegal compression") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val df = spark.range(0, 10)
val e = intercept[IllegalArgumentException] {
df.write.option("compression", "illegal").mode("overwrite").format("parquet").save(path)
}.getMessage
assert(e.contains("Codec [illegal] is not available. Known codecs are"))
}
}

test("SPARK-6315 regression test") {
// Spark 1.1 and prior versions write Spark schema as case class string into Parquet metadata.
// This has been deprecated by JSON format since 1.2. Notice that, 1.3 further refactored data
TextSuite.scala
@@ -154,6 +154,35 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}

test("save API - empty path or illegal path") {
var e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.format("text").save()
}.getMessage
assert(e.contains("'path' is not specified"))

e = intercept[IllegalArgumentException] {
spark.range(1).coalesce(1).write.text("")
}.getMessage
assert(e.contains("Can not create a Path from an empty string"))
}

test("illegal compression") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val df = spark.range(0, 10).selectExpr("CAST(id AS STRING) AS s")
val e = intercept[IllegalArgumentException] {
df.write.option("compression", "illegal").mode("overwrite").format("text").save(path)
}.getMessage
assert(e.contains("Codec [illegal] is not available. Known codecs are"))
}
}

test("load API - empty path") {
val res = Seq.empty[String].toDF("value")
checkAnswer(spark.read.text(), res)
checkAnswer(spark.read.textFile().toDF(), res)
}

private def testFile: String = {
Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
}
CatalogSuite.scala
@@ -307,10 +307,10 @@ class CatalogSuite
}

test("createExternalTable should fail if path is not given for file-based data source") {
val e = intercept[AnalysisException] {
val e = intercept[IllegalArgumentException] {
spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
}
assert(e.message.contains("Unable to infer schema"))
assert(e.getMessage.contains("'path' is not specified"))

val e2 = intercept[AnalysisException] {
spark.catalog.createExternalTable(
DataFrameReaderWriterSuite.scala
@@ -272,15 +272,34 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}

test("illegal format name") {
val e = intercept[ClassNotFoundException] {
spark.read.format("illegal").load("/test")
}
assert(e.getMessage.contains("Failed to find data source: illegal"))
}

test("empty partitionBy") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val input = spark.range(10).toDF()
input.write.format("parquet").mode("overwrite").partitionBy().save(path)
val output = spark.read.parquet(path)
checkAnswer(input, output)
}
}

test("prevent all column partitioning") {
withTempDir { dir =>
val path = dir.getCanonicalPath
intercept[AnalysisException] {
var e = intercept[AnalysisException] {
spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
}
intercept[AnalysisException] {
}.getMessage
assert(e.contains("Cannot use all columns for partition columns"))
e = intercept[AnalysisException] {
spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
}
}.getMessage
assert(e.contains("Cannot use all columns for partition columns"))
spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
}
}
@@ -396,9 +415,10 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
val schema = df.schema

// Reader, without user specified schema
intercept[AnalysisException] {
val e = intercept[IllegalArgumentException] {
testRead(spark.read.json(), Seq.empty, schema)
}
}.getMessage
assert(e.contains("'path' is not specified"))
testRead(spark.read.json(dir), data, schema)
testRead(spark.read.json(dir, dir), data ++ data, schema)
testRead(spark.read.json(Seq(dir, dir): _*), data ++ data, schema)
@@ -422,9 +442,10 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
val schema = df.schema

// Reader, without user specified schema
intercept[AnalysisException] {
val e = intercept[IllegalArgumentException] {
testRead(spark.read.parquet(), Seq.empty, schema)
}
}.getMessage
assert(e.contains("'path' is not specified"))
testRead(spark.read.parquet(dir), data, schema)
testRead(spark.read.parquet(dir, dir), data ++ data, schema)
testRead(spark.read.parquet(Seq(dir, dir): _*), data ++ data, schema)
OrcOptions.scala
@@ -41,7 +41,7 @@ private[orc] class OrcOptions(@transient private val parameters: Map[String, Str
if (!shortOrcCompressionCodecNames.contains(codecName)) {
val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
s"is not available. Known codecs are ${availableCodecs.mkString(", ")}.")
}
shortOrcCompressionCodecNames(codecName)
}
MetastoreDataSourcesSuite.scala
@@ -544,18 +544,18 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("path required error") {
assert(
intercept[AnalysisException] {
sparkSession.catalog.createExternalTable(
"createdJsonTable",
"org.apache.spark.sql.json",
Map.empty[String, String])

table("createdJsonTable")
}.getMessage.contains("Unable to infer schema"),
"We should complain that path is not specified.")
withTable("createdJsonTable") {
assert(
intercept[IllegalArgumentException] {
sparkSession.catalog.createExternalTable(
"createdJsonTable",
"org.apache.spark.sql.json",
Map.empty[String, String])

sql("DROP TABLE IF EXISTS createdJsonTable")
table("createdJsonTable")
}.getMessage.contains("'path' is not specified"),
"We should complain that path is not specified.")
}
}

test("scan a parquet table created through a CTAS statement") {