[SPARK-23203][SQL]: DataSourceV2: Use immutable logical plans. #20387
Changes from all commits: fd5adbb, afbb1fb, b23bbf3, fd9551a, 969ae23, 0192a67, 14cabd7, cd08775, 3f0aca1, 1533bdf, 53ffa4f, bf762b4, c6b3701, 1a603db
@@ -17,17 +17,80 @@
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-    output: Seq[AttributeReference],
-    reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+    source: DataSourceV2,
+    options: Map[String, String],
+    projection: Seq[AttributeReference],
+    filters: Option[Seq[Expression]] = None,
+    userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
+
+  import DataSourceV2Relation._
+
+  override def simpleString: String = {
+    s"DataSourceV2Relation(source=${source.name}, " +
+      s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
+      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+    // use the projection attributes to avoid assigning new ids. fields that are not projected
+    // will be assigned new ids, which is okay because they are not projected.
+    val attrMap = projection.map(a => a.name -> a).toMap
+    schema.map(f => attrMap.getOrElse(f.name,
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
+  }
+
+  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+
+  lazy val (
+      reader: DataSourceReader,
+      unsupportedFilters: Seq[Expression],
+      pushedFilters: Seq[Expression]) = {
+    val newReader = userSpecifiedSchema match {
+      case Some(s) =>
+        source.asReadSupportWithSchema.createReader(s, v2Options)
+      case _ =>
+        source.asReadSupport.createReader(v2Options)
+    }
+
+    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
+    val (remainingFilters, pushedFilters) = filters match {
+      case Some(filterSeq) =>
+        DataSourceV2Relation.pushFilters(newReader, filterSeq)
+      case _ =>
+        (Nil, Nil)
+    }
+
+    (newReader, remainingFilters, pushedFilters)
+  }
+
+  override def doCanonicalize(): LogicalPlan = {
+    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+
+    // override output with canonicalized output to avoid attempting to configure a reader
+    val canonicalOutput: Seq[AttributeReference] = this.output
+        .map(a => QueryPlan.normalizeExprId(a, projection))
+
+    new DataSourceV2Relation(c.source, c.options, c.projection) {
Reviewer (inline comment on the line above): This is hacky but I don't have a better idea now, let's revisit it later.
+      override lazy val output: Seq[AttributeReference] = canonicalOutput
+    }
+  }
 
   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
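The relation is now a plain, immutable description of a scan (source, options, projection, filters, user schema); a reader is only created and configured when the lazy `(reader, unsupportedFilters, pushedFilters)` triple is first forced. A minimal, self-contained sketch of that pattern, using made-up `Fake*` types rather than Spark's API:

```scala
object LazyReaderSketch {
  class FakeReader {
    var prunedColumns: Seq[String] = Nil
    var pushed: Seq[String] = Nil
    // returns the filters it cannot handle, mirroring SupportsPushDownFilters.pushFilters
    def pushFilters(fs: Seq[String]): Seq[String] = {
      pushed = fs.filter(_ != "notSupported")
      fs.diff(pushed)
    }
    def pruneColumns(cols: Seq[String]): Unit = { prunedColumns = cols }
  }

  case class FakeRelation(projection: Seq[String], filters: Seq[String]) {
    // the reader is created and configured exactly once, on first access, and the
    // pushed/remaining filters always describe that same reader
    lazy val (reader: FakeReader, unsupportedFilters: Seq[String], pushedFilters: Seq[String]) = {
      val newReader = new FakeReader
      newReader.pruneColumns(projection)
      val remaining = newReader.pushFilters(filters)
      (newReader, remaining, newReader.pushed)
    }
  }

  def main(args: Array[String]): Unit = {
    val rel = FakeRelation(Seq("id", "data"), Seq("id > 1", "notSupported"))
    println(rel.pushedFilters)      // List(id > 1)
    println(rel.unsupportedFilters) // List(notSupported)
  }
}
```

Because all three values are produced in one lazy block, the pushed and remaining filters cannot drift out of sync with the reader they were pushed to.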
@@ -37,22 +100,147 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-    copy(output = output.map(_.newInstance()))
+    // projection is used to maintain id assignment.
+    // if projection is not set, use output so the copy is not equal to the original
+    copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
     output: Seq[AttributeReference],
-    reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
+    reader: DataSourceReader)
+  extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+
+  override def computeStats(): Statistics = reader match {
+    case r: SupportsReportStatistics =>
+      Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+    case _ =>
+      Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-    new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+    def asReadSupport: ReadSupport = {
+      source match {
+        case support: ReadSupport =>
+          support
+        case _: ReadSupportWithSchema =>
+          // this method is only called if there is no user-supplied schema. if there is no
+          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
+          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
+        case _ =>
+          throw new AnalysisException(s"Data source is not readable: $name")
+      }
+    }
+
+    def asReadSupportWithSchema: ReadSupportWithSchema = {
+      source match {
+        case support: ReadSupportWithSchema =>
+          support
+        case _: ReadSupport =>
+          throw new AnalysisException(
+            s"Data source does not support user-supplied schema: $name")
+        case _ =>
+          throw new AnalysisException(s"Data source is not readable: $name")
+      }
+    }
+
+    def name: String = {
+      source match {
+        case registered: DataSourceRegister =>
+          registered.shortName()
+        case _ =>
+          source.getClass.getSimpleName
+      }
+    }
+  }
+
+  private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
+    new DataSourceOptions(options.asJava)
+  }
+
+  private def schema(
+      source: DataSourceV2,
+      v2Options: DataSourceOptions,
+      userSchema: Option[StructType]): StructType = {
+    val reader = userSchema match {
+      // TODO: remove this case because it is confusing for users
+      case Some(s) if !source.isInstanceOf[ReadSupportWithSchema] =>
+        val reader = source.asReadSupport.createReader(v2Options)
+        if (reader.readSchema() != s) {
+          throw new AnalysisException(s"${source.name} does not allow user-specified schemas.")
+        }
+        reader
+      case Some(s) =>
+        source.asReadSupportWithSchema.createReader(s, v2Options)
+      case _ =>
+        source.asReadSupport.createReader(v2Options)
+    }
+    reader.readSchema()
+  }
+
+  def create(
+      source: DataSourceV2,
+      options: Map[String, String],
+      filters: Option[Seq[Expression]] = None,
+      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
+    val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
+    DataSourceV2Relation(source, options, projection, filters,
+      // if the source does not implement ReadSupportWithSchema, then the userSpecifiedSchema must
+      // be equal to the reader's schema. the schema method enforces this. because the user schema
+      // and the reader's schema are identical, drop the user schema.
+      if (source.isInstanceOf[ReadSupportWithSchema]) userSpecifiedSchema else None)
+  }
+
+  private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
+    reader match {
+      case projectionSupport: SupportsPushDownRequiredColumns =>
+        projectionSupport.pruneColumns(struct)
+      case _ =>
+    }
+  }
+
+  private def pushFilters(
+      reader: DataSourceReader,
+      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    reader match {
+      case catalystFilterSupport: SupportsPushDownCatalystFilters =>
+        (
+          catalystFilterSupport.pushCatalystFilters(filters.toArray),
+          catalystFilterSupport.pushedCatalystFilters()
+        )
+
+      case filterSupport: SupportsPushDownFilters =>
+        // A map from original Catalyst expressions to corresponding translated data source
+        // filters. If a predicate is not in this map, it means it cannot be pushed down.
+        val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
+          DataSourceStrategy.translateFilter(p).map(f => p -> f)
+        }.toMap
+
+        // Catalyst predicate expressions that cannot be converted to data source filters.
+        val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
+
+        // Data source filters that cannot be pushed down. An unhandled filter means
+        // the data source cannot guarantee the rows returned can pass the filter.
+        // As a result we must return it so Spark can plan an extra filter operator.
+        val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
+        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+          unhandledFilters.contains(f)
+        }
+
+        (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+
+      case _ => (filters, Nil)
+    }
+  }
 }
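To make the split in `pushFilters` concrete, here is a self-contained model of the same bookkeeping with plain strings standing in for Catalyst expressions and source filters. The `translate` and `readerPushFilters` helpers are illustrative assumptions, not Spark's API; the real code uses `DataSourceStrategy.translateFilter` and `SupportsPushDownFilters.pushFilters`, which returns the filters the source could not handle:

```scala
object PushFiltersSketch {
  type Expression = String   // stand-in for a Catalyst Expression
  type Filter = String       // stand-in for a data source Filter

  // pretend only equality predicates can be translated to source filters
  def translate(e: Expression): Option[Filter] =
    if (e.contains("=")) Some(e.toUpperCase) else None

  // pretend the source can handle every pushed filter except those on column "b"
  def readerPushFilters(fs: Array[Filter]): Array[Filter] = fs.filter(_.startsWith("B"))

  def split(filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    // map from original predicate to its translated filter; untranslated => not pushable
    val translatedMap: Map[Expression, Filter] =
      filters.flatMap(p => translate(p).map(f => p -> f)).toMap

    val nonConvertible = filters.filterNot(translatedMap.contains)

    // filters returned by the reader are "unhandled": Spark must re-evaluate them after the scan
    val unhandled = readerPushFilters(translatedMap.values.toArray).toSet
    val (unhandledPredicates, pushedPredicates) =
      translatedMap.partition { case (_, f) => unhandled.contains(f) }

    (nonConvertible ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val (postScan, pushed) = split(Seq("a = 1", "b = 2", "c LIKE 'x%'"))
    println(pushed)   // fully handled by the source: List(a = 1)
    println(postScan) // still evaluated by Spark: List(c LIKE 'x%', b = 2)
  }
}
```

The first element of the result feeds an extra Filter operator above the scan; the second is what the relation reports as `pushedFilters`.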
Reviewer: This reverts #20485, can we still pass the test?
Author: I'll have a look. I didn't realize you'd committed that one already.
Author: That commit was not reverted when I rebased. The test is still present and passing: https://github.com/apache/spark/blob/181946d1f1c5889661544830a77bd23c4b4f685a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L320-L336
Reviewer: I pulled your code and played with it. So your PR does fix the bug, but in a hacky way. Let me explain what happens.

1. When `QueryPlan.canonicalized` is called, every expression in `DataSourceV2Relation` is canonicalized, including `DataSourceV2Relation.projection`. This means the attributes in `projection` are all renamed to "none".
2. `DataSourceV2Relation.output` is called, which triggers the creation of the reader and applies filter push down and column pruning. Note that because all attributes are renamed to "none", we are actually pushing invalid filters and columns to data sources.
3. The output is then built from `reader.schema` and `projection`, to get the actual output. Because all names are "none", it works.

However, step 2 is pretty dangerous: Spark doesn't define the behavior of pushing invalid filters and columns, especially what `reader.schema` should return after invalid columns are pushed down.

I prefer my original fix, which put `output` in `DataSourceV2Relation`'s constructor parameters, and update it when doing column pruning in `PushDownOperatorsToDataSource`.
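A stripped-down sketch of the hazard described in steps 1 and 2, with illustrative types in place of Spark's plan and reader classes: if the canonicalized copy of a relation still configures its reader lazily from the now-renamed attributes, simply comparing plans pushes meaningless column names into the source.

```scala
object CanonicalizationHazardSketch {
  class RecordingReader {
    var pruned: Seq[String] = Nil
    def pruneColumns(cols: Seq[String]): Unit = { pruned = cols }
    def readSchema(): Seq[String] = if (pruned.nonEmpty) pruned else Seq("id", "data")
  }

  case class Relation(projection: Seq[String]) {
    lazy val reader: RecordingReader = {
      val r = new RecordingReader
      r.pruneColumns(projection)   // side effect: push-down happens on first access
      r
    }
    def output: Seq[String] = reader.readSchema()
    // canonicalization renames every attribute to "none" for comparison purposes
    def canonicalized: Relation = Relation(projection.map(_ => "none"))
  }

  def main(args: Array[String]): Unit = {
    val plan = Relation(Seq("id"))
    // comparing canonical forms forces output, which configures a reader with "none" columns
    val outputDuringComparison = plan.canonicalized.output
    println(outputDuringComparison) // List(none): invalid columns reached the source
  }
}
```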
Author: I agree that it's a bad idea to run push-down here. I fixed this by implementing `doCanonicalize` and returning a node that overrides the `output` val. I think that is cleaner than pulling the logic outside of the relation. There's no need for every place that creates a relation to get the output of a reader, which is the only way to determine what the node's output will be.
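A sketch of that shape of fix, again with illustrative types rather than Spark's `QueryPlan` API: the canonical copy overrides `output` with attributes normalized up front, so comparing canonical plans never forces a reader to be created.

```scala
object DoCanonicalizeSketch {
  class Relation(val projection: Seq[String]) {
    // fails loudly if anything tries to configure a reader during canonicalization
    lazy val reader: Any = sys.error("reader must not be configured during canonicalization")
    def output: Seq[String] = Seq("id")  // stands in for output normally derived from the reader

    def doCanonicalize(): Relation = {
      val canonicalOutput = output.map(_ => "none")  // normalize before copying
      new Relation(projection.map(_ => "none")) {
        // anonymous subclass: reuse the pre-computed output instead of consulting a reader
        override def output: Seq[String] = canonicalOutput
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val canonical = new Relation(Seq("id")).doCanonicalize()
    println(canonical.output)  // List(none), and no reader was ever created
  }
}
```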