Commit ca6ccb2

do not create DataSourceReader many times
1 parent b6c50d7 commit ca6ccb2

File tree

2 files changed: 132 additions, 176 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 72 additions & 149 deletions
@@ -22,78 +22,57 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType

+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create fresh [[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh
+ *                            [[DataSourceReader]].
+ * @param optimizedReader An optimized [[DataSourceReader]] which is produced by the optimizer rule
+ *                        [[PushDownOperatorsToDataSource]]. It is a temporary value and is excluded
+ *                        in the equality definition of this class. It is to avoid re-applying
+ *                        operators pushdown and re-creating [[DataSourceReader]] when we copy
+ *                        the relation during query plan transformation.
+ * @param pushedFilters The filters that are pushed down to the data source.
+ */
 case class DataSourceV2Relation(
+    output: Seq[AttributeReference],
     source: DataSourceV2,
     options: Map[String, String],
-    projection: Seq[AttributeReference],
-    filters: Option[Seq[Expression]] = None,
-    userSpecifiedSchema: Option[StructType] = None)
+    userSpecifiedSchema: Option[StructType],
+    optimizedReader: Option[DataSourceReader] = None,
+    pushedFilters: Seq[Expression] = Nil)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

   import DataSourceV2Relation._

-  override def simpleString: String = "RelationV2 " + metadataString
-
-  override lazy val schema: StructType = reader.readSchema()
+  def createFreshReader: DataSourceReader = source.createReader(options, userSpecifiedSchema)

-  override lazy val output: Seq[AttributeReference] = {
-    // use the projection attributes to avoid assigning new ids. fields that are not projected
-    // will be assigned new ids, which is okay because they are not projected.
-    val attrMap = projection.map(a => a.name -> a).toMap
-    schema.map(f => attrMap.getOrElse(f.name,
-      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
-  }
+  def reader: DataSourceReader = optimizedReader.getOrElse(createFreshReader)

-  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
-
-  // postScanFilters: filters that need to be evaluated after the scan.
-  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
-  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
-  lazy val (
-    reader: DataSourceReader,
-    postScanFilters: Seq[Expression],
-    pushedFilters: Seq[Expression]) = {
-    val newReader = userSpecifiedSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
-    }
-
-    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
-
-    val (postScanFilters, pushedFilters) = filters match {
-      case Some(filterSeq) =>
-        DataSourceV2Relation.pushFilters(newReader, filterSeq)
-      case _ =>
-        (Nil, Nil)
-    }
-    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
+  override def simpleString: String = "RelationV2 " + metadataString

-    (newReader, postScanFilters, pushedFilters)
+  override def equals(other: Any): Boolean = other match {
+    case other: DataSourceV2Relation =>
+      output == other.output && source.getClass == other.source.getClass &&
+        options == other.options && userSpecifiedSchema == other.userSpecifiedSchema &&
+        pushedFilters == other.pushedFilters
+    case _ => false
   }

-  override def doCanonicalize(): LogicalPlan = {
-    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
-
-    // override output with canonicalized output to avoid attempting to configure a reader
-    val canonicalOutput: Seq[AttributeReference] = this.output
-      .map(a => QueryPlan.normalizeExprId(a, projection))
-
-    new DataSourceV2Relation(c.source, c.options, c.projection) {
-      override lazy val output: Seq[AttributeReference] = canonicalOutput
-    }
+  override def hashCode(): Int = {
+    Seq(output, source.getClass, options, userSpecifiedSchema, pushedFilters).hashCode()
   }

+  // `LogicalPlanStats` caches the computed statistics, so we are fine here even the
+  // `optimizedReader` is None. We won't create `DataSourceReader` many times.
   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -102,9 +81,7 @@ case class DataSourceV2Relation(
   }

   override def newInstance(): DataSourceV2Relation = {
-    // projection is used to maintain id assignment.
-    // if projection is not set, use output so the copy is not equal to the original
-    copy(projection = projection.map(_.newInstance()))
+    copy(output = output.map(_.newInstance()))
   }
 }

@@ -150,111 +127,57 @@ case class StreamingDataSourceV2Relation(
 }

 object DataSourceV2Relation {
+
   private implicit class SourceHelpers(source: DataSourceV2) {
-    def asReadSupport: ReadSupport = {
-      source match {
-        case support: ReadSupport =>
-          support
-        case _: ReadSupportWithSchema =>
-          // this method is only called if there is no user-supplied schema. if there is no
-          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
-          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
-    }

-    def asReadSupportWithSchema: ReadSupportWithSchema = {
-      source match {
-        case support: ReadSupportWithSchema =>
-          support
-        case _: ReadSupport =>
-          throw new AnalysisException(
-            s"Data source does not support user-supplied schema: $name")
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
+    private def asReadSupport: ReadSupport = source match {
+      case support: ReadSupport =>
+        support
+      case _: ReadSupportWithSchema =>
+        // this method is only called if there is no user-supplied schema. if there is no
+        // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
+        throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
+      case _ =>
+        throw new AnalysisException(s"Data source is not readable: $name")
     }

-    def name: String = {
-      source match {
-        case registered: DataSourceRegister =>
-          registered.shortName()
-        case _ =>
-          source.getClass.getSimpleName
-      }
+    private def asReadSupportWithSchema: ReadSupportWithSchema = source match {
+      case support: ReadSupportWithSchema =>
+        support
+      case _: ReadSupport =>
+        throw new AnalysisException(
+          s"Data source does not support user-supplied schema: $name")
+      case _ =>
+        throw new AnalysisException(s"Data source is not readable: $name")
     }
-  }

-  private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
-    new DataSourceOptions(options.asJava)
-  }

-  private def schema(
-      source: DataSourceV2,
-      v2Options: DataSourceOptions,
-      userSchema: Option[StructType]): StructType = {
-    val reader = userSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
+    private def name: String = source match {
+      case registered: DataSourceRegister =>
+        registered.shortName()
       case _ =>
-        source.asReadSupport.createReader(v2Options)
+        source.getClass.getSimpleName
+    }
+
+    def createReader(
+        options: Map[String, String],
+        userSpecifiedSchema: Option[StructType]): DataSourceReader = {
+      val v2Options = new DataSourceOptions(options.asJava)
+      userSpecifiedSchema match {
+        case Some(s) =>
+          asReadSupportWithSchema.createReader(s, v2Options)
+        case _ =>
+          asReadSupport.createReader(v2Options)
+      }
     }
-    reader.readSchema()
   }

   def create(
       source: DataSourceV2,
       options: Map[String, String],
-      filters: Option[Seq[Expression]] = None,
-      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
-    DataSourceV2Relation(source, options, projection, filters, userSpecifiedSchema)
-  }
-
-  private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
-    reader match {
-      case projectionSupport: SupportsPushDownRequiredColumns =>
-        projectionSupport.pruneColumns(struct)
-      case _ =>
-    }
-  }
-
-  private def pushFilters(
-      reader: DataSourceReader,
-      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    reader match {
-      case r: SupportsPushDownCatalystFilters =>
-        val postScanFilters = r.pushCatalystFilters(filters.toArray)
-        val pushedFilters = r.pushedCatalystFilters()
-        (postScanFilters, pushedFilters)
-
-      case r: SupportsPushDownFilters =>
-        // A map from translated data source filters to original catalyst filter expressions.
-        val translatedFilterToExpr = scala.collection.mutable.HashMap.empty[Filter, Expression]
-        // Catalyst filter expression that can't be translated to data source filters.
-        val untranslatableExprs = scala.collection.mutable.ArrayBuffer.empty[Expression]
-
-        for (filterExpr <- filters) {
-          val translated = DataSourceStrategy.translateFilter(filterExpr)
-          if (translated.isDefined) {
-            translatedFilterToExpr(translated.get) = filterExpr
-          } else {
-            untranslatableExprs += filterExpr
-          }
-        }
-
-        // Data source filters that need to be evaluated again after scanning. which means
-        // the data source cannot guarantee the rows returned can pass these filters.
-        // As a result we must return it so Spark can plan an extra filter operator.
-        val postScanFilters =
-          r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
-        // The filters which are marked as pushed to this data source
-        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
-
-        (untranslatableExprs ++ postScanFilters, pushedFilters)
-
-      case _ => (filters, Nil)
-    }
+      userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
+    val reader = source.createReader(options, userSpecifiedSchema)
+    DataSourceV2Relation(
+      reader.readSchema().toAttributes, source, options, userSpecifiedSchema)
   }
 }
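
The key design choice in this file is the new equals/hashCode pair: optimizedReader is deliberately left out of equality, so copying the relation during plan transformation neither changes plan identity nor forces a fresh reader to be built. Below is a minimal, self-contained sketch of that pattern; the Relation and Reader names are hypothetical stand-ins, not the actual Spark classes:

    // Sketch: cache an expensive value on a plan node but exclude it from equality.
    // Relation/Reader are illustrative stand-ins only.
    class Reader {
      println("expensive reader created") // should happen at most once per optimized scan
    }

    case class Relation(
        output: Seq[String],
        options: Map[String, String],
        optimizedReader: Option[Reader] = None) {

      // Fall back to a fresh reader only when the optimizer has not cached one.
      def reader: Reader = optimizedReader.getOrElse(new Reader)

      // Equality ignores optimizedReader, mirroring the overridden equals/hashCode above.
      override def equals(other: Any): Boolean = other match {
        case r: Relation => output == r.output && options == r.options
        case _ => false
      }
      override def hashCode(): Int = Seq(output, options).hashCode()
    }

    object RelationEqualityDemo extends App {
      val base = Relation(Seq("a", "b"), Map("path" -> "/tmp/x"))
      val optimized = base.copy(optimizedReader = Some(new Reader)) // reader built once, here
      assert(base == optimized)                                     // copies still compare equal
      assert(optimized.reader eq optimized.optimizedReader.get)     // and reuse the cached reader
    }

This is the same trade-off the Scaladoc above spells out: the cached reader is an optimizer-internal detail, so it must not influence how the plan compares or canonicalizes.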

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala

Lines changed: 60 additions & 27 deletions
@@ -17,48 +17,81 @@

 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader.{SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}

 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // PhysicalOperation guarantees that filters are deterministic; no need to check
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
+      val newReader = relation.createFreshReader
+      var newRelation = relation.copy(optimizedReader = Some(newReader))

-      val projectAttrs = project.map(_.toAttribute)
-      val projectSet = AttributeSet(project.flatMap(_.references))
-      val filterSet = AttributeSet(filters.flatMap(_.references))
+      val postScanFilters: Seq[Expression] = newReader match {
+        case r: SupportsPushDownCatalystFilters =>
+          val postScanFilters = r.pushCatalystFilters(filters.toArray)
+          newRelation.copy(pushedFilters = r.pushedCatalystFilters())
+          postScanFilters

-      val projection = if (filterSet.subsetOf(projectSet) &&
-          AttributeSet(projectAttrs) == projectSet) {
-        // When the required projection contains all of the filter columns and column pruning alone
-        // can produce the required projection, push the required projection.
-        // A final projection may still be needed if the data source produces a different column
-        // order or if it cannot prune all of the nested columns.
-        projectAttrs
-      } else {
-        // When there are filter columns not already in the required projection or when the required
-        // projection is more complicated than column pruning, base column pruning on the set of
-        // all columns needed by both.
-        (projectSet ++ filterSet).toSeq
+        case r: SupportsPushDownFilters =>
+          // A map from translated data source filters to original catalyst filter expressions.
+          val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
+          // Catalyst filter expression that can't be translated to data source filters.
+          val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
+
+          for (filterExpr <- filters) {
+            val translated = DataSourceStrategy.translateFilter(filterExpr)
+            if (translated.isDefined) {
+              translatedFilterToExpr(translated.get) = filterExpr
+            } else {
+              untranslatableExprs += filterExpr
+            }
+          }
+
+          // Data source filters that need to be evaluated again after scanning. which means
+          // the data source cannot guarantee the rows returned can pass these filters.
+          // As a result we must return it so Spark can plan an extra filter operator.
+          val postScanFilters =
+            r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
+          // The filters which are marked as pushed to this data source
+          val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
+          newRelation = newRelation.copy(pushedFilters = pushedFilters)
+          untranslatableExprs ++ postScanFilters
+
+        case _ => filters
       }

-      val newRelation = relation.copy(
-        projection = projection.asInstanceOf[Seq[AttributeReference]],
-        filters = Some(filters))
+      newReader match {
+        case r: SupportsPushDownRequiredColumns =>
+          val requiredColumns = AttributeSet(
+            project.flatMap(_.references) ++ postScanFilters.flatMap(_.references))
+          val neededOutput = relation.output.filter(requiredColumns.contains)
+          if (neededOutput != relation.output) {
+            r.pruneColumns(neededOutput.toStructType)
+            val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+            val newOutput = r.readSchema().toAttributes.map {
+              // We have to keep the attribute id during transformation.
+              a => a.withExprId(nameToAttr(a.name).exprId)
+            }
+            newRelation = newRelation.copy(output = newOutput)
+          }

-      // Add a Filter for any filters that need to be evaluated after scan.
-      val postScanFilterCond = newRelation.postScanFilters.reduceLeftOption(And)
-      val filtered = postScanFilterCond.map(Filter(_, newRelation)).getOrElse(newRelation)
+        case _ =>
+      }

-      // Add a Project to ensure the output matches the required projection
-      if (newRelation.output != projectAttrs) {
-        Project(project, filtered)
+      val filterCondition = postScanFilters.reduceLeftOption(And)
+      val withFilter = filterCondition.map(Filter(_, newRelation)).getOrElse(newRelation)
+      if (withFilter.output == project) {
+        withFilter
       } else {
-        filtered
+        Project(project, withFilter)
       }

       case other => other.mapChildren(apply)
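
The SupportsPushDownFilters branch above hinges on translatedFilterToExpr: catalyst expressions are translated into source-level filters, pushed, and whatever the reader reports back is mapped onto the original expressions. A toy, self-contained sketch of that bookkeeping follows; Expr, SourceFilter, translateFilter and pushFilters are simplified stand-ins, not Spark APIs:

    import scala.collection.mutable

    object FilterPushdownSketch extends App {
      // Simplified stand-ins: a catalyst-side expression and a data-source-side filter.
      case class Expr(sql: String)
      case class SourceFilter(column: String, value: Int)

      // Pretend translator: only "col = <int>" predicates have a data source equivalent.
      def translateFilter(e: Expr): Option[SourceFilter] = e.sql.split(" = ") match {
        case Array(col, v) if v.forall(_.isDigit) => Some(SourceFilter(col, v.toInt))
        case _ => None
      }

      // Pretend reader: accepts every filter but still asks Spark to re-check column "b".
      def pushFilters(fs: Array[SourceFilter]): Array[SourceFilter] = fs.filter(_.column == "b")

      val filters = Seq(Expr("a = 1"), Expr("b = 2"), Expr("c LIKE 'x%'"))

      val translatedFilterToExpr = mutable.HashMap.empty[SourceFilter, Expr]
      val untranslatableExprs = mutable.ArrayBuffer.empty[Expr]
      for (filterExpr <- filters) {
        translateFilter(filterExpr) match {
          case Some(t) => translatedFilterToExpr(t) = filterExpr
          case None    => untranslatableExprs += filterExpr
        }
      }

      // Filters the source wants re-evaluated after the scan, mapped back to expressions,
      // plus everything that could not be translated at all.
      val postScanFilters =
        pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
      println(untranslatableExprs ++ postScanFilters) // ArrayBuffer(Expr(c LIKE 'x%'), Expr(b = 2))
    }

Untranslatable expressions always stay on the Spark side, and translated filters the source cannot fully guarantee come back from pushFilters; both end up in postScanFilters, which is why the rule wraps newRelation in a Filter whenever that sequence is non-empty.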
