Closed

Changes from all commits (45 commits)
7dd8dd5
Adds new interfaces and stub methods for data sources API partitionin…
liancheng Apr 15, 2015
012ed2d
Adds PartitioningOptions
liancheng Apr 15, 2015
aa8ba9a
Javadoc fix
liancheng Apr 15, 2015
1b8231f
Renames FSBasedPrunedFilteredScan to FSBasedRelation
liancheng Apr 15, 2015
3ba9bbf
Adds DataFrame.saveAsTable() overrides which support partitioning
liancheng Apr 15, 2015
770b5ba
Adds tests for FSBasedRelation
liancheng Apr 23, 2015
95d0b4d
Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
liancheng Apr 23, 2015
5de194a
Forgot Apache licence header
liancheng Apr 23, 2015
9d17607
Adds the contract that OutputWriter should have zero-arg constructor
liancheng Apr 23, 2015
fb5a607
Fixes compilation error
liancheng Apr 23, 2015
3c5073a
Fixes SaveModes used in test cases
liancheng Apr 25, 2015
327bb1d
Implements partitioning support for data sources API
liancheng Apr 29, 2015
ea6c8dd
Removes remote debugging stuff
liancheng Apr 29, 2015
9b487bf
Fixes compilation errors introduced while rebasing
liancheng Apr 29, 2015
b746ab5
More tests
liancheng Apr 29, 2015
f18dec2
More strict schema checking
liancheng Apr 29, 2015
ca1805b
Removes duplicated partition discovery code in new Parquet
liancheng Apr 29, 2015
8d2ff71
Merges partition columns when reading partitioned relations
liancheng Apr 30, 2015
ce52353
Adds new SQLContext.load() overload with user defined dynamic partiti…
liancheng Apr 30, 2015
422ff4a
Fixes style issue
liancheng Apr 30, 2015
f320766
Adds prepareForWrite() hook, refactored writer containers
liancheng May 2, 2015
0bc6ad1
Resorts to new Hadoop API, and now FSBasedRelation can customize outp…
liancheng May 3, 2015
be0c268
Uses TaskAttempContext rather than Configuration in OutputWriter.init
liancheng May 3, 2015
54c3d7b
Enforces that FileOutputFormat must be used
liancheng May 3, 2015
5f423d3
Bug fixes. Lets data source to customize OutputCommitter rather than …
liancheng May 4, 2015
a29e663
Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
liancheng May 4, 2015
c4ed4fe
Bug fixes and a new test suite
liancheng May 5, 2015
51be443
Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.pr…
liancheng May 6, 2015
5849dd0
Fixes doc typos. Fixes partition discovery refresh.
liancheng May 7, 2015
fa543f3
Addresses comments
liancheng May 8, 2015
0b8cd70
Adds Scala/Catalyst row conversion when writing non-partitioned tables
liancheng May 8, 2015
795920a
Fixes compilation error after rebasing
liancheng May 8, 2015
bc3f9b4
Uses projection to separate partition columns and data columns while …
liancheng May 8, 2015
c466de6
Addresses comments
liancheng May 8, 2015
52b0c9b
Adjusts project/MimaExclude.scala
liancheng May 8, 2015
0349e09
Fixes compilation error introduced while rebasing
liancheng May 8, 2015
7552168
Fixes typo in MimaExclude.scala
liancheng May 9, 2015
c71ac6c
Addresses comments from @marmbrus
liancheng May 10, 2015
8d12e69
Fixes compilation error
liancheng May 10, 2015
ad4d4de
Enables HDFS style globbing
liancheng May 10, 2015
348a922
Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPa…
liancheng May 10, 2015
edf49e7
Removed commented stale code block
liancheng May 10, 2015
43ba50e
Avoids serializing generated projection code
liancheng May 11, 2015
1f9b1a5
Tweaks data schema passed to FSBasedRelations
liancheng May 11, 2015
5351a1b
Fixes compilation error introduced while rebasing
liancheng May 12, 2015
52 changes: 41 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,22 +22,22 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.{Logging, SparkConf, SparkException}

/**
* :: DeveloperApi ::
@@ -199,13 +199,43 @@ class SparkHadoopUtil extends Logging {
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
def recurse(path: Path): Array[FileStatus] = {
val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
listLeafStatuses(fs, fs.getFileStatus(basePath))
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}

val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
}

def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
listLeafDirStatuses(fs, fs.getFileStatus(basePath))
}

def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}

assert(baseStatus.isDir)
recurse(baseStatus)
}

def globPath(pattern: Path): Seq[Path] = {
val fs = pattern.getFileSystem(conf)
Option(fs.globStatus(pattern)).map { statuses =>
statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}.getOrElse(Seq.empty[Path])
}

/**
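The helpers touched above do the file-system legwork for the partitioning feature: listLeafStatuses collects the actual data files under a base path, the new listLeafDirStatuses collects the deepest partition directories, and globPath expands HDFS-style path patterns. A minimal usage sketch follows; the paths are hypothetical and SparkHadoopUtil is a Spark-internal utility, so this is illustrative only.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.deploy.SparkHadoopUtil

    val util = SparkHadoopUtil.get
    // Hypothetical root of a partitioned data set laid out as
    // /data/events/year=2015/month=05/part-00000.parquet
    val basePath = new Path("/data/events")
    val fs = basePath.getFileSystem(new Configuration())

    // All leaf files under the root (the actual data files).
    val leafFiles = util.listLeafStatuses(fs, basePath)

    // All leaf directories, i.e. the deepest partition directories.
    val partitionDirs = util.listLeafDirStatuses(fs, basePath)

    // HDFS-style glob expansion of a user-supplied pattern.
    val matched = util.globPath(new Path("/data/events/year=*/month=*"))
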
14 changes: 13 additions & 1 deletion project/MimaExcludes.scala
@@ -94,11 +94,23 @@ object MimaExcludes {
// This `protected[sql]` method was removed in 1.3.1
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.sql.SQLContext.checkAnalysis"),
// This `private[sql]` class was removed in 1.4.0:
// These `private[sql]` classes were removed in 1.4.0:
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.execution.AddExchange"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.execution.AddExchange$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.PartitionSpec"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.Partition"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.Partition$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
// These test support classes were moved out of src/main and into src/test:
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTestData"),
107 changes: 96 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -27,23 +27,23 @@ import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonFactory

import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils


@@ -400,7 +400,9 @@ class DataFrame private[sql](
joined.left,
joined.right,
joinType = Inner,
Some(EqualTo(joined.left.resolve(usingColumn), joined.right.resolve(usingColumn))))
Some(expressions.EqualTo(
joined.left.resolve(usingColumn),
joined.right.resolve(usingColumn))))
)
}

@@ -465,8 +467,8 @@ class DataFrame private[sql](
// By the time we get here, since we have already run analysis, all attributes should've been
// resolved and become AttributeReference.
val cond = plan.condition.map { _.transform {
case EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
case expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
}}
plan.copy(condition = cond)
}
@@ -1324,6 +1326,28 @@ class DataFrame private[sql](
saveAsTable(tableName, source, mode, options.toMap)
}

/**
* :: Experimental ::
* Creates a table at the given path from the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
* partition columns.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: java.util.Map[String, String],
partitionColumns: java.util.List[String]): Unit = {
saveAsTable(tableName, source, mode, options.toMap, partitionColumns)
}

/**
* :: Experimental ::
* (Scala-specific)
@@ -1350,13 +1374,44 @@
tableName,
source,
temporary = false,
Array.empty[String],
mode,
options,
logicalPlan)

sqlContext.executePlan(cmd).toRdd
}

/**
* :: Experimental ::
* Creates a table at the given path from the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
* partition columns.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: Map[String, String],
partitionColumns: Seq[String]): Unit = {
sqlContext.executePlan(
CreateTableUsingAsSelect(
tableName,
source,
temporary = false,
partitionColumns.toArray,
mode,
options,
logicalPlan)).toRdd
}

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path,
@@ -1417,6 +1472,21 @@ class DataFrame private[sql](
save(source, mode, options.toMap)
}

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
* @group output
*/
@Experimental
def save(
source: String,
mode: SaveMode,
options: java.util.Map[String, String],
partitionColumns: java.util.List[String]): Unit = {
save(source, mode, options.toMap, partitionColumns)
}

/**
* :: Experimental ::
* (Scala-specific)
@@ -1429,7 +1499,22 @@
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
ResolvedDataSource(sqlContext, source, mode, options, this)
ResolvedDataSource(sqlContext, source, Array.empty[String], mode, options, this)
}

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
* @group output
*/
@Experimental
def save(
source: String,
mode: SaveMode,
options: Map[String, String],
partitionColumns: Seq[String]): Unit = {
ResolvedDataSource(sqlContext, source, partitionColumns.toArray, mode, options, this)
}

/**
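Taken together, the new DataFrame overloads let callers pass partition columns through save() and saveAsTable(). A minimal sketch of the Scala-specific variants is below; the data source name, path, option key, table name, and column names are all illustrative rather than taken from this patch.

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // `df` is assumed to already contain the partition columns p1 and p2.
    def writePartitioned(df: DataFrame): Unit = {
      // Path-based write, dynamically partitioned by p1 and p2.
      df.save(
        source = "org.apache.spark.sql.parquet",
        mode = SaveMode.Overwrite,
        options = Map("path" -> "/tmp/partitioned_events"),
        partitionColumns = Seq("p1", "p2"))

      // With a HiveContext-backed DataFrame, the matching saveAsTable overload
      // persists the result as a partitioned table in the catalog.
      df.saveAsTable(
        tableName = "partitioned_events",
        source = "org.apache.spark.sql.parquet",
        mode = SaveMode.ErrorIfExists,
        options = Map.empty[String, String],
        partitionColumns = Seq("p1", "p2"))
    }

The Java-specific overloads added above mirror these signatures with java.util.Map and java.util.List arguments.
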
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -66,6 +66,9 @@ private[spark] object SQLConf {
// to its length exceeds the threshold.
val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"

// Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
Review comment (Contributor): What is the default value of this setting?

Reply (Author): Default to true.


// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
@@ -235,6 +238,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def defaultDataSourceName: String =
getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")

private[spark] def partitionDiscoveryEnabled() =
getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int =
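As the review exchange above notes, spark.sql.sources.partitionDiscovery.enabled defaults to true. A small sketch of toggling it through the regular conf API; the helper name is made up.

    import org.apache.spark.sql.SQLContext

    // Hypothetical helper: enable or disable automatic partition discovery.
    def setPartitionDiscovery(sqlContext: SQLContext, enabled: Boolean): Unit = {
      sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", enabled.toString)
    }
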
36 changes: 34 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -762,7 +762,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def load(source: String, options: Map[String, String]): DataFrame = {
val resolved = ResolvedDataSource(this, None, source, options)
val resolved = ResolvedDataSource(this, None, Array.empty[String], source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}

@@ -781,6 +781,37 @@
load(source, schema, options.toMap)
}

/**
* :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
*/
@Experimental
def load(
source: String,
schema: StructType,
partitionColumns: Array[String],
options: java.util.Map[String, String]): DataFrame = {
load(source, schema, partitionColumns, options.toMap)
}

/**
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
* @group genericdata
*/
@Experimental
def load(
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
Review comment (Contributor): This doesn't seem related?

Reply (Contributor): Oh I see, it's just a weird diff since the other function below now has partition columns.

val resolved = ResolvedDataSource(this, Some(schema), Array.empty[String], source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}

/**
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
@@ -791,8 +822,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
def load(
source: String,
schema: StructType,
partitionColumns: Array[String],
options: Map[String, String]): DataFrame = {
val resolved = ResolvedDataSource(this, Some(schema), source, options)
val resolved = ResolvedDataSource(this, Some(schema), partitionColumns, source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}

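The new load() overloads accept an explicit list of partition columns alongside a user-defined schema. A hedged sketch of the Scala-specific variant follows; the schema, source name, path, and column names are illustrative, and whether partition columns also need to appear in the data schema is left to the underlying data source.

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    def loadPartitioned(sqlContext: SQLContext): DataFrame = {
      // Hypothetical schema of the data files; p1 and p2 are the partition columns,
      // declared explicitly rather than discovered.
      val dataSchema = StructType(Seq(
        StructField("a", IntegerType, nullable = true),
        StructField("b", StringType, nullable = true)))

      sqlContext.load(
        source = "org.apache.spark.sql.parquet",
        schema = dataSchema,
        partitionColumns = Array("p1", "p2"),
        options = Map("path" -> "/tmp/partitioned_events"))
    }
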
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -343,9 +343,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsing if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
val cmd =
CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
case CreateTableUsingAsSelect(tableName, provider, true, partitionsCols, mode, opts, query)
if partitionsCols.nonEmpty =>
sys.error("Cannot create temporary partitioned table.")

case CreateTableUsingAsSelect(tableName, provider, true, _, mode, opts, query) =>
val cmd = CreateTempTableUsingAsSelect(
tableName, provider, Array.empty[String], mode, opts, query)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")