@@ -28,7 +28,7 @@ import parquet.schema.MessageType

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}

/**
@@ -67,6 +67,8 @@ private[sql] case class ParquetRelation(
conf,
sqlContext.isParquetBinaryAsString)

lazy val attributeMap = AttributeMap(output.map(o => o -> o))

override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]

// Equals must also take into account the output attributes so that we can distinguish between
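A minimal sketch of the idea behind the new attributeMap, in plain Scala with a hypothetical Attribute stand-in rather than the real catalyst types: the relation keeps an identity map over its own output, and because lookups are (conceptually) keyed by exprId rather than by name, a requested attribute resolves back to the relation's original attribute even if the query spells the column with different casing.

  // Self-contained sketch; Attribute and the exprId values here are hypothetical.
  object AttributeMapSketch {
    case class Attribute(name: String, exprId: Long)

    // Identity map from exprId back to the relation's own output attribute.
    def attributeMap(output: Seq[Attribute]): Map[Long, Attribute] =
      output.map(o => o.exprId -> o).toMap

    def main(args: Array[String]): Unit = {
      val relationOutput = Seq(Attribute("stringField", 1L), Attribute("p", 2L))
      val byExprId = attributeMap(relationOutput)

      // The query may spell the column differently, but the exprId still matches.
      val requested = Attribute("STRINGFIELD", 1L)
      println(byExprId(requested.exprId)) // Attribute(stringField,1)
    }
  }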
@@ -64,18 +64,17 @@ case class ParquetTableScan(
// The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
// by exprId. note: output cannot be transient, see
// https://issues.apache.org/jira/browse/SPARK-1367
val normalOutput =
attributes
.filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
.flatMap(a => relation.output.find(o => o.exprId == a.exprId))
val output = attributes.map(relation.attributeMap)

val partOutput =
attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
// A mapping of ordinals partitionRow -> finalOutput.
val requestedPartitionOrdinals = {
val partitionAttributeOrdinals = AttributeMap(relation.partitioningAttributes.zipWithIndex)

def output = partOutput ++ normalOutput

assert(normalOutput.size + partOutput.size == attributes.size,
s"$normalOutput + $partOutput != $attributes, ${relation.output}")
attributes.zipWithIndex.flatMap {
case (attribute, finalOrdinal) =>
partitionAttributeOrdinals.get(attribute).map(_ -> finalOrdinal)
}
}.toArray

override def execute(): RDD[Row] = {
import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
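A small worked example of what requestedPartitionOrdinals computes, again with a simplified stand-in Attribute type and made-up exprIds: for a query like SELECT p, stringField over a table partitioned by p, the partitioning column p is the 0th partition attribute and must be written at ordinal 0 of the final output row, so the array ends up holding the single pair (0,0).

  // Self-contained sketch of the ordinal bookkeeping; not the real catalyst types.
  object PartitionOrdinalsSketch {
    case class Attribute(name: String, exprId: Long)

    def main(args: Array[String]): Unit = {
      // Hypothetical table partitioned by `p`, with a data column `stringField`.
      val partitioningAttributes = Seq(Attribute("p", 2L))
      val requested = Seq(Attribute("p", 2L), Attribute("stringField", 1L)) // SELECT p, stringField

      // Ordinal of each partitioning attribute within partitioningAttributes.
      val partitionAttributeOrdinals = partitioningAttributes.map(_.exprId).zipWithIndex.toMap

      // Pairs of (index into partitioningAttributes, ordinal in the final output row).
      val requestedPartitionOrdinals = requested.zipWithIndex.flatMap {
        case (attribute, finalOrdinal) =>
          partitionAttributeOrdinals.get(attribute.exprId).map(_ -> finalOrdinal)
      }.toArray

      println(requestedPartitionOrdinals.mkString(", ")) // (0,0)
    }
  }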
@@ -97,7 +96,7 @@ case class ParquetTableScan(
// Store both requested and original schema in `Configuration`
conf.set(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertToString(normalOutput))
ParquetTypesConverter.convertToString(output))
conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(relation.output))
@@ -125,7 +124,7 @@
classOf[Row],
conf)

if (partOutput.nonEmpty) {
if (requestedPartitionOrdinals.nonEmpty) {
baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
@@ -138,15 +137,25 @@
case _ => None
}.toMap

// Convert the partitioning attributes into the correct types
val partitionRowValues =
partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
relation.partitioningAttributes
.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

new Iterator[Row] {
private[this] val joinedRow = new JoinedRow5(Row(partitionRowValues:_*), null)

def hasNext = iter.hasNext

def next() = joinedRow.withRight(iter.next()._2)
def next() = {
val row = iter.next()._2.asInstanceOf[SpecificMutableRow]

// Parquet will leave partitioning columns empty, so we fill them in here.
var i = 0
while (i < requestedPartitionOrdinals.size) {
row(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}
row
}
}
}
} else {
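A rough end-to-end sketch of the new partition-filling path, with hypothetical values standing in for what the ParquetInputSplit and SpecificMutableRow would provide: the partition value is parsed out of the file path, converted to the column's type (a plain .toInt here instead of Cast), and written into the otherwise-empty partition column at the ordinal recorded in requestedPartitionOrdinals.

  // Self-contained sketch; the path and row contents are hypothetical.
  object PartitionFillSketch {
    def main(args: Array[String]): Unit = {
      // Parse key=value segments out of the split's path.
      val path = "/data/partitioned_parquet/p=1/part-00000.parquet"
      val partValue = "([^=]+)=([^=]+)".r
      val partValues = path.split("/").flatMap {
        case partValue(key, value) => Some(key -> value)
        case _ => None
      }.toMap

      // One partitioning column `p` of int type, requested at output ordinal 0.
      val partitionRowValues = Seq(partValues("p").toInt)
      val requestedPartitionOrdinals = Array(0 -> 0)

      // Parquet leaves the partitioning column empty; fill it in at the final ordinal.
      val row = Array[Any](null, "part-1") // stand-in for the mutable row (p, stringField)
      var i = 0
      while (i < requestedPartitionOrdinals.length) {
        row(requestedPartitionOrdinals(i)._2) = partitionRowValues(requestedPartitionOrdinals(i)._1)
        i += 1
      }
      println(row.mkString(", ")) // 1, part-1
    }
  }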
@@ -174,6 +174,18 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
}

Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
test(s"ordering of the partitioning columns $table") {
checkAnswer(
sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
Seq.fill(10)((1, "part-1"))
)

checkAnswer(
sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
Seq.fill(10)(("part-1", 1))
)
}

test(s"project the partitioning column $table") {
checkAnswer(
sql(s"SELECT p, count(*) FROM $table group by p"),