
Commit 140eddb

WriterContainer refactoring.
committed · 1 parent f39852e · commit 140eddb

File tree

3 files changed: +341 -358 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 12 additions & 53 deletions
@@ -20,18 +20,12 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
-import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
@@ -103,52 +97,17 @@ case class InsertIntoHadoopFsRelationCommand(
     val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val job = Job.getInstance(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
-      val partitionSet = AttributeSet(partitionColumns)
-      val dataColumns = query.output.filterNot(partitionSet.contains)
-
-      val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
-      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-        val relation =
-          WriteRelation(
-            sparkSession,
-            dataColumns.toStructType,
-            qualifiedOutputPath.toString,
-            fileFormat.prepareWrite(sparkSession, _, options, dataColumns.toStructType),
-            bucketSpec)
-
-        val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
-          new DefaultWriterContainer(relation, job, isAppend)
-        } else {
-          new DynamicPartitionWriterContainer(
-            relation,
-            job,
-            partitionColumns = partitionColumns,
-            dataColumns = dataColumns,
-            inputSchema = query.output,
-            PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.sessionState.conf.partitionMaxFiles,
-            isAppend)
-        }
-
-        // This call shouldn't be put into the `try` block below because it only initializes and
-        // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-        writerContainer.driverSideSetup()
-
-        try {
-          sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
-          writerContainer.commitJob()
-          refreshFunction()
-        } catch { case cause: Throwable =>
-          logError("Aborting job.", cause)
-          writerContainer.abortJob()
-          throw new SparkException("Job aborted.", cause)
-        }
-      }
+      WriteOutput.write(
+        sparkSession,
+        query,
+        fileFormat,
+        outputPath,
+        hadoopConf,
+        partitionColumns,
+        bucketSpec,
+        refreshFunction,
+        options,
+        isAppend)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }
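From the call site above, the driver-side Job setup, the DefaultWriterContainer / DynamicPartitionWriterContainer branching, and the commit/abort handling that InsertIntoHadoopFsRelationCommand previously performed inline are now delegated to a single WriteOutput.write call. Below is a minimal sketch of a signature consistent with that call site; the object name WriteOutputSketch, the parameter names, and the parameter types are inferred purely from the arguments passed here and are assumptions, not the actual WriteOutput definition introduced by this commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.FileFormat

// Sketch only: a WriteOutput-style entry point inferred from the call site above.
// Names and types are assumptions based on the arguments passed to WriteOutput.write.
object WriteOutputSketch {
  def write(
      sparkSession: SparkSession,
      plan: LogicalPlan,               // the query whose result is written (`query` above)
      fileFormat: FileFormat,
      outputPath: Path,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      refreshFunction: () => Unit,     // invoked after a successful commit, as in the removed code
      options: Map[String, String],
      isAppend: Boolean): Unit = {
    // Expected to cover what the removed inline code did: set up the Hadoop Job on the
    // driver, choose a single-directory or dynamic-partition/bucketed write path, run the
    // write job on the executors, then commit and call refreshFunction, or abort on failure.
    ???
  }
}

The net effect at this call site is that InsertIntoHadoopFsRelationCommand no longer constructs the Hadoop Job, the WriteRelation, or a writer container itself, which is why the corresponding imports are dropped in the first hunk.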
