From 8253bbe36d551f11d8e48ab92444977ac5b0776a Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Tue, 30 May 2017 14:58:39 -0700
Subject: [PATCH 1/4] [SPARK-20926][SQL] Removing exposures to Guava library
 caused by directly accessing SessionCatalog's tableRelationCache

There were test failures because DataSourceStrategy, HiveMetastoreCatalog
and HiveSchemaInferenceSuite were exposed to the shaded Guava library.
This change removes those exposures by introducing new methods in
SessionCatalog.
---
 .../sql/catalyst/catalog/SessionCatalog.scala | 47 ++++++++++++++++++-
 .../datasources/DataSourceStrategy.scala      |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 +++----
 .../sql/hive/HiveSchemaInferenceSuite.scala   |  2 +-
 4 files changed, 56 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a78440df4f3e..52fc3367b9cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
 import java.util.Locale
+import java.util.concurrent.Callable
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -126,13 +127,55 @@ class SessionCatalog(
   }
 
   /**
-   * A cache of qualified table names to table relation plans.
-   */
+   * A cache of qualified table names to table relation plans.
+   * Accessing tableRelationCache directly is not recommended,
+   * since it will introduce exposures to guava libraries.
+   */
   val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
     CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
+  /**
+   * This method provides a way to get a cached plan
+   * without exposing components to Guava library.
+   */
+  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
+    tableRelationCache.get(t, c)
+  }
+
+  /**
+   * This method provides a way to get a cached plan if the key exists
+   * without exposing components to Guava library.
+   */
+  def getCachedTableIfPresent(key: QualifiedTableName): LogicalPlan = {
+    tableRelationCache.getIfPresent(key)
+  }
+
+  /**
+   * This method provides a way to cache a plan
+   * without exposing components to Guava library.
+   */
+  def putTableInCache(t: QualifiedTableName, l: LogicalPlan): Unit = {
+    tableRelationCache.put(t, l)
+  }
+
+  /**
+   * This method provides a way to invalidate a cached plan
+   * without exposing components to Guava library.
+   */
+  def invalidateCachedTable(key: QualifiedTableName): Unit = {
+    tableRelationCache.invalidate(key)
+  }
+
+  /**
+   * This method provides a way to invalidate all the cached plans
+   * without exposing components to Guava library.
+   */
+  def invalidateAllCachedTables(): Unit = {
+    tableRelationCache.invalidateAll()
+  }
+
   /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21d75a404911..e05a8d5f02bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,9 +215,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val catalogProxy = sparkSession.sessionState.catalog
 
-    val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b98066cb76c..6b0ce1a40f10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   // these are def_s and not val/lazy val since the latter would introduce circular references
   private def sessionState = sparkSession.sessionState
-  private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+  private def catalogProxy = sparkSession.sessionState.catalog
   import HiveMetastoreCatalog._
 
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
@@ -61,7 +61,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val key = QualifiedTableName(
       table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
       table.table.toLowerCase)
-    tableRelationCache.getIfPresent(key)
+    catalogProxy.getCachedTableIfPresent(key)
   }
 
   private def getCached(
@@ -71,7 +71,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedFileFormat: Class[_ <: FileFormat],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    tableRelationCache.getIfPresent(tableIdentifier) match {
+    catalogProxy.getCachedTableIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -92,21 +92,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             Some(logical)
           } else {
             // If the cached relation is not updated, we invalidate it right away.
-            tableRelationCache.invalidate(tableIdentifier)
+            catalogProxy.invalidateCachedTable(tableIdentifier)
             None
           }
         case _ =>
           logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
            s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
" + "This cached entry will be invalidated.") - tableRelationCache.invalidate(tableIdentifier) + catalogProxy.invalidateCachedTable(tableIdentifier) None } case other => logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " + s"However, we are getting a $other from the metastore cache. " + "This cached entry will be invalidated.") - tableRelationCache.invalidate(tableIdentifier) + catalogProxy.invalidateCachedTable(tableIdentifier) None } } @@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileFormat = fileFormat, options = options)(sparkSession = sparkSession) val created = LogicalRelation(fsRelation, updatedTable) - tableRelationCache.put(tableIdentifier, created) + catalogProxy.putTableInCache(tableIdentifier, created) created } @@ -205,7 +205,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log className = fileType).resolveRelation(), table = updatedTable) - tableRelationCache.put(tableIdentifier, created) + catalogProxy.putTableInCache(tableIdentifier, created) created } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala index b3a06045b5fd..d271acc63de0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala @@ -46,7 +46,7 @@ class HiveSchemaInferenceSuite override def afterEach(): Unit = { super.afterEach() - spark.sessionState.catalog.tableRelationCache.invalidateAll() + spark.sessionState.catalog.invalidateAllCachedTables() FileStatusCache.resetForTesting() } From 9821ea191d63b327663f29adb04b48c856c550ff Mon Sep 17 00:00:00 2001 From: Reza Safi Date: Thu, 1 Jun 2017 18:36:05 -0700 Subject: [PATCH 2/4] Making tableRelationCache private and updating the comments. --- .../sql/catalyst/catalog/SessionCatalog.scala | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 52fc3367b9cb..743b73f29472 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -126,51 +126,41 @@ class SessionCatalog( if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) } - /** - * A cache of qualified table names to table relation plans. - * Accessing tableRelationCache directly is not recommended, - * since it will introduce exposures to guava libraries. - */ - val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { + private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { val cacheSize = conf.tableRelationCacheSize CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]() } /** - * This method provides a way to get a cached plan - * without exposing components to Guava library. + * This method provides a way to get a cached plan. */ def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = { tableRelationCache.get(t, c) } /** - * This method provides a way to get a cached plan if the key exists - * without exposing components to Guava library. 
+   * This method provides a way to get a cached plan if the key exists.
    */
   def getCachedTableIfPresent(key: QualifiedTableName): LogicalPlan = {
     tableRelationCache.getIfPresent(key)
   }
 
   /**
-   * This method provides a way to cache a plan
-   * without exposing components to Guava library.
+   * This method provides a way to cache a plan.
    */
   def putTableInCache(t: QualifiedTableName, l: LogicalPlan): Unit = {
     tableRelationCache.put(t, l)
   }
 
   /**
-   * This method provides a way to invalidate a cached plan
-   * without exposing components to Guava library.
+   * This method provides a way to invalidate a cached plan.
    */
   def invalidateCachedTable(key: QualifiedTableName): Unit = {
     tableRelationCache.invalidate(key)
   }
 
   /**
-   * This method provides a way to invalidate all the cached plans
-   * without exposing components to Guava library.
+   * This method provides a way to invalidate all the cached plans.
    */
   def invalidateAllCachedTables(): Unit = {
     tableRelationCache.invalidateAll()

From 942137299dc03de53ce3e7120ac052f5764c14dc Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Thu, 1 Jun 2017 20:44:57 -0700
Subject: [PATCH 3/4] Fixing scalastyle check errors

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 20 +++++--------------
 1 file changed, 5 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 743b73f29472..5b638553f0c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -131,37 +131,27 @@ class SessionCatalog(
     CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
-  /**
-   * This method provides a way to get a cached plan.
-   */
+  /** This method provides a way to get a cached plan. */
   def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
     tableRelationCache.get(t, c)
   }
 
-  /**
-   * This method provides a way to get a cached plan if the key exists.
-   */
+  /** This method provides a way to get a cached plan if the key exists. */
   def getCachedTableIfPresent(key: QualifiedTableName): LogicalPlan = {
     tableRelationCache.getIfPresent(key)
   }
 
-  /**
-   * This method provides a way to cache a plan.
-   */
+  /** This method provides a way to cache a plan. */
   def putTableInCache(t: QualifiedTableName, l: LogicalPlan): Unit = {
     tableRelationCache.put(t, l)
   }
 
-  /**
-   * This method provides a way to invalidate a cached plan.
-   */
+  /** This method provides a way to invalidate a cached plan. */
   def invalidateCachedTable(key: QualifiedTableName): Unit = {
     tableRelationCache.invalidate(key)
   }
 
-  /**
-   * This method provides a way to invalidate all the cached plans.
-   */
+  /** This method provides a way to invalidate all the cached plans. */
  def invalidateAllCachedTables(): Unit = {
     tableRelationCache.invalidateAll()
   }

From 2832253afe2a48daae3f78568315b19a5aeb045f Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Fri, 2 Jun 2017 16:49:49 -0700
Subject: [PATCH 4/4] Changing the names for two of the methods.

---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala      | 4 ++--
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 5b638553f0c5..57006bfaf9b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -137,12 +137,12 @@ class SessionCatalog(
   }
 
   /** This method provides a way to get a cached plan if the key exists. */
-  def getCachedTableIfPresent(key: QualifiedTableName): LogicalPlan = {
+  def getCachedTable(key: QualifiedTableName): LogicalPlan = {
     tableRelationCache.getIfPresent(key)
   }
 
   /** This method provides a way to cache a plan. */
-  def putTableInCache(t: QualifiedTableName, l: LogicalPlan): Unit = {
+  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
     tableRelationCache.put(t, l)
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b0ce1a40f10..9b3cbb63a21b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -61,7 +61,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val key = QualifiedTableName(
       table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
       table.table.toLowerCase)
-    catalogProxy.getCachedTableIfPresent(key)
+    catalogProxy.getCachedTable(key)
   }
 
   private def getCached(
@@ -71,7 +71,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedFileFormat: Class[_ <: FileFormat],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    catalogProxy.getCachedTableIfPresent(tableIdentifier) match {
+    catalogProxy.getCachedTable(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           fileFormat = fileFormat,
           options = options)(sparkSession = sparkSession)
       val created = LogicalRelation(fsRelation, updatedTable)
-      catalogProxy.putTableInCache(tableIdentifier, created)
+      catalogProxy.cacheTable(tableIdentifier, created)
       created
     }
 
@@ -205,7 +205,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
            className = fileType).resolveRelation(),
        table = updatedTable)
 
-      catalogProxy.putTableInCache(tableIdentifier, created)
+      catalogProxy.cacheTable(tableIdentifier, created)
       created
     }
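
For illustration only, not part of the patch series above: a minimal sketch of how a caller would use the final API after PATCH 4/4. The `buildPlanFor` helper and the `default.events` table are hypothetical, and the import path for QualifiedTableName is assumed to match the Spark tree these patches target.

  import java.util.concurrent.Callable

  import org.apache.spark.sql.catalyst.QualifiedTableName
  import org.apache.spark.sql.catalyst.catalog.SessionCatalog
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

  object CatalogProxyExample {
    // Hypothetical helper: computes the relation plan on a cache miss.
    def buildPlanFor(key: QualifiedTableName): LogicalPlan = ???

    def demo(catalog: SessionCatalog): Unit = {
      val key = QualifiedTableName("default", "events")  // hypothetical table

      // Get-or-compute: Guava's Cache#get(K, Callable) stays inside
      // SessionCatalog; only java.util.concurrent.Callable leaks through.
      val plan = catalog.getCachedPlan(key, new Callable[LogicalPlan]() {
        override def call(): LogicalPlan = buildPlanFor(key)
      })

      // Peek without populating; mirrors Guava's getIfPresent, so a miss
      // yields null rather than None, hence the Option wrapper.
      val cached = Option(catalog.getCachedTable(key))

      // Store and invalidate without naming any Guava type.
      catalog.cacheTable(key, plan)
      catalog.invalidateCachedTable(key)
      catalog.invalidateAllCachedTables()
    }
  }

Keeping Callable, a plain JDK type, as the only foreign type in these signatures is what lets PATCH 2/4 make tableRelationCache private without breaking callers when the build shades Guava.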