LocalRelation.scala
@@ -43,10 +43,11 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute],
-                         data: Seq[InternalRow] = Nil,
-                         // Indicates whether this relation has data from a streaming source.
-                         override val isStreaming: Boolean = false)
+case class LocalRelation(
+    output: Seq[Attribute],
+    data: Seq[InternalRow] = Nil,
+    // Indicates whether this relation has data from a streaming source.
+    override val isStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {
 
   // A local relation must have resolved output.
OptimizeMetadataOnlyQuery.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -80,8 +83,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
   private def getPartitionAttrs(
       partitionColumnNames: Seq[String],
       relation: LogicalPlan): Seq[Attribute] = {
-    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
-    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+    val attrMap = relation.output.map(_.name.toLowerCase(Locale.ROOT)).zip(relation.output).toMap
+    partitionColumnNames.map { colName =>
+      attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
+        throw new AnalysisException(s"Unable to find the column `$colName` " +
+          s"given [${relation.output.map(_.name).mkString(", ")}]")
+      )
+    }
   }
 
   /**
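Why the lookup was rewritten: the old getPartitionAttrs filtered relation.output, so the attributes came back in the relation's output order rather than in the order of partitionColumnNames, and a partition column that failed to match was silently dropped. A minimal sketch of the two strategies in plain Scala (no Spark required; the column names are borrowed from the regression test below):

    import java.util.Locale

    val output = Seq("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")  // relation output order
    val partitionColumnNames = Seq("cOl3", "cOl1", "cOl5")    // order encoded in the path

    // Old approach: filter the output, so the result follows the *output* order.
    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
    val oldResult = output.filter(a => partColumns.contains(a.toLowerCase))
    // oldResult == Seq("cOl1", "cOl3", "cOl5") -- misaligned with the partition values

    // New approach: map over the requested names, preserving *their* order and
    // failing fast when a name cannot be resolved.
    val attrMap = output.map(_.toLowerCase(Locale.ROOT)).zip(output).toMap
    val newResult = partitionColumnNames.map { colName =>
      attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
        sys.error(s"Unable to find the column `$colName`"))
    }
    // newResult == Seq("cOl3", "cOl1", "cOl5")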
HadoopFsRelation.scala
@@ -67,6 +67,9 @@ case class HadoopFsRelation(
     }
   }
 
+  // When the data schema and the partition schema have overlapping columns, the
+  // output schema keeps the data schema's ordering for the overlapping columns,
+  // but takes their data types from the partition schema.
   val schema: StructType = {
     StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
       partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
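To make the new comment concrete, here is a minimal sketch of the schema merge, assuming hypothetical schemas in which p1 appears in both with different types (the real getColName also honors spark.sql.caseSensitive; plain lowercasing stands in for that here):

    import java.util.Locale
    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(
      StructField("c1", StringType),
      StructField("p1", StringType)))
    val partitionSchema = StructType(Seq(
      StructField("p1", IntegerType),
      StructField("p2", IntegerType)))

    def colName(f: StructField): String = f.name.toLowerCase(Locale.ROOT)

    // Partition columns that also appear in the data schema.
    val dataCols = dataSchema.map(colName).toSet
    val overlappedPartCols =
      partitionSchema.filter(f => dataCols.contains(colName(f))).map(f => colName(f) -> f).toMap

    // Overlapping columns keep their data-schema position but take the partition
    // field (and thus its type); the remaining partition columns are appended.
    val schema = StructType(
      dataSchema.map(f => overlappedPartCols.getOrElse(colName(f), f)) ++
        partitionSchema.filterNot(f => overlappedPartCols.contains(colName(f))))
    // schema.fields.map(f => s"${f.name}: ${f.dataType.simpleString}")
    //   == Array("c1: string", "p1: int", "p2: int")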
OptimizeMetadataOnlyQuerySuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
@@ -125,4 +128,23 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
       sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
     }
   }
+
+  test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTempPath { path =>
+        val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+          .write.json(tablePath.getCanonicalPath)
+
+        val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
+        checkAnswer(df, Row("a", "e", "c"))
+
+        val localRelation = df.queryExecution.optimizedPlan.collectFirst {
+          case l: LocalRelation => l
+        }
+        assert(localRelation.nonEmpty, "expect to see a LocalRelation")
+        assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", "cOl5"))
+      }
+    }
+  }
 }
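The final assertion pins the LocalRelation's output to the partition order encoded in the path (cOl3=c/cOl1=a/cOl5=e) rather than the relation's output order, which is exactly what the rewritten getPartitionAttrs guarantees. For reference, OPTIMIZER_METADATA_ONLY is the SQLConf entry backing spark.sql.optimizer.metadataOnly, so the session-level equivalent of the withSQLConf wrapper would be:

    // Equivalent of withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") outside the test harness.
    spark.conf.set("spark.sql.optimizer.metadataOnly", "true")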