
Commit 40f0372

address comments
1 parent f5675bd commit 40f0372

3 files changed, +9 -21 lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 1 addition & 1 deletion

@@ -369,7 +369,7 @@ private[spark] object SQLConf {
     doc = "When true, automatically infer the data types for partitioned columns.")
 
   val PARTITION_MAX_FILES =
-    intConf("spark.sql.sources.maxFiles",
+    intConf("spark.sql.sources.maxConcurrentWrites",
       defaultValue = Some(5),
       doc = "The maximum number of concurrent files to open before falling back on sorting when " +
         "writing out files using dynamic partitioning.")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala

Lines changed: 8 additions & 19 deletions
@@ -18,26 +18,18 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.{Utils, SerializableConfiguration}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -109,14 +101,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
     // We create a DataFrame by applying the schema of relation to the data to make sure
     // we are writing data based on the expected schema.
 
-    // For partitioned relation r, r.schema's column ordering can be different from the column
-    // ordering of data.logicalPlan (partition columns are all moved after data column). We
-    // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
-    // safely apply the schema of r.schema to the data.
-
+    // A partitioned relation's schema can be different from the input logicalPlan, since
+    // partition columns are all moved after the data columns. We Project to adjust the ordering.
     // TODO: this belongs in the analyzer.
     val project = Project(
-      relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+      relation.schema.map(field => UnresolvedAttribute.quoted(field.name)), query)
     val queryExecution = DataFrame(sqlContext, project).queryExecution
 
     SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
@@ -128,14 +117,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
        df.schema == relation.schema,
        s"""DataFrame must have the same schema as the relation to which is inserted.
           |DataFrame schema: ${df.schema}
-          |Relation schema: ${relation.schema}
+          |Relation schema: ${relation.schema}
         """.stripMargin)
      val partitionColumnsInSpec = relation.partitionColumns.fieldNames
      require(
        partitionColumnsInSpec.sameElements(partitionColumns),
        s"""Partition columns mismatch.
           |Expected: ${partitionColumnsInSpec.mkString(", ")}
-          |Actual: ${partitionColumns.mkString(", ")}
+          |Actual: ${partitionColumns.mkString(", ")}
         """.stripMargin)
 
      val writerContainer = if (partitionColumns.isEmpty) {
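To make the reordering described in the new comment concrete, here is a purely illustrative sketch, not part of the patch: a partitioned relation reports its partition columns after the data columns, so the input is reordered to match that schema before the insert. It also shows why a factory like UnresolvedAttribute.quoted fits here: the field name is kept as a single name part instead of being parsed as a dotted path. The DataFrame, column names, and target ordering below are assumptions; the sketch reuses the sqlContext from the previous example.

// Illustrative only: mimic at the DataFrame API level what the Project above does at the
// logical-plan level. Column names and data are made up.
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.functions.col
import sqlContext.implicits._

val input = Seq(
  ("2015-08-01", 1L, "click"),   // "date" appears first in the incoming data...
  ("2015-08-02", 2L, "view")
).toDF("date", "id", "event")

// ...but a relation partitioned by "date" reports its schema as (id, event, date),
// so reorder the columns to match that target schema before writing.
val targetOrder = Seq("id", "event", "date")
val reordered = input.select(targetOrder.map(col): _*)

// Why quoted(): the whole field name stays a single name part, so a column literally
// named "user.name" is not reinterpreted as nested field "name" of column "user".
val attr = UnresolvedAttribute.quoted("user.name")   // nameParts == Seq("user.name")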

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 0 additions & 1 deletion
@@ -44,7 +44,6 @@ private[sql] abstract class BaseWriterContainer(
   with Logging
   with Serializable {
 
-  protected val needsConversion = relation.needConversion
   protected val dataSchema = relation.dataSchema
 
   protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
