
Commit f9c9986 ("address comments")
Parent: d16dbab

5 files changed: 14 additions, 15 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 8 additions & 9 deletions
@@ -33,8 +33,7 @@ abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType,
-    options: CaseInsensitiveStringMap) extends Scan with Batch {
+    readPartitionSchema: StructType) extends Scan with Batch {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -46,10 +45,10 @@ abstract class FileScan(
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     val partitionAttributes = fileIndex.partitionSchema.toAttributes
-    val attributeMap = partitionAttributes.map(a => getAttributeName(a) -> a).toMap
-    val readPartitionAttributes = readPartitionSchema.toAttributes.map { readAttr =>
-      attributeMap.get(getAttributeName(readAttr)).getOrElse {
-        throw new AnalysisException(s"Can't find required partition column ${readAttr.name} " +
+    val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap
+    val readPartitionAttributes = readPartitionSchema.map { readField =>
+      attributeMap.get(normalizeName(readField.name)).getOrElse {
+        throw new AnalysisException(s"Can't find required partition column ${readField.name} " +
           s"in partition schema ${fileIndex.partitionSchema}")
       }
     }
@@ -88,11 +87,11 @@ abstract class FileScan(
 
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
 
-  private def getAttributeName(a: AttributeReference): String = {
+  private def normalizeName(name: String): String = {
     if (isCaseSensitive) {
-      a.name
+      name
     } else {
-      a.name.toLowerCase(Locale.ROOT)
+      name.toLowerCase(Locale.ROOT)
     }
   }
 }
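Aside: the hunks above switch the partition-column lookup from AttributeReference-based keys to plain field names. Below is a minimal standalone sketch of that normalize-then-match logic; the Field case class, the sample schemas, and the hard-coded isCaseSensitive flag are illustrative stand-ins, not Spark's actual classes or config plumbing.

import java.util.Locale

// Illustrative stand-in for Spark's StructField.
case class Field(name: String, dataType: String)

object NormalizeNameSketch extends App {
  // Stand-in for sparkSession.sessionState.conf.caseSensitiveAnalysis.
  val isCaseSensitive = false

  def normalizeName(name: String): String =
    if (isCaseSensitive) name else name.toLowerCase(Locale.ROOT)

  // Hypothetical partition schema on disk vs. the columns the scan requests.
  val partitionSchema = Seq(Field("Year", "int"), Field("Region", "string"))
  val readPartitionSchema = Seq(Field("year", "int"))

  // Same shape as the patched FileScan.partitions: key the map by the
  // normalized name, then resolve each requested column against it.
  val fieldMap = partitionSchema.map(f => normalizeName(f.name) -> f).toMap
  val resolved = readPartitionSchema.map { readField =>
    fieldMap.getOrElse(normalizeName(readField.name),
      sys.error(s"Can't find required partition column ${readField.name}"))
  }
  println(resolved.map(_.name)) // List(Year): matched case-insensitively
}

Taking a bare String also lets one helper serve both attribute and field callers, which is what enables mapping over readPartitionSchema directly instead of converting it to attributes first.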

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ abstract class TextBasedFileScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+  extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
   private var codecFactory: CompressionCodecFactory = _
 
   override def isSplitable(path: Path): Boolean = {
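A hedged sketch of the pattern in this hunk: the superclass stops accepting a constructor parameter it never used, while subclasses that do need the options keep them for their own use. The class names below are illustrative, not Spark's real ones.

// The base scan never consulted `options`, so it no longer takes them.
abstract class BaseScanSketch(readSchema: Seq[String])

// Text-based scans still accept `options` for their own needs (e.g. to
// pick a compression codec) and simply stop forwarding them to the parent.
abstract class TextScanSketch(readSchema: Seq[String], options: Map[String, String])
  extends BaseScanSketch(readSchema) {
  protected def compression: Option[String] = options.get("compression")
}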

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala

Lines changed: 1 addition & 1 deletion
@@ -33,8 +33,8 @@ import org.apache.spark.util.SerializableConfiguration
  * @param sqlConf SQL configuration.
  * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
  * @param dataSchema Schema of CSV files.
+ * @param readDataSchema Required data schema in the batch scan.
  * @param partitionSchema Schema of partitions.
- * @param readSchema Required schema in the batch scan.
  * @param parsedOptions Options for parsing CSV files.
  */
 case class CSVPartitionReaderFactory(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala

Lines changed: 2 additions & 2 deletions
@@ -46,16 +46,16 @@ import org.apache.spark.util.SerializableConfiguration
  * @param sqlConf SQL configuration.
  * @param broadcastedConf Broadcast serializable Hadoop Configuration.
  * @param dataSchema Schema of orc files.
+ * @param readDataSchema Required data schema in the batch scan.
  * @param partitionSchema Schema of partitions.
- * @param readSchema Required schema in the batch scan.
  */
 case class OrcPartitionReaderFactory(
     sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
-    resultSchema: StructType,
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType) extends FilePartitionReaderFactory {
+  private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields)
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val capacity = sqlConf.orcVectorizedReaderBatchSize
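In effect, the factory now derives resultSchema from the two schemas it already receives instead of taking it as an extra constructor argument. A minimal sketch of that shape, using illustrative stand-ins rather than Spark's StructType:

// Illustrative stand-ins for Spark's StructField/StructType.
case class Field(name: String, dataType: String)
case class Schema(fields: Seq[Field])

// Before this commit, callers precomputed and passed resultSchema; now the
// factory derives it as readDataSchema ++ partitionSchema, so read columns
// always come first and every call site gets one argument shorter.
case class ReaderFactorySketch(readDataSchema: Schema, partitionSchema: Schema) {
  val resultSchema: Schema = Schema(readDataSchema.fields ++ partitionSchema.fields)
}

object ReaderFactorySketchDemo extends App {
  val factory = ReaderFactorySketch(
    Schema(Seq(Field("value", "string"))),
    Schema(Seq(Field("year", "int"))))
  println(factory.resultSchema.fields.map(_.name)) // List(value, year)
}

Deriving the value inside the class also removes a way for call sites to pass an inconsistent resultSchema.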

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala

Lines changed: 2 additions & 2 deletions
@@ -35,15 +35,15 @@ case class OrcScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+  extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
   override def isSplitable(path: Path): Boolean = true
 
   override def createReaderFactory(): PartitionReaderFactory = {
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
     // The partition values are already truncated in `FileScan.partitions`.
     // We should use `readPartitionSchema` as the partition schema here.
-    OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readSchema(),
+    OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
       dataSchema, readDataSchema, readPartitionSchema)
   }
 }
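The createReaderFactory hunk keeps the existing broadcast of the Hadoop configuration while dropping readSchema() from the call. For context, a sketch of why that wrapper exists: Hadoop's Configuration is not java.io.Serializable, so it must be wrapped before broadcasting. The SerializableConf class below is an illustrative stand-in for Spark's internal SerializableConfiguration, and the local-mode session is assumed.

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// Stand-in for Spark's SerializableConfiguration: Configuration is not
// java.io.Serializable, so it is serialized via its own write/readFields.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object BroadcastConfSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.defaultFS", "file:///") // illustrative setting
  val broadcasted = spark.sparkContext.broadcast(new SerializableConf(hadoopConf))
  // Tasks on executors read the wrapped Configuration back out.
  val seen = spark.sparkContext.parallelize(1 to 2)
    .map(_ => broadcasted.value.value.get("fs.defaultFS")).collect()
  println(seen.toSeq)
  spark.stop()
}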
