
Commit 6e594b8

Get size info from metastore for MetastoreRelation.
Additionally, remove the size estimate from ParquetRelation, since the Hadoop FileSystem API calls it relies on can be expensive (e.g. S3FileSystem makes a lot of RPCs).
1 parent 01b7a3e commit 6e594b8
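
The gist of the change: instead of walking the table's files through Hadoop's FileSystem API, the size estimate now comes from the table's totalSize parameter in the Hive metastore. A minimal sketch of the two approaches for context (illustrative only, not code from this commit; the helper names sizeFromMetastore and sizeFromFileSystem are made up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.ql.metadata.Table

    // Cheap: a single metastore lookup, provided the table's parameters are populated.
    def sizeFromMetastore(hiveQlTable: Table): Option[Long] =
      Option(hiveQlTable.getParameters.get("totalSize"))
        .flatMap(s => scala.util.Try(s.toLong).toOption)

    // Potentially expensive: getContentSummary walks the directory tree, which on
    // S3FileSystem translates into many RPCs.
    def sizeFromFileSystem(path: Path, conf: Configuration): Long =
      path.getFileSystem(conf).getContentSummary(path).getLength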


3 files changed: +11 -62 lines changed


sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 0 additions & 11 deletions
@@ -22,11 +22,9 @@ import java.io.IOException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.mapreduce.Job

 import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.util.ContextUtil
 import parquet.schema.MessageType

 import org.apache.spark.sql.SQLContext
@@ -53,15 +51,6 @@ private[sql] case class ParquetRelation(

   self: Product =>

-  @transient override lazy val statistics = Statistics(
-    // TODO: investigate getting encoded column statistics in the parquet file?
-    sizeInBytes = {
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
-      math.max(fs.getContentSummary(hdfsPath).getLength, 1L) // TODO: in bytes or system-dependent?
-    }
-  )
-
   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
     ParquetTypesConverter

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 10 additions & 10 deletions
@@ -269,18 +269,18 @@ private[hive] case class MetastoreRelation
       new Partition(hiveQlTable, p)
     }

-  // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use?
-  @transient override lazy val statistics = new Statistics {
+  @transient override lazy val statistics = Statistics(
     // TODO: check if this estimate is valid for tables after partition pruning.
-    // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13).
-    override val sizeInBytes: Long =
-      math.max(maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path), 1L)
-
-    private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
-      val res = try { Some(size.toLong) } catch { case _: Exception => None }
-      res.getOrElse { path.getFileSystem(conf).getContentSummary(path).getLength }
+    sizeInBytes = {
+      // NOTE: kind of hacky, but this should be relatively cheap if parameters for the table are
+      // populated into the metastore. An alternative would be going through Hadoop's FileSystem
+      // API, which can be expensive if a lot of RPCs are involved. Besides `totalSize`, there are
+      // also `numFiles`, `numRows`, `rawDataSize` keys we can look at in the future.
+      val sizeMaybeFromMetastore =
+        Option(hiveQlTable.getParameters.get("totalSize")).map(_.toLong).getOrElse(-1L)
+      math.max(sizeMaybeFromMetastore, 1L)
     }
-  }
+  )

   val tableDesc = new TableDesc(
     Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]],
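
Worth noting (not part of this diff): the new estimate is only meaningful when the metastore actually carries a totalSize parameter for the table; otherwise it falls back to the default of 1 via the math.max above. As a hedged usage sketch, assuming the underlying Hive version supports it, the parameter can be populated without scanning rows:

    // Hypothetical usage, assuming ANALYZE TABLE is available in the Hive version in use;
    // NOSCAN gathers file-level stats such as numFiles and totalSize without reading rows.
    hql("ANALYZE TABLE src COMPUTE STATISTICS NOSCAN")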

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 1 addition & 41 deletions
@@ -23,32 +23,16 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData}
-import org.apache.spark.util.Utils

 class StatisticsSuite extends QueryTest {

-  test("estimates the size of a test ParquetRelation") {
-    ParquetTestData.writeFile()
-    val testRDD = parquetFile(ParquetTestData.testDir.toString)
-
-    val sizes = testRDD.logicalPlan.collect { case j: ParquetRelation =>
-      (j.statistics.sizeInBytes, j.newInstance.statistics.sizeInBytes)
-    }
-    assert(sizes.size === 1)
-    assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before")
-    assert(sizes(0)._1 > 1, "1 is the default, indicating the absence of a meaningful estimate")
-
-    Utils.deleteRecursively(ParquetTestData.testDir)
-  }
-
   test("estimates the size of a test MetastoreRelation") {
     val rdd = hql("""SELECT * FROM src""")
     val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
       mr.statistics.sizeInBytes
     }
     assert(sizes.size === 1)
-    assert(sizes(0) > 1, "1 is the default, indicating the absence of a meaningful estimate")
+    assert(sizes(0) == 5812, s"expected exact size 5812 for test table 'src', got ${sizes(0)}")
   }

   test("auto converts to broadcast hash join, by size estimate of a relation") {
@@ -95,30 +79,6 @@ class StatisticsSuite extends QueryTest {
     after()
   }

-  /** Tests for ParquetRelation */
-  val parquetQuery =
-    """SELECT a.mystring, b.myint
-      |FROM psrc a
-      |JOIN psrc b
-      |ON a.mylong = 0 AND a.mylong = b.mylong""".stripMargin
-  val parquetAnswer = Seq(("abc", 5))
-  def parquetBefore(): Unit = {
-    ParquetTestData.writeFile()
-    val testRDD = parquetFile(ParquetTestData.testDir.toString)
-    testRDD.registerAsTable("psrc")
-  }
-  def parquetAfter() = {
-    Utils.deleteRecursively(ParquetTestData.testDir)
-    reset()
-  }
-  mkTest(
-    parquetBefore,
-    parquetAfter,
-    parquetQuery,
-    parquetAnswer,
-    implicitly[ClassTag[ParquetRelation]]
-  )
-
   /** Tests for MetastoreRelation */
   val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
   val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238"))
