diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 979a55467ff89..6a3b7df5e0be5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1381,8 +1381,18 @@ object SQLConf { "issues. Turn on this config to insert a local sort before actually doing repartition " + "to generate consistent repartition results. The performance of repartition() may go " + "down since we insert extra local sort before it.") + .booleanConf + .createWithDefault(true) + + val NESTED_SCHEMA_PRUNING_ENABLED = + buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + + "satisfying a query. This optimization allows columnar file format readers to avoid " + + "reading unnecessary nested column data. Currently Parquet is the only data source that " + + "implements this optimization.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val TOP_K_SORT_FALLBACK_THRESHOLD = buildConf("spark.sql.execution.topKSortFallbackThreshold") @@ -1863,6 +1873,8 @@ class SQLConf extends Serializable with Logging { def partitionOverwriteMode: PartitionOverwriteMode.Value = PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE)) + def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED) + def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING) def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala new file mode 100644 index 0000000000000..68e76fc013c18 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.internal.SQLConf.NESTED_SCHEMA_PRUNING_ENABLED + +/** + * A PlanTest that ensures that all tests in this suite are run with nested schema pruning enabled. + * Remove this trait once the default value of SQLConf.NESTED_SCHEMA_PRUNING_ENABLED is set to true. 
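+ *
+ * For illustration only (the suite name below is hypothetical), a suite opts in by mixing in
+ * this trait; the flag is enabled in beforeAll and restored to its original value in afterAll:
+ *
+ * {{{
+ * class MyParquetNestedFieldSuite extends QueryTest with SharedSQLContext with SchemaPruningTest {
+ *   test("some query that should benefit from nested schema pruning") {
+ *     // runs with spark.sql.nestedSchemaPruning.enabled set to true
+ *   }
+ * }
+ * }}}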
+ */ +private[sql] trait SchemaPruningTest extends PlanTest with BeforeAndAfterAll { + private var originalConfSchemaPruningEnabled = false + + override protected def beforeAll(): Unit = { + originalConfSchemaPruningEnabled = conf.nestedSchemaPruningEnabled + conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, true) + super.beforeAll() + } + + override protected def afterAll(): Unit = { + try { + super.afterAll() + } finally { + conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, originalConfSchemaPruningEnabled) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala new file mode 100644 index 0000000000000..c88b2f8c034fc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[execution] object GetStructFieldObject { + def unapply(getStructField: GetStructField): Option[(Expression, StructField)] = + Some(( + getStructField.child, + getStructField.childSchema(getStructField.ordinal))) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala new file mode 100644 index 0000000000000..2236f18b0da12 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and field counts of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. This + * class is motivated by columnar nested schema pruning. + */ +private[execution] case class ProjectionOverSchema(schema: StructType) { + private val fieldNames = schema.fieldNames.toSet + + def unapply(expr: Expression): Option[Expression] = getProjection(expr) + + private def getProjection(expr: Expression): Option[Expression] = + expr match { + case a: AttributeReference if fieldNames.contains(a.name) => + Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier)) + case GetArrayItem(child, arrayItemOrdinal) => + getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) } + case a: GetArrayStructFields => + getProjection(a.child).map(p => (p, p.dataType)).map { + case (projection, ArrayType(projSchema @ StructType(_), _)) => + GetArrayStructFields(projection, + projSchema(a.field.name), + projSchema.fieldIndex(a.field.name), + projSchema.size, + a.containsNull) + } + case GetMapValue(child, key) => + getProjection(child).map { projection => GetMapValue(projection, key) } + case GetStructFieldObject(child, field: StructField) => + getProjection(child).map(p => (p, p.dataType)).map { + case (projection, projSchema: StructType) => + GetStructField(projection, projSchema.fieldIndex(field.name)) + } + case _ => + None + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala new file mode 100644 index 0000000000000..0e7c593f9fb67 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst + * complex type extractor. For example, consider a relation with the following schema: + * + * {{{ + * root + * |-- name: struct (nullable = true) + * | |-- first: string (nullable = true) + * | |-- last: string (nullable = true) + * }}} + * + * Further, suppose we take the select expression `name.first`. This will parse into an + * `Alias(child, "first")`. 
Ignoring the alias, `child` matches the following pattern: + * + * {{{ + * GetStructFieldObject( + * AttributeReference("name", StructType(_), _, _), + * StructField("first", StringType, _, _)) + * }}} + * + * [[SelectedField]] converts that expression into + * + * {{{ + * StructField("name", StructType(Array(StructField("first", StringType)))) + * }}} + * + * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the + * same name as its child (or "parent" going right to left in the select expression) and a data + * type appropriate to the complex type extractor. In our example, the name of the child expression + * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string + * field named "first". + * + * @param expr the top-level complex type extractor + */ +private[execution] object SelectedField { + def unapply(expr: Expression): Option[StructField] = { + // If this expression is an alias, work on its child instead + val unaliased = expr match { + case Alias(child, _) => child + case expr => expr + } + selectField(unaliased, None) + } + + private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = { + expr match { + // No children. Returns a StructField with the attribute name or None if fieldOpt is None. + case AttributeReference(name, dataType, nullable, metadata) => + fieldOpt.map(field => + StructField(name, wrapStructType(dataType, field), nullable, metadata)) + // Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of + // array type. + case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name, + dataType, nullable, metadata)), _) => + val childField = fieldOpt.map(field => StructField(name, + wrapStructType(dataType, field), nullable, metadata)).getOrElse(field) + selectField(child, Some(childField)) + // Handles case "expr0.field[n]", where "expr0.field" is of array type. + case GetArrayItem(child, _) => + selectField(child, fieldOpt) + // Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type. + case GetArrayStructFields(child: GetArrayStructFields, + field @ StructField(name, dataType, nullable, metadata), _, _, _) => + val childField = fieldOpt.map(field => StructField(name, + wrapStructType(dataType, field), + nullable, metadata)).orElse(Some(field)) + selectField(child, childField) + // Handles case "expr0.field", where "expr0" is of array type. + case GetArrayStructFields(child, + field @ StructField(name, dataType, nullable, metadata), _, _, _) => + val childField = + fieldOpt.map(field => StructField(name, + wrapStructType(dataType, field), + nullable, metadata)).orElse(Some(field)) + selectField(child, childField) + // Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of + // map type. + case GetMapValue(x @ GetStructFieldObject(child, field @ StructField(name, + dataType, + nullable, metadata)), _) => + val childField = fieldOpt.map(field => StructField(name, + wrapStructType(dataType, field), + nullable, metadata)).orElse(Some(field)) + selectField(child, childField) + // Handles case "expr0.field[key]", where "expr0.field" is of map type. + case GetMapValue(child, _) => + selectField(child, fieldOpt) + // Handles case "expr0.field", where expr0 is of struct type. 
+ case GetStructFieldObject(child, + field @ StructField(name, dataType, nullable, metadata)) => + val childField = fieldOpt.map(field => StructField(name, + wrapStructType(dataType, field), + nullable, metadata)).orElse(Some(field)) + selectField(child, childField) + case _ => + None + } + } + + // Constructs a composition of complex types with a StructType(Array(field)) at its core. Returns + // a StructType for a StructType, an ArrayType for an ArrayType and a MapType for a MapType. + private def wrapStructType(dataType: DataType, field: StructField): DataType = { + dataType match { + case _: StructType => + StructType(Array(field)) + case ArrayType(elementType, containsNull) => + ArrayType(wrapStructType(elementType, field), containsNull) + case MapType(keyType, valueType, valueContainsNull) => + MapType(keyType, wrapStructType(valueType, field), valueContainsNull) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 64d3f2cdbfa82..969def7624058 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.ExperimentalMethods import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate class SparkOptimizer( @@ -31,7 +32,8 @@ class SparkOptimizer( override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ - Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++ + Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+ + Batch("Parquet Schema Pruning", Once, ParquetSchemaPruning)) ++ postHocOptimizationBatches :+ Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala new file mode 100644 index 0000000000000..6a46b5f8edc54 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = + if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) + } else { + plan + } + + private def apply0(plan: LogicalPlan): LogicalPlan = + plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _)) + if canPruneRelation(hadoopFsRelation) => + val (normalizedProjects, normalizedFilters) = + normalizeAttributeRefNames(l, projects, filters) + val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters) + + // If requestedRootFields includes a nested field, continue. Otherwise, + // return op + if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) { + val dataSchema = hadoopFsRelation.dataSchema + val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return op. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in prunedDataSchema are a subset of the fields + // in dataSchema. + if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { + val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + + val prunedRelation = buildPrunedRelation(l, prunedParquetRelation) + val projectionOverSchema = ProjectionOverSchema(prunedDataSchema) + + buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation, + projectionOverSchema) + } else { + op + } + } else { + op + } + } + + /** + * Checks to see if the given relation is Parquet and can be pruned. + */ + private def canPruneRelation(fsRelation: HadoopFsRelation) = + fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] + + /** + * Normalizes the names of the attribute references in the given projects and filters to reflect + * the names in the given logical relation. This makes it possible to compare attributes and + * fields by name. Returns a tuple with the normalized projects and filters, respectively. 
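+ *
+ * As a purely illustrative example, if the relation's output attribute is named "name" but a
+ * case-insensitively analyzed filter refers to it as "NAME" (with the same expression id), the
+ * filter's reference is rewritten here to "name" so that it can be matched against the
+ * relation's data schema by name.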
+ */ + private def normalizeAttributeRefNames( + logicalRelation: LogicalRelation, + projects: Seq[NamedExpression], + filters: Seq[Expression]): (Seq[NamedExpression], Seq[Expression]) = { + val normalizedAttNameMap = logicalRelation.output.map(att => (att.exprId, att.name)).toMap + val normalizedProjects = projects.map(_.transform { + case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) => + att.withName(normalizedAttNameMap(att.exprId)) + }).map { case expr: NamedExpression => expr } + val normalizedFilters = filters.map(_.transform { + case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) => + att.withName(normalizedAttNameMap(att.exprId)) + }) + (normalizedProjects, normalizedFilters) + } + + /** + * Returns the set of fields from the Parquet file that the query plan needs. + */ + private def identifyRootFields(projects: Seq[NamedExpression], filters: Seq[Expression]) = { + val projectionRootFields = projects.flatMap(getRootFields) + val filterRootFields = filters.flatMap(getRootFields) + + (projectionRootFields ++ filterRootFields).distinct + } + + /** + * Builds the new output [[Project]] Spark SQL operator that has the pruned output relation. + */ + private def buildNewProjection( + projects: Seq[NamedExpression], filters: Seq[Expression], prunedRelation: LogicalRelation, + projectionOverSchema: ProjectionOverSchema) = { + // Construct a new target for our projection by rewriting and + // including the original filters where available + val projectionChild = + if (filters.nonEmpty) { + val projectedFilters = filters.map(_.transformDown { + case projectionOverSchema(expr) => expr + }) + val newFilterCondition = projectedFilters.reduce(And) + Filter(newFilterCondition, prunedRelation) + } else { + prunedRelation + } + + // Construct the new projections of our Project by + // rewriting the original projections + val newProjects = projects.map(_.transformDown { + case projectionOverSchema(expr) => expr + }).map { case expr: NamedExpression => expr } + + if (log.isDebugEnabled) { + logDebug(s"New projects:\n${newProjects.map(_.treeString).mkString("\n")}") + } + + Project(newProjects, projectionChild) + } + + /** + * Filters the schema from the given file by the requested fields. + * Schema field ordering from the file is preserved. + */ + private def pruneDataSchema( + fileDataSchema: StructType, + requestedRootFields: Seq[RootField]) = { + // Merge the requested root fields into a single schema. Note the ordering of the fields + // in the resulting schema may differ from their ordering in the logical relation's + // original schema + val mergedSchema = requestedRootFields + .map { case RootField(field, _) => StructType(Array(field)) } + .reduceLeft(_ merge _) + val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet + val mergedDataSchema = + StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + // Sort the fields of mergedDataSchema according to their order in dataSchema, + // recursively. This makes mergedDataSchema a pruned schema of dataSchema + sortLeftFieldsByRight(mergedDataSchema, fileDataSchema).asInstanceOf[StructType] + } + + /** + * Builds a pruned logical relation from the output of the output relation and the schema of the + * pruned base relation. 
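+ *
+ * The pruned relation reuses the expression ids of the original relation's output attributes.
+ * If it were instead built with fresh expression ids (for example, a hypothetical name#42 in
+ * place of an original name#3), the projections and filters rewritten over the pruned schema
+ * would reference attributes that no longer exist in the plan.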
+ */ + private def buildPrunedRelation( + outputRelation: LogicalRelation, + prunedBaseRelation: HadoopFsRelation) = { + // We need to replace the expression ids of the pruned relation output attributes + // with the expression ids of the original relation output attributes so that + // references to the original relation's output are not broken + val outputIdMap = outputRelation.output.map(att => (att.name, att.exprId)).toMap + val prunedRelationOutput = + prunedBaseRelation + .schema + .toAttributes + .map { + case att if outputIdMap.contains(att.name) => + att.withExprId(outputIdMap(att.name)) + case att => att + } + outputRelation.copy(relation = prunedBaseRelation, output = prunedRelationOutput) + } + + /** + * Gets the root (aka top-level, no-parent) [[StructField]]s for the given [[Expression]]. + * When expr is an [[Attribute]], construct a field around it and indicate that that + * field was derived from an attribute. + */ + private def getRootFields(expr: Expression): Seq[RootField] = { + expr match { + case att: Attribute => + RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil + case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil + case _ => + expr.children.flatMap(getRootFields) + } + } + + /** + * Counts the "leaf" fields of the given dataType. Informally, this is the + * number of fields of non-complex data type in the tree representation of + * [[DataType]]. + */ + private def countLeaves(dataType: DataType): Int = { + dataType match { + case array: ArrayType => countLeaves(array.elementType) + case map: MapType => countLeaves(map.keyType) + countLeaves(map.valueType) + case struct: StructType => + struct.map(field => countLeaves(field.dataType)).sum + case _ => 1 + } + } + + /** + * Sorts the fields and descendant fields of structs in left according to their order in + * right. This function assumes that the fields of left are a subset of the fields of + * right, recursively. That is, left is a "subschema" of right, ignoring order of + * fields. + */ + private def sortLeftFieldsByRight(left: DataType, right: DataType): DataType = + (left, right) match { + case (ArrayType(leftElementType, containsNull), ArrayType(rightElementType, _)) => + ArrayType( + sortLeftFieldsByRight(leftElementType, rightElementType), + containsNull) + case (MapType(leftKeyType, leftValueType, containsNull), + MapType(rightKeyType, rightValueType, _)) => + MapType( + sortLeftFieldsByRight(leftKeyType, rightKeyType), + sortLeftFieldsByRight(leftValueType, rightValueType), + containsNull) + case (leftStruct: StructType, rightStruct: StructType) => + val filteredRightFieldNames = rightStruct.fieldNames.filter(leftStruct.fieldNames.contains) + val sortedLeftFields = filteredRightFieldNames.map { fieldName => + val leftFieldType = leftStruct(fieldName).dataType + val rightFieldType = rightStruct(fieldName).dataType + val sortedLeftFieldType = sortLeftFieldsByRight(leftFieldType, rightFieldType) + StructField(fieldName, sortedLeftFieldType) + } + StructType(sortedLeftFields) + case _ => left + } + + /** + * A "root" schema field (aka top-level, no-parent) and whether it was derived from + * an attribute or had a proper child. 
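+ *
+ * For example, the projection `name.first` produces a RootField for the "name" struct with
+ * derivedFromAtt = false, whereas a bare column reference such as `address` produces a RootField
+ * for "address" with derivedFromAtt = true. Pruning is only attempted when at least one of the
+ * requested root fields is of the former kind.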
+ */ + private case class RootField(field: StructField, derivedFromAtt: Boolean) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala new file mode 100644 index 0000000000000..05f7e3ce83880 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + private val ignoredField = StructField("col1", StringType, nullable = false) + + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // | |-- field1: integer (nullable = true) + // | |-- field6: struct (nullable = true) + // | | |-- subfield1: string (nullable = false) + // | | |-- subfield2: string (nullable = true) + // | |-- field7: struct (nullable = true) + // | | |-- subfield1: struct (nullable = true) + // | | | |-- subsubfield1: integer (nullable = true) + // | | | |-- subsubfield2: integer (nullable = true) + // | |-- field9: map (nullable = true) + // | | |-- key: string + // | | |-- value: integer (valueContainsNull = false) + private val nestedComplex = StructType(ignoredField :: + StructField("col2", StructType( + StructField("field1", IntegerType) :: + StructField("field6", StructType( + StructField("subfield1", StringType, nullable = false) :: + StructField("subfield2", StringType) :: Nil)) :: + StructField("field7", StructType( + StructField("subfield1", StructType( + StructField("subsubfield1", IntegerType) :: + StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)) :: + StructField("field9", + MapType(StringType, IntegerType, valueContainsNull = false)) :: Nil)) :: Nil) + + test("SelectedField should not match an attribute reference") { + val testRelation = LocalRelation(nestedComplex.toAttributes) + assertResult(None)(unapplySelect("col1", testRelation)) + assertResult(None)(unapplySelect("col1 as foo", testRelation)) + assertResult(None)(unapplySelect("col2", testRelation)) + } + + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // | |-- field2: array (nullable = true) + // | | |-- element: integer (containsNull = false) + // | |-- field3: array (nullable = false) + // | | |-- element: struct (containsNull = true) + // | | | |-- subfield1: integer (nullable = true) + // | | | |-- subfield2: integer (nullable = true) + // | | | |-- subfield3: array (nullable = true) + // | | | | |-- element: integer (containsNull = true) + private val structOfArray = StructType(ignoredField :: + StructField("col2", StructType( + StructField("field2", ArrayType(IntegerType, containsNull = false)) :: + StructField("field3", ArrayType(StructType( + StructField("subfield1", IntegerType) :: + StructField("subfield2", IntegerType) :: + StructField("subfield3", ArrayType(IntegerType)) :: Nil)), nullable = false) + :: Nil)) + :: Nil) + + testSelect(structOfArray, "col2.field2", "col2.field2[0] as foo") { + StructField("col2", StructType( + StructField("field2", ArrayType(IntegerType, containsNull = false)) :: Nil)) + } + + testSelect(nestedComplex, "col2.field9", "col2.field9['foo'] as foo") { + StructField("col2", StructType( + StructField("field9", MapType(StringType, IntegerType, valueContainsNull = false)) :: Nil)) + } + + testSelect(structOfArray, "col2.field3.subfield3", "col2.field3[0].subfield3 as foo", + "col2.field3.subfield3[0] as foo", "col2.field3[0].subfield3[0] as foo") { + StructField("col2", StructType( + StructField("field3", ArrayType(StructType( + StructField("subfield3", ArrayType(IntegerType)) :: Nil)), nullable = false) :: Nil)) + } + + testSelect(structOfArray, "col2.field3.subfield1") { + StructField("col2", StructType( + StructField("field3", ArrayType(StructType( + StructField("subfield1", IntegerType) :: Nil)), nullable = false) :: Nil)) + } + + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // | |-- field4: map (nullable = true) + // | | |-- key: string + // | | |-- value: struct (valueContainsNull = false) + // | | 
| |-- subfield1: integer (nullable = true) + // | | | |-- subfield2: array (nullable = true) + // | | | | |-- element: integer (containsNull = false) + // | |-- field8: map (nullable = true) + // | | |-- key: string + // | | |-- value: array (valueContainsNull = false) + // | | | |-- element: struct (containsNull = true) + // | | | | |-- subfield1: integer (nullable = true) + // | | | | |-- subfield2: array (nullable = true) + // | | | | | |-- element: integer (containsNull = false) + private val structWithMap = StructType( + ignoredField :: + StructField("col2", StructType( + StructField("field4", MapType(StringType, StructType( + StructField("subfield1", IntegerType) :: + StructField("subfield2", ArrayType(IntegerType, containsNull = false)) :: Nil + ), valueContainsNull = false)) :: + StructField("field8", MapType(StringType, ArrayType(StructType( + StructField("subfield1", IntegerType) :: + StructField("subfield2", ArrayType(IntegerType, containsNull = false)) :: Nil) + ), valueContainsNull = false)) :: Nil + )) :: Nil + ) + + testSelect(structWithMap, "col2.field4['foo'].subfield1 as foo") { + StructField("col2", StructType( + StructField("field4", MapType(StringType, StructType( + StructField("subfield1", IntegerType) :: Nil), valueContainsNull = false)) :: Nil)) + } + + testSelect(structWithMap, + "col2.field4['foo'].subfield2 as foo", "col2.field4['foo'].subfield2[0] as foo") { + StructField("col2", StructType( + StructField("field4", MapType(StringType, StructType( + StructField("subfield2", ArrayType(IntegerType, containsNull = false)) + :: Nil), valueContainsNull = false)) :: Nil)) + } + + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // | |-- field5: array (nullable = false) + // | | |-- element: struct (containsNull = true) + // | | | |-- subfield1: struct (nullable = false) + // | | | | |-- subsubfield1: integer (nullable = true) + // | | | | |-- subsubfield2: integer (nullable = true) + // | | | |-- subfield2: struct (nullable = true) + // | | | | |-- subsubfield1: struct (nullable = true) + // | | | | | |-- subsubsubfield1: string (nullable = true) + // | | | | |-- subsubfield2: integer (nullable = true) + private val structWithArray = StructType( + ignoredField :: + StructField("col2", StructType( + StructField("field5", ArrayType(StructType( + StructField("subfield1", StructType( + StructField("subsubfield1", IntegerType) :: + StructField("subsubfield2", IntegerType) :: Nil), nullable = false) :: + StructField("subfield2", StructType( + StructField("subsubfield1", StructType( + StructField("subsubsubfield1", StringType) :: Nil)) :: + StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)), nullable = false) :: Nil) + ) :: Nil + ) + + testSelect(structWithArray, "col2.field5.subfield1") { + StructField("col2", StructType( + StructField("field5", ArrayType(StructType( + StructField("subfield1", StructType( + StructField("subsubfield1", IntegerType) :: + StructField("subsubfield2", IntegerType) :: Nil), nullable = false) + :: Nil)), nullable = false) :: Nil)) + } + + testSelect(structWithArray, "col2.field5.subfield1.subsubfield1") { + StructField("col2", StructType( + StructField("field5", ArrayType(StructType( + StructField("subfield1", StructType( + StructField("subsubfield1", IntegerType) :: Nil), nullable = false) + :: Nil)), nullable = false) :: Nil)) + } + + testSelect(structWithArray, "col2.field5.subfield2.subsubfield1.subsubsubfield1") { + StructField("col2", StructType( + StructField("field5", ArrayType(StructType( + 
StructField("subfield2", StructType( + StructField("subsubfield1", StructType( + StructField("subsubsubfield1", StringType) :: Nil)) :: Nil)) + :: Nil)), nullable = false) :: Nil)) + } + + testSelect(structWithMap, "col2.field8['foo'][0].subfield1 as foo") { + StructField("col2", StructType( + StructField("field8", MapType(StringType, ArrayType(StructType( + StructField("subfield1", IntegerType) :: Nil)), valueContainsNull = false)) :: Nil)) + } + + testSelect(nestedComplex, "col2.field1") { + StructField("col2", StructType( + StructField("field1", IntegerType) :: Nil)) + } + + testSelect(nestedComplex, "col2.field6") { + StructField("col2", StructType( + StructField("field6", StructType( + StructField("subfield1", StringType, nullable = false) :: + StructField("subfield2", StringType) :: Nil)) :: Nil)) + } + + testSelect(nestedComplex, "col2.field6.subfield1") { + StructField("col2", StructType( + StructField("field6", StructType( + StructField("subfield1", StringType, nullable = false) :: Nil)) :: Nil)) + } + + testSelect(nestedComplex, "col2.field7.subfield1") { + StructField("col2", StructType( + StructField("field7", StructType( + StructField("subfield1", StructType( + StructField("subsubfield1", IntegerType) :: + StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)) :: Nil)) + } + + // |-- col1: string (nullable = false) + // |-- col3: array (nullable = false) + // | |-- element: struct (containsNull = false) + // | | |-- field1: struct (nullable = true) + // | | | |-- subfield1: integer (nullable = false) + // | | | |-- subfield2: integer (nullable = true) + // | | |-- field2: map (nullable = true) + // | | | |-- key: string + // | | | |-- value: integer (valueContainsNull = false) + private val arrayWithStructAndMap = StructType(Array( + StructField("col3", ArrayType(StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType, nullable = false) :: + StructField("subfield2", IntegerType) :: Nil)) :: + StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false)) + :: Nil), containsNull = false), nullable = false) + )) + + testSelect(arrayWithStructAndMap, "col3.field1.subfield1") { + StructField("col3", ArrayType(StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType, nullable = false) :: Nil)) + :: Nil), containsNull = false), nullable = false) + } + + testSelect(arrayWithStructAndMap, "col3.field2['foo'] as foo") { + StructField("col3", ArrayType(StructType( + StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false)) + :: Nil), containsNull = false), nullable = false) + } + + // |-- col1: string (nullable = false) + // |-- col4: map (nullable = false) + // | |-- key: string + // | |-- value: struct (valueContainsNull = false) + // | | |-- field1: struct (nullable = true) + // | | | |-- subfield1: integer (nullable = false) + // | | | |-- subfield2: integer (nullable = true) + // | | |-- field2: map (nullable = true) + // | | | |-- key: string + // | | | |-- value: integer (valueContainsNull = false) + private val col4 = StructType(Array(ignoredField, + StructField("col4", MapType(StringType, StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType, nullable = false) :: + StructField("subfield2", IntegerType) :: Nil)) :: + StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false)) + :: Nil), valueContainsNull = false), nullable = false) + )) + + testSelect(col4, "col4['foo'].field1.subfield1 as foo") { + 
StructField("col4", MapType(StringType, StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType, nullable = false) :: Nil)) + :: Nil), valueContainsNull = false), nullable = false) + } + + testSelect(col4, "col4['foo'].field2['bar'] as foo") { + StructField("col4", MapType(StringType, StructType( + StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false)) + :: Nil), valueContainsNull = false), nullable = false) + } + + // |-- col1: string (nullable = false) + // |-- col5: array (nullable = true) + // | |-- element: map (containsNull = true) + // | | |-- key: string + // | | |-- value: struct (valueContainsNull = false) + // | | | |-- field1: struct (nullable = true) + // | | | | |-- subfield1: integer (nullable = true) + // | | | | |-- subfield2: integer (nullable = true) + private val arrayOfStruct = StructType(Array(ignoredField, + StructField("col5", ArrayType(MapType(StringType, StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType) :: + StructField("subfield2", IntegerType) :: Nil)) :: Nil), valueContainsNull = false))) + )) + + testSelect(arrayOfStruct, "col5[0]['foo'].field1.subfield1 as foo") { + StructField("col5", ArrayType(MapType(StringType, StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType) :: Nil)) :: Nil), valueContainsNull = false))) + } + + // |-- col1: string (nullable = false) + // |-- col6: map (nullable = true) + // | |-- key: string + // | |-- value: array (valueContainsNull = true) + // | | |-- element: struct (containsNull = false) + // | | | |-- field1: struct (nullable = true) + // | | | | |-- subfield1: integer (nullable = true) + // | | | | |-- subfield2: integer (nullable = true) + private val mapOfArray = StructType(Array(ignoredField, + StructField("col6", MapType(StringType, ArrayType(StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType) :: + StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false))))) + + testSelect(mapOfArray, "col6['foo'][0].field1.subfield1 as foo") { + StructField("col6", MapType(StringType, ArrayType(StructType( + StructField("field1", StructType( + StructField("subfield1", IntegerType) :: Nil)) :: Nil), containsNull = false))) + } + + // An array with a struct with a different fields + // |-- col1: string (nullable = false) + // |-- col7: array (nullable = true) + // | |-- element: struct (containsNull = true) + // | | |-- field1: integer (nullable = false) + // | | |-- field2: struct (nullable = true) + // | | | |-- subfield1: integer (nullable = false) + // | | |-- field3: array (nullable = true) + // | | | |-- element: struct (containsNull = true) + // | | | | |-- subfield1: integer (nullable = false) + private val arrayWithMultipleFields = StructType(Array(ignoredField, + StructField("col7", ArrayType(StructType( + StructField("field1", IntegerType, nullable = false) :: + StructField("field2", StructType( + StructField("subfield1", IntegerType, nullable = false) :: Nil)) :: + StructField("field3", ArrayType(StructType( + StructField("subfield1", IntegerType, nullable = false) :: Nil))) :: Nil))))) + + testSelect(arrayWithMultipleFields, + "col7.field1", "col7[0].field1 as foo", "col7.field1[0] as foo") { + StructField("col7", ArrayType(StructType( + StructField("field1", IntegerType, nullable = false) :: Nil))) + } + + testSelect(arrayWithMultipleFields, "col7.field2.subfield1") { + StructField("col7", ArrayType(StructType( + StructField("field2", 
StructType( + StructField("subfield1", IntegerType, nullable = false) :: Nil)) :: Nil))) + } + + testSelect(arrayWithMultipleFields, "col7.field3.subfield1") { + StructField("col7", ArrayType(StructType( + StructField("field3", ArrayType(StructType( + StructField("subfield1", IntegerType, nullable = false) :: Nil))) :: Nil))) + } + + // Array with a nested int array + // |-- col1: string (nullable = false) + // |-- col8: array (nullable = true) + // | |-- element: struct (containsNull = true) + // | | |-- field1: array (nullable = false) + // | | | |-- element: integer (containsNull = false) + private val arrayOfArray = StructType(Array(ignoredField, + StructField("col8", + ArrayType(StructType(Array(StructField("field1", + ArrayType(IntegerType, containsNull = false), nullable = false)))) + ))) + + testSelect(arrayOfArray, "col8.field1", + "col8[0].field1 as foo", + "col8.field1[0] as foo", + "col8[0].field1[0] as foo") { + StructField("col8", ArrayType(StructType( + StructField("field1", ArrayType(IntegerType, containsNull = false), nullable = false) + :: Nil))) + } + + def assertResult(expected: StructField)(actual: StructField)(selectExpr: String): Unit = { + try { + super.assertResult(expected)(actual) + } catch { + case ex: TestFailedException => + // Print some helpful diagnostics in the case of failure + alert("Expected SELECT \"" + selectExpr + "\" to select the schema\n" + + indent(StructType(expected :: Nil).treeString) + + indent("but it actually selected\n") + + indent(StructType(actual :: Nil).treeString) + + indent("Note that expected.dataType.sameType(actual.dataType) = " + + expected.dataType.sameType(actual.dataType))) + throw ex + } + } + + // Test that the given SELECT expressions prune the test schema to the single-column schema + // defined by the given field + private def testSelect(inputSchema: StructType, selectExprs: String*) + (expected: StructField) { + test(s"SELECT ${selectExprs.map(s => s""""$s"""").mkString(", ")} should select the schema\n" + + indent(StructType(expected :: Nil).treeString)) { + for (selectExpr <- selectExprs) { + assertSelect(selectExpr, expected, inputSchema) + } + } + } + + private def assertSelect(expr: String, expected: StructField, inputSchema: StructType): Unit = { + val relation = LocalRelation(inputSchema.toAttributes) + unapplySelect(expr, relation) match { + case Some(field) => + assertResult(expected)(field)(expr) + case None => + val failureMessage = + "Failed to select a field from " + expr + ". 
" + + "Expected:\n" + + StructType(expected :: Nil).treeString + fail(failureMessage) + } + } + + private def unapplySelect(expr: String, relation: LocalRelation) = { + val parsedExpr = parseAsCatalystExpression(Seq(expr)).head + val select = relation.select(parsedExpr) + val analyzed = select.analyze + SelectedField.unapply(analyzed.expressions.head) + } + + private def parseAsCatalystExpression(exprs: Seq[String]) = { + exprs.map(CatalystSqlParser.parseExpression(_) match { + case namedExpr: NamedExpression => namedExpr + }) + } + + // Indent every line in `string` by four spaces + private def indent(string: String) = string.replaceAll("(?m)^", " ") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index dbf637783e6d2..54c77dddc3525 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -108,7 +108,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext val queryOutput = selfJoin.queryExecution.analyzed.output assertResult(4, "Field count mismatches")(queryOutput.size) - assertResult(2, "Duplicated expression ID in query plan:\n $selfJoin") { + assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") { queryOutput.filter(_.name == "_1").map(_.exprId).size } @@ -117,7 +117,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } test("nested data - struct with array field") { - val data = (1 to 10).map(i => Tuple1((i, Seq("val_$i")))) + val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i")))) withParquetTable(data, "t") { checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map { case Tuple1((_, Seq(string))) => Row(string) @@ -126,7 +126,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } test("nested data - array of struct") { - val data = (1 to 10).map(i => Tuple1(Seq(i -> "val_$i"))) + val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i"))) withParquetTable(data, "t") { checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map { case Tuple1(Seq((_, string))) => Row(string) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala new file mode 100644 index 0000000000000..eb99654fa78f5 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.scalactic.Equality + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.SchemaPruningTest +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType + +class ParquetSchemaPruningSuite + extends QueryTest + with ParquetTest + with SchemaPruningTest + with SharedSQLContext { + case class FullName(first: String, middle: String, last: String) + case class Contact( + id: Int, + name: FullName, + address: String, + pets: Int, + friends: Array[FullName] = Array.empty, + relatives: Map[String, FullName] = Map.empty) + + val janeDoe = FullName("Jane", "X.", "Doe") + val johnDoe = FullName("John", "Y.", "Doe") + val susanSmith = FullName("Susan", "Z.", "Smith") + + private val contacts = + Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith), + relatives = Map("brother" -> johnDoe)) :: + Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil + + case class Name(first: String, last: String) + case class BriefContact(id: Int, name: Name, address: String) + + private val briefContacts = + BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") :: + BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil + + case class ContactWithDataPartitionColumn( + id: Int, + name: FullName, + address: String, + pets: Int, + friends: Array[FullName] = Array(), + relatives: Map[String, FullName] = Map(), + p: Int) + + case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int) + + private val contactsWithDataPartitionColumn = + contacts.map { case Contact(id, name, address, pets, friends, relatives) => + ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) } + private val briefContactsWithDataPartitionColumn = + briefContacts.map { case BriefContact(id, name, address) => + BriefContactWithDataPartitionColumn(id, name, address, 2) } + + testSchemaPruning("select a single complex field") { + val query = sql("select name.middle from contacts") + checkScan(query, "struct<name:struct<middle:string>>") + checkAnswer(query.orderBy("id"), Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil) + } + + testSchemaPruning("select a single complex field and its parent struct") { + val query = sql("select name.middle, name from contacts") + checkScan(query, "struct<name:struct<first:string,middle:string,last:string>>") + checkAnswer(query.orderBy("id"), + Row("X.", Row("Jane", "X.", "Doe")) :: + Row("Y.", Row("John", "Y.", "Doe")) :: + Row(null, Row("Janet", null, "Jones")) :: + Row(null, Row("Jim", null, "Jones")) :: + Nil) + } + + testSchemaPruning("select a single complex field array and its parent struct array") { + val query = sql("select friends.middle, friends from contacts where p=1") + checkScan(query, + "struct<friends:array<struct<first:string,middle:string,last:string>>>") + checkAnswer(query.orderBy("id"), + Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) :: + Row(Array.empty[String], Array.empty[Row]) :: + Nil) + } + + testSchemaPruning("select a single complex field from a map entry and its parent map entry") { + val query = + sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1") + checkScan(query, + "struct<relatives:map<string,struct<first:string,middle:string,last:string>>>") + 
checkAnswer(query.orderBy("id"), + Row("Y.", Row("John", "Y.", "Doe")) :: + Row(null, null) :: + Nil) + } + + testSchemaPruning("select a single complex field and the partition column") { + val query = sql("select name.middle, p from contacts") + checkScan(query, "struct<name:struct<middle:string>>") + checkAnswer(query.orderBy("id"), + Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil) + } + + ignore("partial schema intersection - select missing subfield") { + val query = sql("select name.middle, address from contacts where p=2") + checkScan(query, "struct<name:struct<middle:string>,address:string>") + checkAnswer(query.orderBy("id"), + Row(null, "567 Maple Drive") :: + Row(null, "6242 Ash Street") :: Nil) + } + + testSchemaPruning("no unnecessary schema pruning") { + val query = + sql("select id, name.last, name.middle, name.first, relatives[''].last, " + + "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " + + "friends[0].first, pets, address from contacts where p=2") + // We've selected every field in the schema. Therefore, no schema pruning should be performed. + // We check this by asserting that the scanned schema of the query is identical to the schema + // of the contacts relation, even though the fields are selected in different orders. + checkScan(query, + "struct<id:int,name:struct<first:string,middle:string,last:string>,address:string,pets:int," + + "friends:array<struct<first:string,middle:string,last:string>>," + + "relatives:map<string,struct<first:string,middle:string,last:string>>>") + checkAnswer(query.orderBy("id"), + Row(2, "Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") :: + Row(3, "Jones", null, "Jim", null, null, null, null, null, null, null, "6242 Ash Street") :: + Nil) + } + + testSchemaPruning("empty schema intersection") { + val query = sql("select name.middle from contacts where p=2") + checkScan(query, "struct<name:struct<middle:string>>") + checkAnswer(query.orderBy("id"), + Row(null) :: Row(null) :: Nil) + } + + private def testSchemaPruning(testName: String)(testThunk: => Unit) { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { + test(s"Spark vectorized reader - without partition data column - $testName") { + withContacts(testThunk) + } + test(s"Spark vectorized reader - with partition data column - $testName") { + withContactsWithDataPartitionColumn(testThunk) + } + } + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + test(s"Parquet-mr reader - without partition data column - $testName") { + withContacts(testThunk) + } + test(s"Parquet-mr reader - with partition data column - $testName") { + withContactsWithDataPartitionColumn(testThunk) + } + } + } + + private def withContacts(testThunk: => Unit) { + withTempPath { dir => + val path = dir.getCanonicalPath + + makeParquetFile(contacts, new File(path + "/contacts/p=1")) + makeParquetFile(briefContacts, new File(path + "/contacts/p=2")) + + spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts") + + testThunk + } + } + + private def withContactsWithDataPartitionColumn(testThunk: => Unit) { + withTempPath { dir => + val path = dir.getCanonicalPath + + makeParquetFile(contactsWithDataPartitionColumn, new File(path + "/contacts/p=1")) + makeParquetFile(briefContactsWithDataPartitionColumn, new File(path + "/contacts/p=2")) + + spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts") + + testThunk + } + } + + case class MixedCaseColumn(a: String, B: Int) + case class MixedCase(id: Int, CoL1: String, coL2: MixedCaseColumn) + + private val mixedCaseData = + MixedCase(0, "r0c1", MixedCaseColumn("abc", 1)) :: + MixedCase(1, "r1c1", MixedCaseColumn("123", 2)) :: + Nil + + 
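+  // The tests below run against this mixed-case schema under every reader/case-sensitivity
+  // combination registered by testMixedCasePruning. The three select tests expect the scan to
+  // read only CoL1 and coL2.B; the filter test checks results only, since pruning in the
+  // presence of filters is not yet supported.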
testMixedCasePruning("select with exact column names") { + val query = sql("select CoL1, coL2.B from mixedcase") + checkScan(query, "struct>") + checkAnswer(query.orderBy("id"), + Row("r0c1", 1) :: + Row("r1c1", 2) :: + Nil) + } + + testMixedCasePruning("select with lowercase column names") { + val query = sql("select col1, col2.b from mixedcase") + checkScan(query, "struct>") + checkAnswer(query.orderBy("id"), + Row("r0c1", 1) :: + Row("r1c1", 2) :: + Nil) + } + + testMixedCasePruning("select with different-case column names") { + val query = sql("select cOL1, cOl2.b from mixedcase") + checkScan(query, "struct>") + checkAnswer(query.orderBy("id"), + Row("r0c1", 1) :: + Row("r1c1", 2) :: + Nil) + } + + testMixedCasePruning("filter with different-case column names") { + val query = sql("select id from mixedcase where Col2.b = 2") + // Pruning with filters is currently unsupported. As-is, the file reader will read the id column + // and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment + // this line + // checkScan(query, "struct>") + checkAnswer(query.orderBy("id"), Row(1) :: Nil) + } + + private def testMixedCasePruning(testName: String)(testThunk: => Unit) { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", + SQLConf.CASE_SENSITIVE.key -> "true") { + test(s"Spark vectorized reader - case-sensitive parser - mixed-case schema - $testName") { + withMixedCaseData(testThunk) + } + } + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + SQLConf.CASE_SENSITIVE.key -> "false") { + test(s"Parquet-mr reader - case-insensitive parser - mixed-case schema - $testName") { + withMixedCaseData(testThunk) + } + } + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", + SQLConf.CASE_SENSITIVE.key -> "false") { + test(s"Spark vectorized reader - case-insensitive parser - mixed-case schema - $testName") { + withMixedCaseData(testThunk) + } + } + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + SQLConf.CASE_SENSITIVE.key -> "true") { + test(s"Parquet-mr reader - case-sensitive parser - mixed-case schema - $testName") { + withMixedCaseData(testThunk) + } + } + } + + private def withMixedCaseData(testThunk: => Unit) { + withParquetTable(mixedCaseData, "mixedcase") { + testThunk + } + } + + private val schemaEquality = new Equality[StructType] { + override def areEqual(a: StructType, b: Any): Boolean = + b match { + case otherType: StructType => a.sameType(otherType) + case _ => false + } + } + + protected def checkScan(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { + checkScanSchemata(df, expectedSchemaCatalogStrings: _*) + // We check here that we can execute the query without throwing an exception. 
The results + // themselves are irrelevant, and should be checked elsewhere as needed + df.collect() + } + + private def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { + val fileSourceScanSchemata = + df.queryExecution.executedPlan.collect { + case scan: FileSourceScanExec => scan.requiredSchema + } + assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, + s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + + s"but expected $expectedSchemaCatalogStrings") + fileSourceScanSchemata.zip(expectedSchemaCatalogStrings).foreach { + case (scanSchema, expectedScanSchemaCatalogString) => + val expectedScanSchema = CatalystSqlParser.parseDataType(expectedScanSchemaCatalogString) + implicit val equality = schemaEquality + assert(scanSchema === expectedScanSchema) + } + } +}
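
A minimal, purely illustrative sketch of how this patch is exercised from a user session (the path, dataset and column names below are hypothetical, not taken from the patch):

// The rule is off by default in this patch, so enable it explicitly.
spark.conf.set("spark.sql.nestedSchemaPruning.enabled", "true")

// Hypothetical Parquet dataset with a column `name: struct<first:string,middle:string,last:string>`.
val contacts = spark.read.parquet("/tmp/contacts")
contacts.select("name.first").explain()
// With pruning enabled, the FileSourceScan's ReadSchema should contain only
// struct<name:struct<first:string>> rather than the full name struct.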