Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
59daa48
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 5, 2016
41d5f64
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 7, 2016
472a6e3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 10, 2016
0fba10a
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 12, 2016
cbf73b3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 21, 2016
c08f561
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
474df88
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
3d9828d
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 24, 2016
72d2361
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 26, 2016
07afea5
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 29, 2016
8bf2007
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 30, 2016
87a165b
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 31, 2016
b9359cd
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 1, 2016
65bd090
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 5, 2016
babf2da
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 5, 2016
9e09469
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 6, 2016
50a8e4a
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 6, 2016
f3337fa
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 10, 2016
09cc36d
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 12, 2016
83a1915
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 14, 2016
0483145
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 19, 2016
236a5f4
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 20, 2016
08aaa4d
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 21, 2016
64f704e
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 24, 2016
006ea2d
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 26, 2016
0c0dc8a
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 27, 2016
7c4b2f0
Merge remote-tracking branch 'upstream/master'
gatorsmile May 1, 2016
38f3af9
Merge remote-tracking branch 'upstream/master'
gatorsmile May 1, 2016
461441c
initial fix.
gatorsmile May 2, 2016
b230e33
address comments
gatorsmile May 3, 2016
bf98150
revert
gatorsmile May 3, 2016
3ebaf73
Merge remote-tracking branch 'upstream/master' into readPartitionedTable
gatorsmile May 4, 2016
252065c
address comments.
gatorsmile May 4, 2016
8089c6f
Merge remote-tracking branch 'upstream/master'
gatorsmile May 4, 2016
a6c7518
Merge remote-tracking branch 'upstream/master'
gatorsmile May 4, 2016
546c1db
Merge remote-tracking branch 'upstream/master'
gatorsmile May 4, 2016
e92e9b2
Merge branch 'readPartitionedTable' into readPartitionedTableNew
gatorsmile May 4, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,37 @@ abstract class PartitioningAwareFileCatalog(
/**
 * Contains a set of paths that are considered as the base dirs of the input datasets.
 * The partitioning discovery logic will make sure it will stop when it reaches any
 * base path.
 *
 * By default, the paths of the dataset provided by users will be base paths.
 * Below are three typical examples,
 * Case 1) `sqlContext.read.parquet("/path/something=true/")`: the base path will be
 * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
 * Case 2) `sqlContext.read.parquet("/path/something=true/a.parquet")`: the base path will be
 * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
 * `something`.
 * Case 3) `sqlContext.read.parquet("/path/")`: the base path will be `/path/`, and the returned
 * DataFrame will have the column of `something`.
 *
 * Users also can override the basePath by setting `basePath` in the options to pass the new base
 * path to the data source.
 * For example, `sqlContext.read.option("basePath", "/path/").parquet("/path/something=true/")`,
 * and the returned DataFrame will have the column of `something`.
 */
private def basePaths: Set[Path] = {
  parameters.get("basePath").map(new Path(_)) match {
    case Some(userDefinedBasePath) =>
      // A user-supplied base path must point at a directory: partition discovery
      // walks directory segments, so a file (or missing) path cannot anchor it.
      val fs = userDefinedBasePath.getFileSystem(hadoopConf)
      if (!fs.isDirectory(userDefinedBasePath)) {
        throw new IllegalArgumentException("Option 'basePath' must be a directory")
      }
      Set(fs.makeQualified(userDefinedBasePath))

    case None =>
      paths.map { path =>
        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
        val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
        // If the user pointed directly at a leaf file, its parent directory is the
        // base path, so discovery behaves the same as reading that directory.
        if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath
      }.toSet
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
}

test("parse partition with base paths") {
  // Base path identical to the leaf directory: nothing of the path remains to
  // interpret, so no partition values should be discovered.
  val specForLeafBase = parsePartition(
    path = new Path("file://path/a=10"),
    defaultPartitionName = defaultPartitionName,
    typeInference = true,
    basePaths = Set(new Path("file://path/a=10")))._1

  assert(specForLeafBase.isEmpty)

  // Base path one level above the leaf directory: the trailing `a=10` segment
  // should be parsed as partition column "a" with integer value 10.
  val specForParentBase = parsePartition(
    path = new Path("file://path/a=10"),
    defaultPartitionName = defaultPartitionName,
    typeInference = true,
    basePaths = Set(new Path("file://path")))._1

  assert(specForParentBase ==
    Option(PartitionValues(
      ArrayBuffer("a"),
      ArrayBuffer(Literal.create(10, IntegerType)))))
}

test("parse partitions") {
def check(
paths: Seq[String],
Expand Down Expand Up @@ -413,6 +436,43 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}

test("read partitioned table using different path options") {
  withTempDir { base =>
    val partitionInt = 1
    val partitionStr = "foo"
    val leafDir =
      makePartitionDir(base, defaultPartitionName, "pi" -> partitionInt, "ps" -> partitionStr)
    makeParquetFile((1 to 10).map(i => ParquetData(i, i.toString)), leafDir)

    // Reading the base directory discovers both partition columns.
    val fromBaseDir = sqlContext.read.parquet(base.getCanonicalPath)
    assert(fromBaseDir.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))

    // Reading the leaf partition directory hides the partition columns.
    val fromLeafDir = sqlContext.read.parquet(leafDir.getCanonicalPath)
    assert(fromLeafDir.schema.map(_.name) === Seq("intField", "stringField"))

    val parquetFiles =
      leafDir.listFiles().filter(_.getName.toLowerCase().endsWith(".parquet"))

    // Reading an individual parquet file likewise hides the partition columns.
    parquetFiles.foreach { file =>
      val fromFile = sqlContext.read.parquet(file.getCanonicalPath)
      assert(fromFile.schema.map(_.name) === Seq("intField", "stringField"))
    }

    // Reading an individual parquet file but overriding `basePath` to the base
    // directory restores the partition columns.
    parquetFiles.foreach { file =>
      val fromFileWithBase = sqlContext
        .read.option("basePath", base.getCanonicalPath)
        .parquet(file.getCanonicalPath)
      assert(fromFileWithBase.schema.map(_.name) ===
        Seq("intField", "stringField", "pi", "ps"))
    }
  }
}

test("read partitioned table - partition key included in Parquet file") {
withTempDir { base =>
for {
Expand Down