
Commit 07e54d3

Michael Allman authored and Robert Kruszewski committed
[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query
(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however it does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition-pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition-pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog`. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog, which queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by apache#14750, but that PR appears to have stalled. ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of during planning. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks Parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <[email protected]>
Author: Eric Liang <[email protected]>
Author: Eric Liang <[email protected]>

Closes apache#14690 from mallman/spark-16980-lazy_partition_fetching.
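For illustration only (not part of the commit message above): a minimal sketch of the access pattern this change targets, assuming a `spark` session (e.g. in `spark-shell`) and a hypothetical Hive table `events` partitioned by a string column `ds`:

```scala
// Hypothetical partitioned Hive table `events`, partitioned by string column `ds`.
// With this change, planning the query below should cause the Hive metastore to be
// asked only for the partitions satisfying ds = '2016-08-10', rather than loading
// metadata for every partition of `events` into driver memory.
val pruned = spark.sql("SELECT count(*) FROM events WHERE ds = '2016-08-10'")
pruned.show()
```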
1 parent d5429cd commit 07e54d3

37 files changed (+914 −368 lines)

core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala

Lines changed: 33 additions & 1 deletion
@@ -26,7 +26,7 @@ private[spark] object StaticSources {
   * The set of all static sources. These sources may be reported to from any class, including
   * static classes, without requiring reference to a SparkEnv.
   */
-  val allSources = Seq(CodegenMetrics)
+  val allSources = Seq(CodegenMetrics, HiveCatalogMetrics)
 }

 /**
@@ -60,3 +60,35 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  /**
+   * Resets the values of all metrics to zero. This is useful in tests.
+   */
+  def reset(): Unit = {
+    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale classes
+  def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
+  def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
+}
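A rough sketch (not code from this commit) of how these counters might be exercised, assuming a `spark` session and a hypothetical partitioned table `events` with partition column `ds`:

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

// Reset the experimental catalog metrics, run a partition-pruned query, then inspect
// how many partition metadata entries and files were touched along the way.
HiveCatalogMetrics.reset()
spark.sql("SELECT * FROM events WHERE ds = '2016-08-10'").collect()
val partitionsFetched = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()
val filesDiscovered = HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount()
println(s"partitions fetched: $partitionsFetched, files discovered: $filesDiscovered")
```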

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 3 additions & 2 deletions
@@ -198,11 +198,12 @@ abstract class ExternalCatalog {
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]

   /**
-   * List the metadata of selected partitions according to the given partition predicates.
+   * List the metadata of partitions that belong to the specified table, assuming it exists, that
+   * satisfy the given partition-pruning predicate expressions.
    *
    * @param db database name
    * @param table table name
-   * @param predicates partition predicated
+   * @param predicates partition-pruning predicates
    */
   def listPartitionsByFilter(
       db: String,
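For orientation, a hedged sketch of a caller of the new method; the `ExternalCatalog` instance, the `default.events` table, and its partition column `ds` are assumptions for illustration, not part of this diff:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, ExternalCatalog}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

// Build a partition-pruning predicate over partition column `ds` and ask the catalog
// for only the matching partitions, instead of listing every partition of the table.
def matchingPartitions(catalog: ExternalCatalog): Seq[CatalogTablePartition] = {
  val ds = AttributeReference("ds", StringType)()
  catalog.listPartitionsByFilter("default", "events", Seq(EqualTo(ds, Literal("2016-08-10"))))
}
```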

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 3 additions & 1 deletion
@@ -482,7 +482,9 @@ class InMemoryCatalog(
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
-    throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.")
+    // TODO: Provide an implementation
+    throw new UnsupportedOperationException(
+      "listPartitionsByFilter is not implemented for InMemoryCatalog")
   }

   // --------------------------------------------------------------------------

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 12 additions & 3 deletions
@@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.catalog
 import java.util.Date

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}


 /**
@@ -97,6 +97,15 @@ case class CatalogTablePartition(

     output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
   }
+
+  /**
+   * Given the partition schema, returns a row with that schema holding the partition values.
+   */
+  def toRow(partitionSchema: StructType): InternalRow = {
+    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
+      Cast(Literal(spec(name)), dataType).eval()
+    })
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 2 additions & 2 deletions
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -2614,7 +2614,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def inputFiles: Array[String] = {
-    val files: Seq[String] = logicalPlan.collect {
+    val files: Seq[String] = queryExecution.optimizedPlan.collect {
       case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
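A hedged reading of this change: since `inputFiles` now collects from the optimized plan, partition pruning has already been applied, so a filtered Dataset over a partitioned file source should report only the files of the surviving partitions. Sketch with a hypothetical table `events` and partition column `ds`:

```scala
// Files reported for a partition-filtered Dataset; after this change they are collected
// from the optimized plan, so files of pruned-out partitions should not appear.
val files: Array[String] = spark.table("events").where("ds = '2016-08-10'").inputFiles
files.foreach(println)
```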

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 1 addition & 1 deletion
@@ -185,7 +185,7 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
-          val invalidate = hr.location.paths
+          val invalidate = hr.location.rootPaths
             .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
             .contains(qualifiedPath)
           if (invalidate) hr.location.refresh()

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 21 additions & 7 deletions
@@ -225,13 +225,27 @@ case class FileSourceScanExec(
   }

   // These metadata values make scan plans uniquely identifiable for equality checking.
-  override val metadata: Map[String, String] = Map(
-    "Format" -> relation.fileFormat.toString,
-    "ReadSchema" -> outputSchema.catalogString,
-    "Batched" -> supportsBatch.toString,
-    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-    "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-    "InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val location = relation.location
+    val locationDesc =
+      location.getClass.getSimpleName + seqToString(location.rootPaths)
+    val metadata =
+      Map(
+        "Format" -> relation.fileFormat.toString,
+        "ReadSchema" -> outputSchema.catalogString,
+        "Batched" -> supportsBatch.toString,
+        "PartitionFilters" -> seqToString(partitionFilters),
+        "PushedFilters" -> seqToString(dataFilters),
+        "Location" -> locationDesc)
+    val withOptPartitionCount =
+      relation.partitionSchemaOption.map { _ =>
+        metadata + ("PartitionCount" -> selectedPartitions.size.toString)
+      } getOrElse {
+        metadata
+      }
+    withOptPartitionCount
+  }

   private lazy val inputRDD: RDD[InternalRow] = {
     val originalPartitions = relation.location.listFiles(partitionFilters)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 import org.apache.spark.sql.internal.SQLConf

@@ -32,5 +33,6 @@ class SparkOptimizer(
   override def batches: Seq[Batch] = super.batches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo

     dataSource match {
       case fs: HadoopFsRelation =>
-        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
           throw new AnalysisException(
             "Cannot create a file-based external data source table without path")
         }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 1 addition & 3 deletions
@@ -471,9 +471,7 @@ case class DataSource(
     val existingPartitionColumns = Try {
       resolveRelation()
         .asInstanceOf[HadoopFsRelation]
-        .location
-        .partitionSpec()
-        .partitionColumns
+        .partitionSchema
         .fieldNames
         .toSeq
     }.getOrElse(Seq.empty[String])
