
Commit bac215a

marmbrus authored and CodingCat committed
[SPARK-8890] [SQL] Fallback on sorting when writing many dynamic partitions
Previously, we would open a new file for each new dynamic partition written out using `HadoopFsRelation`. For formats like Parquet this is very costly because of the buffers required to get good compression. In this PR I refactor the code, allowing us to fall back on an external sort when many partitions are seen. As a result, each task will open no more than `spark.sql.sources.maxFiles` files. I also did the following cleanup:

- Instead of keying the file HashMap on an expensive-to-compute string representation of the partition, we now use a fairly cheap UnsafeProjection that avoids heap allocations.
- The control flow for instantiating and invoking a writer container has been simplified. Instead of switching in two places based on the use of partitioning, the specific writer container must implement a single method, `writeRows`, that is invoked using `runJob`.
- `InternalOutputWriter` has been removed. Instead we have a `private[sql]` method `writeInternal` that converts and calls the public method. This method can be overridden by internal data sources to avoid the conversion. This change removes a lot of code duplication and per-row `asInstanceOf` checks.
- `commands.scala` has been split up.

Author: Michael Armbrust <[email protected]>

Closes apache#8010 from marmbrus/fsWriting and squashes the following commits:

00804fe [Michael Armbrust] use shuffleMemoryManager.pageSizeBytes
775cc49 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into fsWriting
17b690e [Michael Armbrust] remove comment
40f0372 [Michael Armbrust] address comments
f5675bd [Michael Armbrust] char -> string
7e2d0a4 [Michael Armbrust] make sure we close current writer
8100100 [Michael Armbrust] delete empty commands.scala
71cc717 [Michael Armbrust] update comment
8ec75ac [Michael Armbrust] [SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions
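To make the headline change concrete, below is a minimal, hedged sketch of the fallback idea in plain Scala. It is not the actual DynamicPartitionWriterContainer code; names such as `maxOpenWriters` and `openWriter` are invented for the example, and the in-memory sort stands in for Spark's external, spill-to-disk sort.

    // Illustrative sketch only: keep one open writer per partition until a limit,
    // then fall back on sorting the remaining rows by partition key.
    import scala.collection.mutable

    object SortFallbackSketch {
      def writePartitioned[Row](
          rows: Iterator[(String, Row)],          // (partition key, row)
          maxOpenWriters: Int,
          openWriter: String => Row => Unit): Unit = {
        val writers = mutable.Map.empty[String, Row => Unit]
        val overflow = mutable.ArrayBuffer.empty[(String, Row)]
        var fellBack = false

        // Fast path: one open writer per partition, as before this patch.
        while (rows.hasNext && !fellBack) {
          val (key, row) = rows.next()
          writers.get(key) match {
            case Some(write) =>
              write(row)
            case None if writers.size < maxOpenWriters =>
              val write = openWriter(key)
              writers(key) = write
              write(row)
            case None =>
              // Too many concurrently open writers: switch to the sort-based fallback.
              fellBack = true
              overflow += ((key, row))
          }
        }

        // Fallback: sort the remaining rows by partition key (in memory here) and
        // write each partition's rows contiguously, one writer at a time.
        if (fellBack) {
          val remaining = (overflow ++ rows).sortBy { case (key, _) => key }
          var currentKey: Option[String] = None
          var currentWrite: Row => Unit = _ => ()
          for ((key, row) <- remaining) {
            if (!currentKey.contains(key)) {
              currentKey = Some(key)
              currentWrite = writers.getOrElse(key, openWriter(key))
            }
            currentWrite(row)
          }
        }
      }
    }

Once the fallback kicks in, each partition's rows arrive contiguously, so the task needs at most one additional writer at a time instead of one per partition.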
1 parent b89243b commit bac215a

File tree: 10 files changed, +715 / -623 lines


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 6 additions & 2 deletions
@@ -366,17 +366,21 @@ private[spark] object SQLConf {
     "storing additional schema information in Hive's metastore.",
     isPublic = false)
 
-  // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
     defaultValue = Some(true),
     doc = "When true, automatically discover data partitions.")
 
-  // Whether to perform partition column type inference. Default to true.
   val PARTITION_COLUMN_TYPE_INFERENCE =
     booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
       defaultValue = Some(true),
       doc = "When true, automatically infer the data types for partitioned columns.")
 
+  val PARTITION_MAX_FILES =
+    intConf("spark.sql.sources.maxConcurrentWrites",
+      defaultValue = Some(5),
+      doc = "The maximum number of concurrent files to open before falling back on sorting when " +
+        "writing out files using dynamic partitioning.")
+
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   //
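For reference, the new threshold is read through `SQLConf`, so it can be set like any other SQL configuration property. A minimal, hedged example of raising it before a dynamically partitioned write; the property name comes from the diff above, while `sqlContext`, `df`, the column names, and the output path are assumptions made for illustration:

    // Illustrative only: allow more concurrently open writers before the sort fallback,
    // then issue a write that produces one directory per (year, month) partition.
    import org.apache.spark.sql.SaveMode

    sqlContext.setConf("spark.sql.sources.maxConcurrentWrites", "10")
    df.write
      .mode(SaveMode.Append)
      .partitionBy("year", "month")
      .parquet("/tmp/events")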
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.IOException
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConversions.asScalaIterator
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.{Utils, SerializableConfiguration}
+
+
+/**
+ * Inserts the results of `query` into a relation that extends [[InsertableRelation]].
+ */
+private[sql] case class InsertIntoDataSource(
+    logicalRelation: LogicalRelation,
+    query: LogicalPlan,
+    overwrite: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
+    val data = DataFrame(sqlContext, query)
+    // Apply the schema of the existing table to the new data.
+    val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
+    relation.insert(df, overwrite)
+
+    // Invalidate the cache.
+    sqlContext.cacheManager.invalidateCache(logicalRelation)
+
+    Seq.empty[Row]
+  }
+}
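Since this command simply delegates to `InsertableRelation.insert`, a small hedged sketch may help show what a target relation looks like. The member signatures follow the Spark 1.x `sources` API used in the code above; the class name and its body are purely illustrative:

    // Illustrative only: a relation that InsertIntoDataSource could delegate to.
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
    import org.apache.spark.sql.types.StructType

    class LoggingRelation(
        override val sqlContext: SQLContext,
        override val schema: StructType)
      extends BaseRelation with InsertableRelation {

      // InsertIntoDataSource.run() ends up calling this with the re-schemaed DataFrame.
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        // A real relation would persist `data`; counting rows keeps the sketch self-contained.
        println(s"insert(overwrite=$overwrite): ${data.count()} rows")
      }
    }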
Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.spark._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
+import org.apache.spark.sql.sources._
+import org.apache.spark.util.Utils
+
+
+/**
+ * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job. Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with the task id to generate a unique file
+ * path for each task output file. This UUID is passed to the executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * The basic workflow of this command is:
+ *
+ *   1. Driver-side setup, including output committer initialization and data source specific
+ *      preparation work for the write job to be issued.
+ *   2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
+ *      rows within an RDD partition.
+ *   3. If no exception is thrown in a task, commits that task, otherwise aborts it; if any
+ *      exception is thrown while committing the task, also aborts it.
+ *   4. If all tasks are committed, commits the job, otherwise aborts it; if any exception is
+ *      thrown while committing the job, also aborts it.
+ */
+private[sql] case class InsertIntoHadoopFsRelation(
+    @transient relation: HadoopFsRelation,
+    @transient query: LogicalPlan,
+    mode: SaveMode)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    require(
+      relation.paths.length == 1,
+      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val outputPath = new Path(relation.paths.head)
+    val fs = outputPath.getFileSystem(hadoopConf)
+    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
+      case (SaveMode.ErrorIfExists, true) =>
+        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
+      case (SaveMode.Overwrite, true) =>
+        Utils.tryOrIOException {
+          if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
+            throw new IOException(s"Unable to clear output " +
+              s"directory $qualifiedOutputPath prior to writing to it")
+          }
+        }
+        true
+      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+        true
+      case (SaveMode.Ignore, exists) =>
+        !exists
+      case (s, exists) =>
+        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
+    }
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    if (doInsertion) {
+      val job = new Job(hadoopConf)
+      job.setOutputKeyClass(classOf[Void])
+      job.setOutputValueClass(classOf[InternalRow])
+      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+
+      // A partitioned relation's schema can differ from that of the input logicalPlan, since its
+      // partition columns all come after the data columns. We add a Project to adjust the ordering.
+      // TODO: this belongs in the analyzer.
+      val project = Project(
+        relation.schema.map(field => UnresolvedAttribute.quoted(field.name)), query)
+      val queryExecution = DataFrame(sqlContext, project).queryExecution
+
+      SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+        val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema)
+        val partitionColumns = relation.partitionColumns.fieldNames
+
+        // Some pre-flight checks.
+        require(
+          df.schema == relation.schema,
+          s"""DataFrame must have the same schema as the relation into which it is inserted.
+             |DataFrame schema: ${df.schema}
+             |Relation schema: ${relation.schema}
+           """.stripMargin)
+        val partitionColumnsInSpec = relation.partitionColumns.fieldNames
+        require(
+          partitionColumnsInSpec.sameElements(partitionColumns),
+          s"""Partition columns mismatch.
+             |Expected: ${partitionColumnsInSpec.mkString(", ")}
+             |Actual: ${partitionColumns.mkString(", ")}
+           """.stripMargin)
+
+        val writerContainer = if (partitionColumns.isEmpty) {
+          new DefaultWriterContainer(relation, job, isAppend)
+        } else {
+          val output = df.queryExecution.executedPlan.output
+          val (partitionOutput, dataOutput) =
+            output.partition(a => partitionColumns.contains(a.name))
+
+          new DynamicPartitionWriterContainer(
+            relation,
+            job,
+            partitionOutput,
+            dataOutput,
+            output,
+            PartitioningUtils.DEFAULT_PARTITION_NAME,
+            sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+            isAppend)
+        }
+
+        // This call shouldn't be put into the `try` block below because it only initializes and
+        // prepares the job; any exception thrown from here shouldn't cause abortJob() to be called.
+        writerContainer.driverSideSetup()
+
+        try {
+          sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writerContainer.writeRows _)
+          writerContainer.commitJob()
+          relation.refresh()
+        } catch { case cause: Throwable =>
+          logError("Aborting job.", cause)
+          writerContainer.abortJob()
+          throw new SparkException("Job aborted.", cause)
+        }
+      }
+    } else {
+      logInfo("Skipping insertion into a relation that already exists.")
+    }
+
+    Seq.empty[Row]
+  }
+}
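To make steps 2-4 of the ScalaDoc above concrete, here is a small hedged sketch of the per-task write/commit/abort pattern. `TaskWriter`, `writeRow`, `commitTask`, and `abortTask` are stand-in names, not the actual writer-container API:

    // Illustrative only: the task-level protocol described in the workflow above.
    object TaskWriteSketch {
      trait TaskWriter[Row] {
        def writeRow(row: Row): Unit
        def commitTask(): Unit
        def abortTask(): Unit
      }

      def writeRows[Row](rows: Iterator[Row], writer: TaskWriter[Row]): Unit = {
        try {
          rows.foreach(writer.writeRow)   // step 2: write every row of this RDD partition
          writer.commitTask()             // step 3: commit only if no exception was thrown
        } catch {
          case cause: Throwable =>
            writer.abortTask()            // step 3: abort the task on any failure
            throw cause                   // surfaced so the driver can abort the whole job (step 4)
        }
      }
    }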
