@@ -20,18 +20,12 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
-import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
@@ -40,20 +34,6 @@ import org.apache.spark.sql.internal.SQLConf
  * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
  * unique file path for each task output file. This UUID is passed to executor side via a
  * property named `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- * 1. Driver side setup, including output committer initialization and data source specific
- *    preparation work for the write job to be issued.
- * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
- *    rows within an RDD partition.
- * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
- *    exception is thrown during task commitment, also aborts that task.
- * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
- *    thrown during job commitment, also aborts the job.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
@@ -103,52 +83,17 @@ case class InsertIntoHadoopFsRelationCommand(
     val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val job = Job.getInstance(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
-      val partitionSet = AttributeSet(partitionColumns)
-      val dataColumns = query.output.filterNot(partitionSet.contains)
-
-      val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
-      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-        val relation =
-          WriteRelation(
-            sparkSession,
-            dataColumns.toStructType,
-            qualifiedOutputPath.toString,
-            fileFormat.prepareWrite(sparkSession, _, options, dataColumns.toStructType),
-            bucketSpec)
-
-        val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
-          new DefaultWriterContainer(relation, job, isAppend)
-        } else {
-          new DynamicPartitionWriterContainer(
-            relation,
-            job,
-            partitionColumns = partitionColumns,
-            dataColumns = dataColumns,
-            inputSchema = query.output,
-            PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.sessionState.conf.partitionMaxFiles,
-            isAppend)
-        }
-
-        // This call shouldn't be put into the `try` block below because it only initializes and
-        // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-        writerContainer.driverSideSetup()
-
-        try {
-          sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
-          writerContainer.commitJob()
-          refreshFunction()
-        } catch { case cause: Throwable =>
-          logError("Aborting job.", cause)
-          writerContainer.abortJob()
-          throw new SparkException("Job aborted.", cause)
-        }
-      }
+      WriteOutput.write(
+        sparkSession,
+        query,
+        fileFormat,
+        qualifiedOutputPath,
+        hadoopConf,
+        partitionColumns,
+        bucketSpec,
+        refreshFunction,
+        options,
+        isAppend)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }
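
The scaladoc paragraphs removed above describe the setup/commit/abort workflow that the old code implemented inline and that this patch moves behind the single `WriteOutput.write` call. For readers unfamiliar with that flow, here is a minimal, self-contained sketch of the same pattern in plain Scala. All names in it (`SimpleWriteJob`, `writeTask`, `committed`, and so on) are hypothetical illustrations, not the actual `WriteOutput` internals.

```scala
import scala.collection.mutable.ListBuffer

// Hedged sketch of the driver-side commit/abort pattern described in the removed
// scaladoc steps 1-4. The real implementation lives in WriteOutput.write.
object SimpleWriteJob {

  private val committed = ListBuffer.empty[String]   // stands in for committed, visible output

  // Steps 2-3: one "executor-side" task writes all rows of a partition, then commits or aborts.
  private def writeTask(taskId: Int, rows: Iterator[String]): Unit = {
    val tempOutput = ListBuffer.empty[String]        // stands in for a task-local temporary file
    try {
      rows.foreach(tempOutput += _)
      committed ++= tempOutput                       // no exception: commit this task's output
      println(s"task $taskId committed ${tempOutput.size} rows")
    } catch {
      case cause: Throwable =>
        println(s"task $taskId aborted")             // failure, even during commit, aborts the task
        throw cause
    }
  }

  def run(partitions: Seq[Seq[String]]): Unit = {
    println("driver-side setup (committer init, temp paths)")   // step 1: driver-side setup
    try {
      partitions.zipWithIndex.foreach { case (rows, id) => writeTask(id, rows.iterator) }
      println(s"job committed, ${committed.size} rows visible")  // step 4: all tasks committed
    } catch {
      case cause: Throwable =>
        committed.clear()                            // step 4: any failure aborts the job
        println("job aborted")
        throw cause
    }
  }
}

// Example: SimpleWriteJob.run(Seq(Seq("a", "b"), Seq("c")))
```

The refactoring keeps this behavior but consolidates it in one place, so the command itself only decides whether to insert and then delegates the whole job, including the static vs. dynamic-partition writer choice that the removed `DefaultWriterContainer` / `DynamicPartitionWriterContainer` branch used to make, to `WriteOutput.write`.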