@@ -224,7 +224,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = Some(table))

val result = try {
dataSource.write(mode, df)
dataSource.writeAndRead(mode, df)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table $tableName in $mode mode", ex)
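For orientation, a minimal sketch of the kind of statement that reaches this command: a data source CTAS writes the data and then needs a relation back for subsequent reads of the new table, which is why the call above becomes `writeAndRead`. The session setup, table and view names below are illustrative assumptions, not part of the diff.

```scala
// Hypothetical CTAS that is planned as CreateDataSourceTableAsSelectCommand (a sketch).
import org.apache.spark.sql.SparkSession

object CtasSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ctas-sketch").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b")).toDF("id", "value").createOrReplaceTempView("src")

    // The command writes the parquet data and then hands back a relation for
    // the new table, i.e. the writeAndRead path in this diff.
    spark.sql("CREATE TABLE ctas_sketch USING parquet AS SELECT * FROM src")
    spark.table("ctas_sketch").show()

    spark.stop()
  }
}
```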
@@ -413,10 +413,82 @@ case class DataSource(
relation
}

/** Writes the given [[DataFrame]] out to this [[DataSource]]. */
def write(
mode: SaveMode,
data: DataFrame): BaseRelation = {
/**
* Writes the given [[DataFrame]] out in this [[FileFormat]].
*/
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)

// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}

// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid analyzing an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val columns = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitionKeys = Map.empty,
customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
fileFormat = format,
refreshFunction = _ => Unit, // No existing table needs to be refreshed.
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
}

/**
* Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
* subsequent reading.
*/
def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
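A usage-level sketch of the append check in `writeInFileFormat` above (session setup, temp path, and column names are made-up assumptions): establishing one partitioning on disk and then appending with a different one should be rejected with the "Requested partitioning does not match existing partitioning" AnalysisException.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}

object AppendPartitioningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("append-check").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10, 20), (2, 11, 21)).toDF("fieldOne", "partCol1", "partCol2")
    val path = Files.createTempDirectory("append-check").toString

    // First write establishes partCol1 as the partition column on disk.
    df.write.partitionBy("partCol1").mode(SaveMode.Overwrite).parquet(path)

    // Appending with a different partitioning trips the check above: the
    // inferred existing partition columns no longer match the requested ones.
    try {
      df.write.partitionBy("partCol2").mode(SaveMode.Append).parquet(path)
    } catch {
      case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}
```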
@@ -425,74 +497,27 @@
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)

// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}

// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid analyzing an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val columns = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitionKeys = Map.empty,
customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
fileFormat = format,
refreshFunction = _ => Unit, // No existing table needs to be refreshed.
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
writeInFileFormat(format, mode, data)
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/**
* Writes the given [[DataFrame]] out to this [[DataSource]].
*/
def write(mode: SaveMode, data: DataFrame): Unit = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}

providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
writeInFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
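By contrast with `writeAndRead`, the plain save path needs nothing back after the write, so it can use the new `Unit`-returning `write` and skip the `resolveRelation()` call, along with the schema re-inference and file listing that come with it. A minimal sketch of that path, assuming a local SparkSession and an illustrative output location:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SavePathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("save-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // A plain DataFrameWriter save: nothing reads the result back here, so no
    // BaseRelation (and no re-inferred schema) is needed after the write.
    df.write.format("parquet").mode(SaveMode.Overwrite).save("/tmp/save_sketch_output")

    spark.stop()
  }
}
```

This separation is presumably what lets the perf-stats tests below drop the clearMetricsBeforeCreate workaround: the setup writes no longer trigger a file listing of their own.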
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
}

private def setupPartitionedHiveTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}

spark.sql(s"""
|create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
}

private def setupPartitionedDatasourceTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}

spark.sql(s"""
|create table $tableName (fieldOne long, partCol1 int, partCol2 int)
|using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
HiveCatalogMetrics.reset()
setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
withTable("test") {
withTempDir { dir =>
HiveCatalogMetrics.reset()
setupPartitionedHiveTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
})
executorPool.shutdown()
executorPool.awaitTermination(30, TimeUnit.SECONDS)
// check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
// METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
// only one thread can really do the build, so the listing job count is 2, the other
// one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
}
}
}
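The reduced expectations above are consistent with the write path no longer performing its own relation resolution and listing. The pattern the suite relies on is reset-then-check; a minimal sketch of it, assuming the HiveCatalogMetrics import path from Spark's metrics sources and an illustrative temp directory (counter values depend on the Spark version and listing path, so they are printed here rather than asserted):

```scala
import java.nio.file.Files

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession

object MetricCheckSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metrics-sketch").getOrCreate()

    val dir = Files.createTempDirectory("metrics-sketch").toString
    spark.range(10).selectExpr("id as fieldOne", "id as partCol1").write
      .partitionBy("partCol1")
      .mode("overwrite")
      .parquet(dir)

    // Reset immediately before the operation under test, then inspect the delta.
    HiveCatalogMetrics.reset()
    spark.read.parquet(dir).count()
    println(s"files discovered: ${HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount()}")
    println(s"parallel listing jobs: ${HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount()}")

    spark.stop()
  }
}
```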