
Conversation

@Ngone51
Member

@Ngone51 Ngone51 commented Oct 21, 2019

What changes were proposed in this pull request?

When a user defines a base path that is not an ancestor directory of all the input paths,
throw an exception immediately.

Why are the changes needed?

Assume we have a DataFrame[c1, c2] written out in Parquet and partitioned by c1.

When using `spark.read.parquet("/path/to/data/c1=1")` to read the data, we get a DataFrame with column c2 only.

But if we use `spark.read.option("basePath", "/path/from").parquet("/path/to/data/c1=1")` to
read the data, we get a DataFrame with both columns c1 and c2.

This happens because a wrong base path has no effect in `parsePartition()`, so partition parsing continues until it reaches a directory whose name contains no "=".

The result of the second read doesn't make sense.
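
For illustration, a minimal sketch of the two reads (hypothetical paths, assuming an active SparkSession in spark-shell):

import spark.implicits._

// Write a small partitioned table: /tmp/data/c1=1/..., /tmp/data/c1=2/...
Seq((1, "a"), (2, "b")).toDF("c1", "c2")
  .write.partitionBy("c1").parquet("/tmp/data")

// Reading the leaf directory directly: schema is [c2] only, as expected.
spark.read.parquet("/tmp/data/c1=1").printSchema()

// With a wrong basePath, partition discovery still ran before this change,
// so the schema unexpectedly becomes [c2, c1].
spark.read.option("basePath", "/tmp/from").parquet("/tmp/data/c1=1").printSchema()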

Does this PR introduce any user-facing change?

Yes. With this change, a user will hit an `IllegalArgumentException` when given a wrong base path, whereas previously the read silently succeeded.

How was this patch tested?

Added UT.

@SparkQA

SparkQA commented Oct 21, 2019

Test build #112393 has finished for PR 26195 at commit 746a97e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 21, 2019

Test build #112403 has finished for PR 26195 at commit f7ecb15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member Author

Ngone51 commented Oct 22, 2019

Failed test:

org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.multiple joins

Error Message

org.scalatest.exceptions.TestFailedException: 2 did not equal 1
Stacktrace

sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 2 did not equal 1
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkNumLocalShuffleReaders(AdaptiveQueryExecSuite.scala:83)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$new$10(AdaptiveQueryExecSuite.scala:167)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:47)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:31)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(AdaptiveQueryExecSuite.scala:27)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:231)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:229)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.withSQLConf(AdaptiveQueryExecSuite.scala:27)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$new$9(AdaptiveQueryExecSuite.scala:151)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1124)
	at org.scalatest.Suite.run$(Suite.scala:1106)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:317)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:510)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

It's weird. I can't get it to pass even on the master branch, though only locally.

@HeartSaVioR
Contributor

I've already filed https://issues.apache.org/jira/browse/SPARK-29538 for that issue and left a comment in #26157. Let's see how it goes.

@Ngone51
Member Author

Ngone51 commented Oct 28, 2019

retest this please.

@SparkQA

SparkQA commented Oct 28, 2019

Test build #112776 has started for PR 26195 at commit f7ecb15.

@Ngone51
Member Author

Ngone51 commented Nov 27, 2019

cc @cloud-fan Please take a look, thanks.

@cloud-fan
Contributor

When did we add basePath? I have no idea what it is...

@SparkQA

SparkQA commented Nov 27, 2019

Test build #114520 has finished for PR 26195 at commit 617ab8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member Author

Ngone51 commented Nov 27, 2019

@cloud-fan

When did we add basePath? I have no idea what it is...

After tracing the code history, I think it was introduced in #9651, starting from Spark 1.6.

And here's the documentation for basePath (ref: http://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery):

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths by default. For the above example, if users pass path/to/table/gender=male to either SparkSession.read.parquet or SparkSession.read.load, gender will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set basePath in the data source options. For example, when path/to/table/gender=male is the path of the data and users set basePath to path/to/table/, gender will be a partitioning column.
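
A minimal sketch of the documented behavior (using the hypothetical paths from the docs, assuming an active SparkSession):

// Without basePath, gender is not treated as a partitioning column.
spark.read.parquet("path/to/table/gender=male")

// With basePath set to the table root, partition discovery starts there,
// so gender becomes a partitioning column again.
spark.read.option("basePath", "path/to/table").parquet("path/to/table/gender=male")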

"driver side must not be negative"))
}

test ("SPARK-29537: throw exception when user defined a wrong base path") {
Contributor

let's also add an end-to-end test with DataFrameReader

Member Author

Added 261b9ad
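
For reference, a sketch of what such an end-to-end test could look like (hypothetical; the actual test added in 261b9ad may differ in names and details):

test("SPARK-29537: wrong basePath fails fast via DataFrameReader") {
  // Assumes a suite mixing in SharedSparkSession / SQLTestUtils.
  import testImplicits._
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    Seq((1, "a")).toDF("c1", "c2").write.partitionBy("c1").parquet(path)
    // The exception is expected at read time, when partition discovery runs.
    val e = intercept[IllegalArgumentException] {
      spark.read.option("basePath", "/another/path").parquet(s"$path/c1=1")
    }
    assert(e.getMessage.contains("Wrong basePath"))
  }
}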

def qualifiedPath(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
rootPaths.find(p => !qualifiedPath(p).toString.
Contributor

Is there a way to check the sub-path relationship using some FS APIs instead of relying on the path string?

Member Author

I didn't find one in either Path or FileSystem.
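
For reference, one string-free alternative would be to walk up getParent; a minimal sketch (not what this PR does), assuming both paths are already fully qualified:

import org.apache.hadoop.fs.Path

// Returns true if `base` is an ancestor of (or equal to) `child`,
// walking the parent chain until the root (getParent returns null).
def isAncestor(base: Path, child: Path): Boolean = {
  var current = child
  while (current != null) {
    if (current == base) return true
    current = current.getParent
  }
  false
}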


val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
rootPaths.find(p => !qualifiedPath(p).toString.
startsWith(qualifiedBasePath.toString)) match {
Member

The indent is off here. But can you just use .find(...).foreach(rp => ...?
Or require(rootPaths.forall(p => qualifiedPath(p)...), "error message")
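
A minimal sketch of the require variant, using the names from the diff above (the message text is illustrative):

require(
  rootPaths.forall(p => qualifiedPath(p).toString.startsWith(qualifiedBasePath.toString)),
  s"Wrong basePath $userDefinedBasePath: it must be an ancestor of all root paths")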

@SparkQA

SparkQA commented Nov 28, 2019

Test build #114560 has finished for PR 26195 at commit 66f0bd3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
def qualifiedPath(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
Contributor

Let's call toString here, to avoid calling toString many times later.

Contributor

We can even call toString inside qualifiedPath and remove the need to call .toString later altogether.
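
A sketch of that variant, with qualifiedPath returning the string directly:

def qualifiedPath(path: Path): String =
  path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString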

throw new IllegalArgumentException(
  s"Wrong basePath $userDefinedBasePath for the root path: $rp")
}
Set(fs.makeQualified(userDefinedBasePath))
Contributor

This can simply be Set(qualifiedBasePath), since we've already calculated it above; if we want qualifiedPath() to return a String instead, then Set(new Path(qualifiedBasePath)).

Member Author

Good idea!

throw new IllegalArgumentException(
  s"Wrong basePath $userDefinedBasePath for the root path: $rp")
}
Set(new Path(qualifiedBasePath))
Contributor

We should reduce overhead as much as we can:

val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
val qualifiedBasePathStr = qualifiedBasePath.toString
rootPaths.find...
Set(qualifiedBasePath)

Member Author

Ok, I see.

val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
val qualifiedBasePathStr = qualifiedBasePath.toString
rootPaths
  .find(!fs.makeQualified(_).toString.startsWith(qualifiedBasePathStr))
Member Author

Review note: I've inlined the qualifiedPath() helper into the find() clause.
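
Putting the pieces together, the final shape of the check, as a sketch reconstructed from the diff hunks in this thread (the merged commit may differ in detail):

val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
val qualifiedBasePathStr = qualifiedBasePath.toString
rootPaths
  .find(!fs.makeQualified(_).toString.startsWith(qualifiedBasePathStr))
  .foreach { rp =>
    throw new IllegalArgumentException(
      s"Wrong basePath $userDefinedBasePath for the root path: $rp")
  }
Set(qualifiedBasePath)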

@SparkQA

SparkQA commented Dec 2, 2019

Test build #114729 has finished for PR 26195 at commit e270fea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 2, 2019

Test build #114732 has finished for PR 26195 at commit e889cda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan closed this in 075ae1e Dec 3, 2019
@cloud-fan
Contributor

thanks, merging to master!

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Dec 6, 2019

Closes apache#26195 from Ngone51/dev-wrong-basePath.

Lead-authored-by: wuyi <[email protected]>
Co-authored-by: wuyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>