
Commit 327bb1d

Implements partitioning support for data sources API
1 parent 3c5073a commit 327bb1d

20 files changed: 936 additions & 134 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 35 additions & 12 deletions

@@ -22,22 +22,22 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Comparator}
 
+import scala.collection.JavaConversions._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
-import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
-
-import scala.collection.JavaConversions._
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /**
  * :: DeveloperApi ::
@@ -199,13 +199,36 @@ class SparkHadoopUtil extends Logging {
    * that file.
    */
   def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
-    def recurse(path: Path): Array[FileStatus] = {
-      val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
-      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+    listLeafStatuses(fs, fs.getFileStatus(basePath))
+  }
+
+  /**
+   * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+   * given path points to a file, return a single-element collection containing [[FileStatus]] of
+   * that file.
+   */
+  def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
+    def recurse(status: FileStatus): Seq[FileStatus] = {
+      val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
+      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
+    }
+
+    if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
+  }
+
+  def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+    listLeafDirStatuses(fs, fs.getFileStatus(basePath))
+  }
+
+  def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
+    def recurse(status: FileStatus): Seq[FileStatus] = {
+      val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
+      val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
+      leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
     }
 
-    val baseStatus = fs.getFileStatus(basePath)
-    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+    assert(baseStatus.isDir)
+    recurse(baseStatus)
   }
 
   /**
@@ -275,7 +298,7 @@ class SparkHadoopUtil extends Logging {
         logDebug(text + " matched " + HADOOP_CONF_PATTERN)
         val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
         val eval = Option[String](hadoopConf.get(key))
-            .map { value =>
+          .map { value =>
           logDebug("Substituted " + matched + " with " + value)
           text.replace(matched, value)
         }
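
The new listLeafStatuses / listLeafDirStatuses overloads above do the directory walking that partition discovery relies on: the former collects every file under a base path, the latter only the deepest directories (the partition directories themselves). A minimal usage sketch, assuming a Hadoop Configuration and a hypothetical /data/table layout:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    import org.apache.spark.deploy.SparkHadoopUtil

    val fs = FileSystem.get(new Configuration())
    val base = new Path("/data/table")

    // All leaf files under the base path, recursing into subdirectories.
    val files = SparkHadoopUtil.get.listLeafStatuses(fs, base)

    // Only the deepest directories, e.g. /data/table/year=2015/month=01 in a
    // Hive-style layout; these are the candidates for partition discovery.
    val leafDirs = SparkHadoopUtil.get.listLeafDirStatuses(fs, base)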

project/SparkBuild.scala

Lines changed: 1 addition & 0 deletions

@@ -548,6 +548,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true",
     javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
+    javaOptions in Test += "-agentlib:jdwp=transport=dt_socket,server=n,address=127.0.0.1:5005",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
     javaOptions in Test += "-ea",

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 11 additions & 10 deletions

@@ -27,23 +27,23 @@ import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core.JsonFactory
-
 import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 
@@ -1372,6 +1372,7 @@ class DataFrame private[sql](
       tableName,
       source,
       temporary = false,
+      Array.empty[String],
       mode,
       options,
       logicalPlan)
@@ -1473,7 +1474,7 @@ class DataFrame private[sql](
       mode: SaveMode,
       options: java.util.Map[String, String],
       partitionColumns: java.util.List[String]): Unit = {
-    ???
+    save(source, mode, options.toMap, partitionColumns)
   }
 
   /**
@@ -1488,7 +1489,7 @@ class DataFrame private[sql](
       source: String,
       mode: SaveMode,
       options: Map[String, String]): Unit = {
-    ResolvedDataSource(sqlContext, source, mode, options, this)
+    ResolvedDataSource(sqlContext, source, Array.empty[String], mode, options, this)
   }
 
   /**
@@ -1503,7 +1504,7 @@ class DataFrame private[sql](
       mode: SaveMode,
       options: Map[String, String],
       partitionColumns: Seq[String]): Unit = {
-    ???
+    ResolvedDataSource(sqlContext, source, partitionColumns.toArray, mode, options, this)
   }
 
   /**
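
The previously unimplemented (???) overloads now route through ResolvedDataSource with the requested partition columns. A hedged usage sketch of the Scala overload added above, assuming a DataFrame df with year and month columns; the output path and the fully qualified Parquet source name are illustrative:

    import org.apache.spark.sql.SaveMode

    // Writes df under /tmp/partitioned_table, partitioned by year and month.
    df.save(
      "org.apache.spark.sql.parquet",
      SaveMode.Overwrite,
      Map("path" -> "/tmp/partitioned_table"),
      Seq("year", "month"))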

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 3 additions & 0 deletions

@@ -66,6 +66,9 @@ private[spark] object SQLConf {
   // to its length exceeds the threshold.
   val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
 
+  // Whether to perform partition discovery when loading external data sources.
+  val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
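
The new key gates partition discovery when loading file-based data sources. A minimal sketch of toggling it on a SQLContext; the hunk does not show a default value, so treat the setting below as illustrative:

    // Disable automatic partition discovery for subsequently loaded data sources.
    sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")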

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 2 deletions

@@ -762,7 +762,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def load(source: String, options: Map[String, String]): DataFrame = {
-    val resolved = ResolvedDataSource(this, None, source, options)
+    val resolved = ResolvedDataSource(this, None, Array.empty[String], source, options)
     DataFrame(this, LogicalRelation(resolved.relation))
   }
 
@@ -792,7 +792,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    val resolved = ResolvedDataSource(this, Some(schema), source, options)
+    val resolved = ResolvedDataSource(this, Some(schema), Array.empty[String], source, options)
     DataFrame(this, LogicalRelation(resolved.relation))
   }
 
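
Both load variants now simply pass an empty partition-column list, so the caller-side API is unchanged and discovery is left to the relation itself. A read-back sketch matching the hypothetical write example above; the partition columns are expected to be recovered from the directory layout when partition discovery is enabled:

    val partitioned = sqlContext.load(
      "org.apache.spark.sql.parquet",
      Map("path" -> "/tmp/partitioned_table"))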

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 7 additions & 3 deletions

@@ -343,9 +343,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case c: CreateTableUsing if c.temporary && c.allowExisting =>
         sys.error("allowExisting should be set to false when creating a temporary table.")
 
-      case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
-        val cmd =
-          CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
+      case CreateTableUsingAsSelect(tableName, provider, true, partitionsCols, mode, opts, query)
+          if partitionsCols.nonEmpty =>
+        sys.error("Cannot create temporary partitioned table.")
+
+      case CreateTableUsingAsSelect(tableName, provider, true, _, mode, opts, query) =>
+        val cmd = CreateTempTableUsingAsSelect(
+          tableName, provider, Array.empty[String], mode, opts, query)
         ExecutedCommand(cmd) :: Nil
       case c: CreateTableUsingAsSelect if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 1 addition & 5 deletions

@@ -136,10 +136,6 @@ private[sql] class DefaultSource
   }
 }
 
-private[sql] case class Partition(values: Row, path: String)
-
-private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
-
 /**
  * An alternative to [[ParquetRelation]] that plugs in using the data sources API. This class is
  * intended as a full replacement of the Parquet support in Spark SQL. The old implementation will
@@ -805,7 +801,7 @@ private[sql] object ParquetRelation2 extends Logging {
     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f => 
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
       ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
 
     StructType(metastoreSchema.zip(reorderedParquetSchema).map {

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 46 additions & 0 deletions

@@ -53,6 +53,25 @@ private[sql] object DataSourceStrategy extends Strategy {
         filters,
         (a, _) => t.buildScan(a)) :: Nil
 
+    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
+      val selectedPartitions = prunePartitions(filters, t.partitionSpec)
+      val inputPaths = selectedPartitions.map(_.path).toArray
+
+      // Don't push down predicates that reference partition columns
+      val pushedFilters = {
+        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+        filters.filter { f =>
+          val referencedColumnNames = f.references.map(_.name).toSet
+          referencedColumnNames.intersect(partitionColumnNames).isEmpty
+        }
+      }
+
+      pruneFilterProject(
+        l,
+        projectList,
+        pushedFilters,
+        (a, f) => t.buildScan(a, f, inputPaths)) :: Nil
+
     case l @ LogicalRelation(t: TableScan) =>
       createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
 
@@ -63,6 +82,33 @@ private[sql] object DataSourceStrategy extends Strategy {
     case _ => Nil
   }
 
+  protected def prunePartitions(
+      predicates: Seq[Expression],
+      partitionSpec: PartitionSpec): Seq[Partition] = {
+    val PartitionSpec(partitionColumns, partitions) = partitionSpec
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate =
+        partitionPruningPredicates
+          .reduceOption(expressions.And)
+          .getOrElse(Literal(true))
+
+      val boundPredicate = InterpretedPredicate(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      })
+
+      partitions.filter { case Partition(values, _) => boundPredicate(values) }
+    } else {
+      partitions
+    }
+  }
+
   // Based on Public API.
   protected def pruneFilterProject(
       relation: LogicalRelation,
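
prunePartitions keeps only the partitions whose values satisfy the predicates that reference partition columns exclusively; the remaining predicates still have to be evaluated against the scanned rows, which is why they are pushed down separately above. A self-contained sketch of the pruning idea in plain Scala (the names and Hive-style paths are illustrative, not Spark internals):

    case class PartitionDir(values: Map[String, Any], path: String)

    // Keep only the directories whose partition values satisfy the predicate;
    // files under the rejected directories are never listed or read.
    def prune(partitions: Seq[PartitionDir])(pred: Map[String, Any] => Boolean): Seq[PartitionDir] =
      partitions.filter(p => pred(p.values))

    val partitions = Seq(
      PartitionDir(Map("year" -> 2014, "month" -> 12), "/data/t/year=2014/month=12"),
      PartitionDir(Map("year" -> 2015, "month" -> 1), "/data/t/year=2015/month=1"))

    // For a query filtering on year = 2015, only the second directory is scanned.
    val selected = prune(partitions)(v => v("year") == 2015)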
