Commit c0dc7a4

Cache converted parquet relations.
1 parent e3202aa commit c0dc7a4

2 files changed: +168 -6 lines changed

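For context: the change memoizes the result of converting a Hive metastore Parquet table into a ParquetRelation2, keyed by qualified table name. Below is a minimal sketch of that get-or-build pattern, assuming a Guava cache like the cachedDataSourceTables used in the diff; the Relation type and the build step are illustrative stand-ins, not code from the commit.

import com.google.common.cache.{Cache, CacheBuilder}

// Illustrative stand-ins; the real code caches LogicalRelation values.
case class QualifiedTableName(database: String, name: String)
case class Relation(paths: Seq[String])

object RelationCacheSketch {
  val cache: Cache[QualifiedTableName, Relation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, Relation]()

  // Return the cached relation if present; otherwise build it once and memoize it.
  def getOrBuild(key: QualifiedTableName)(build: => Relation): Relation =
    Option(cache.getIfPresent(key)).getOrElse {
      val created = build // expensive: list files, read footers, merge schemas
      cache.put(key, created)
      created
    }
}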

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 55 additions & 6 deletions
@@ -106,7 +106,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }

   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
+    // The next time the table is used, it will be repopulated in the cache.
+    invalidateTable(databaseName, tableName)
   }

   def invalidateTable(databaseName: String, tableName: String): Unit = {
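For readers unfamiliar with the Guava cache semantics behind this hunk: on a LoadingCache, refresh(key) eagerly reloads an entry through its loader, while invalidate(key) only discards it and defers the rebuild cost to the next lookup. A small sketch of the difference, with an illustrative string-valued loader:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsInvalidate {
  private var loads = 0

  val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
      override def load(key: String): String = { loads += 1; s"relation#$loads for $key" }
    })

  def demo(): Unit = {
    cache.get("default.t")        // miss: loader runs (loads == 1)
    cache.refresh("default.t")    // loader runs again NOW (loads == 2), even if unused
    cache.invalidate("default.t") // entry dropped; loader does NOT run yet
    cache.get("default.t")        // rebuild cost paid here, on next use (loads == 3)
  }
}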
@@ -213,13 +215,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)

     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+            s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+            s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -232,10 +263,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val paths = partitions.map(_.path)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     }
   }
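The safety of this caching rests on getCached's staleness test above: a cached entry is reused only when the metastore still reports the same paths, a same-typed schema, and the same partition spec; anything else is treated as stale and rebuilt. A distilled sketch of that decision, with types simplified from the commit:

object GetCachedSketch {
  // Simplified stand-in for the cached LogicalRelation/ParquetRelation2 pair.
  final case class CachedEntry(paths: Seq[String], schemaJson: String, partitionSpec: Option[String])

  // Mirrors getCached's reuse test: only trust the cache if nothing observable has drifted.
  def validateCached(
      cached: Option[CachedEntry],
      pathsInMetastore: Seq[String],
      schemaJsonInMetastore: String,
      partitionSpecInMetastore: Option[String]): Option[CachedEntry] =
    cached.filter { c =>
      c.paths == pathsInMetastore &&                // same data locations
      c.schemaJson == schemaJsonInMetastore &&      // same reconciled schema
      c.partitionSpec == partitionSpecInMetastore   // same partitioning
    } // a None result tells the caller to rebuild the relation and re-cache it
}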

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 113 additions & 0 deletions
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.SaveMode
@@ -390,6 +392,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

     sql("DROP TABLE ms_convert")
   }
+
+  test("Caching converted data source Parquet Relations") {
+    def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
+      // The converted test_parquet should be cached.
+      catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => fail("Converted test_parquet should be cached.")
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+        case other =>
+          fail(
+            "The cached test_parquet should be a Parquet Relation. " +
+            s"However, $other is returned from the cache.")
+      }
+    }
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")
+
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)

+    var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
+
+    // First, make sure the converted test_parquet is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    // Table lookup will make the table cached.
+    table("test_insert_parquet")
+    checkCached(tableIdentifier)
+    // For an insert into a non-partitioned table, we will do the conversion,
+    // so the converted test_insert_parquet should be cached.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_insert_parquet
+        |select a, b from jt
+      """.stripMargin)
+    checkCached(tableIdentifier)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select * from test_insert_parquet"),
+      sql("select a, b from jt").collect())
+    // Invalidate the cache.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+
+    // Create a partitioned table.
+    sql(
+      """
+        |create table test_parquet_partitioned_cache_test
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |PARTITIONED BY (date string)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-01')
+        |select a, b from jt
+      """.stripMargin)
+    // Right now, insert into a partitioned Parquet table is not supported in data source Parquet.
+    // So, we expect it is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-02')
+        |select a, b from jt
+      """.stripMargin)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+    // Make sure we can cache the partitioned table.
+    table("test_parquet_partitioned_cache_test")
+    checkCached(tableIdentifier)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
+      sql(
+        """
+          |select b, '2015-04-01', a FROM jt
+          |UNION ALL
+          |select b, '2015-04-02', a FROM jt
+        """.stripMargin).collect())
+
+    invalidateTable("test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+
+    sql("DROP TABLE test_insert_parquet")
+    sql("DROP TABLE test_parquet_partitioned_cache_test")
+  }
 }

 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -414,6 +526,7 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
         |)
         |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
         |STORED AS
+        |STORED AS
         |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
         |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
