
Commit 6d23af6

[SPARK-5049][SQL] Fix ordering of partition columns in ParquetTableScan
Followup to #3870. Props to rahulaggarwalguavus for identifying the issue.

Author: Michael Armbrust <[email protected]>

Closes #3990 from marmbrus/SPARK-5049 and squashes the following commits:

dd03e4e [Michael Armbrust] Fill in the partition values of parquet scans instead of using JoinedRow

(cherry picked from commit 5d9fa55)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent 5970f0b commit 6d23af6
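
For orientation before the diffs: the old scan prepended every partition value to each Parquet row through a JoinedRow, which pinned partition columns to the front of the output regardless of the projection order the query requested. The patched scan instead computes, for each requested partition column, its ordinal in the final output and writes the partition value directly into that slot of the mutable row. Below is a minimal, standalone Scala sketch of that ordinal bookkeeping; it uses plain collections in place of Catalyst's AttributeMap and SpecificMutableRow, and the column names are only illustrative.

// Standalone sketch of the new approach (plain Scala stand-ins; the real scan uses
// Catalyst's AttributeMap and a SpecificMutableRow).
object PartitionOrdinalSketch {
  def main(args: Array[String]): Unit = {
    val partitioningColumns = Seq("p")               // columns encoded in the directory path
    val requestedColumns = Seq("stringField", "p")   // projection order requested by the query

    // Ordinal of each partitioning column among the partition values.
    val partitionOrdinals = partitioningColumns.zipWithIndex.toMap

    // (index into the partition values, index into the final output row)
    val requestedPartitionOrdinals = requestedColumns.zipWithIndex.flatMap {
      case (name, finalOrdinal) => partitionOrdinals.get(name).map(_ -> finalOrdinal)
    }.toArray

    val partitionRowValues = Array[Any](1)           // value of p parsed from the path
    val row = Array[Any]("part-1", null)             // Parquet leaves the partition slot empty

    // Fill the partition values into the requested positions, as the patched scan does.
    var i = 0
    while (i < requestedPartitionOrdinals.length) {
      row(requestedPartitionOrdinals(i)._2) = partitionRowValues(requestedPartitionOrdinals(i)._1)
      i += 1
    }
    println(row.mkString("[", ", ", "]"))            // [part-1, 1] -- partition value in the right slot
  }
}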

File tree

3 files changed: +41 -18 lines


sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 3 additions & 1 deletion
@@ -28,7 +28,7 @@ import parquet.schema.MessageType
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 
 /**
@@ -67,6 +67,8 @@ private[sql] case class ParquetRelation(
       conf,
       sqlContext.isParquetBinaryAsString)
 
+  lazy val attributeMap = AttributeMap(output.map(o => o -> o))
+
   override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 
   // Equals must also take into account the output attributes so that we can distinguish between
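
The identity `attributeMap` added here lets the scan resolve the attributes it is asked to produce back to the relation's own output attributes: Catalyst's AttributeMap keys its lookups on exprId rather than on name, which matters because Parquet attribute resolution is case sensitive (see the comment in the next file). A rough standalone illustration of that lookup semantics follows, with a hypothetical simplified Attr class standing in for the real AttributeReference.

// Hypothetical, simplified model of AttributeMap semantics: lookups go by exprId, not name,
// so a requested attribute resolves to the relation's canonical (case-correct) attribute.
case class Attr(name: String, exprId: Long)

object AttributeMapSketch {
  def main(args: Array[String]): Unit = {
    // The relation's output, as read from the Parquet schema.
    val output = Seq(Attr("stringField", 1L), Attr("p", 2L))

    // Identity map, analogous to AttributeMap(output.map(o => o -> o)).
    val attributeMap: Map[Long, Attr] = output.map(o => o.exprId -> o).toMap

    // The query may carry differently-cased names, but the exprIds match.
    val requested = Seq(Attr("STRINGFIELD", 1L), Attr("p", 2L))
    val resolved = requested.map(a => attributeMap(a.exprId))

    println(resolved.map(_.name))   // List(stringField, p) -- Parquet's case-sensitive names
  }
}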

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 26 additions & 17 deletions
@@ -63,18 +63,17 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val normalOutput =
-    attributes
-      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
-      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+  val output = attributes.map(relation.attributeMap)
 
-  val partOutput =
-    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+  // A mapping of ordinals partitionRow -> finalOutput.
+  val requestedPartitionOrdinals = {
+    val partitionAttributeOrdinals = AttributeMap(relation.partitioningAttributes.zipWithIndex)
 
-  def output = partOutput ++ normalOutput
-
-  assert(normalOutput.size + partOutput.size == attributes.size,
-    s"$normalOutput + $partOutput != $attributes, ${relation.output}")
+    attributes.zipWithIndex.flatMap {
+      case (attribute, finalOrdinal) =>
+        partitionAttributeOrdinals.get(attribute).map(_ -> finalOrdinal)
+    }
+  }.toArray
 
   override def execute(): RDD[Row] = {
     import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
@@ -96,7 +95,7 @@ case class ParquetTableScan(
     // Store both requested and original schema in `Configuration`
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(normalOutput))
+      ParquetTypesConverter.convertToString(output))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -124,7 +123,7 @@ case class ParquetTableScan(
       classOf[Row],
      conf)
 
-    if (partOutput.nonEmpty) {
+    if (requestedPartitionOrdinals.nonEmpty) {
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
        val partValue = "([^=]+)=([^=]+)".r
        val partValues =
@@ -137,15 +136,25 @@ case class ParquetTableScan(
            case _ => None
          }.toMap
 
+        // Convert the partitioning attributes into the correct types
        val partitionRowValues =
-          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+          relation.partitioningAttributes
+            .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
        new Iterator[Row] {
-          private[this] val joinedRow = new JoinedRow5(Row(partitionRowValues:_*), null)
-
          def hasNext = iter.hasNext
-
-          def next() = joinedRow.withRight(iter.next()._2)
+          def next() = {
+            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+            // Parquet will leave partitioning columns empty, so we fill them in here.
+            var i = 0
+            while (i < requestedPartitionOrdinals.size) {
+              row(requestedPartitionOrdinals(i)._2) =
+                partitionRowValues(requestedPartitionOrdinals(i)._1)
+              i += 1
+            }
+            row
+          }
        }
      }
    } else {
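
The partition values themselves come from the file path, as the unchanged context above shows: each directory segment of the form name=value is matched with the "([^=]+)=([^=]+)" regex, and the string is then converted to the partition column's declared type via Cast(Literal(...)).eval(EmptyRow). A small standalone sketch of that extraction follows; the path is made up, and a plain .toInt stands in for the Catalyst cast.

// Standalone sketch of recovering partition values from a split's path.
// The path below is made up; the real code gets it from the ParquetInputSplit,
// and uses Cast(Literal(value), dataType).eval(EmptyRow) instead of a plain conversion.
object PartitionValueSketch {
  def main(args: Array[String]): Unit = {
    val partValue = "([^=]+)=([^=]+)".r
    val splitPath = "/warehouse/partitioned_parquet/p=1/part-00000.parquet"

    // Keep only the "name=value" segments of the path.
    val partValues = splitPath.split("/").flatMap {
      case partValue(key, value) => Some(key -> value)
      case _ => None
    }.toMap

    // Convert the string to the partition column's type (p is an Int here).
    val p: Int = partValues("p").toInt

    println(partValues)   // Map(p -> 1)
    println(p)            // 1
  }
}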

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 12 additions & 0 deletions
@@ -174,6 +174,18 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
  }
 
  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"ordering of the partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
+        Seq.fill(10)((1, "part-1"))
+      )
+
+      checkAnswer(
+        sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
+        Seq.fill(10)(("part-1", 1))
+      )
+    }
+
    test(s"project the partitioning column $table") {
      checkAnswer(
        sql(s"SELECT p, count(*) FROM $table group by p"),
