Skip to content

Commit 2bd37ce

Browse files
committed
[SPARK-17549][SQL] Revert "[] Only collect table size stat in driver for cached relation."
This reverts commit 39e2bad because of the problem mentioned at https://issues.apache.org/jira/browse/SPARK-17549?focusedCommentId=15505060&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15505060 Author: Yin Huai <[email protected]> Closes #15157 from yhuai/revert-SPARK-17549. (cherry picked from commit 9ac68db) Signed-off-by: Yin Huai <[email protected]>
1 parent e76f4f4 commit 2bd37ce

File tree

2 files changed

+18
-20
lines changed

2 files changed

+18
-20
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.columnar
1919

20+
import scala.collection.JavaConverters._
21+
2022
import org.apache.commons.lang3.StringUtils
2123

2224
import org.apache.spark.network.util.JavaUtils
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
2931
import org.apache.spark.sql.catalyst.plans.logical.Statistics
3032
import org.apache.spark.sql.execution.SparkPlan
3133
import org.apache.spark.storage.StorageLevel
32-
import org.apache.spark.util.LongAccumulator
34+
import org.apache.spark.util.CollectionAccumulator
3335

3436

3537
object InMemoryRelation {
@@ -61,7 +63,8 @@ case class InMemoryRelation(
6163
@transient child: SparkPlan,
6264
tableName: Option[String])(
6365
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
64-
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
66+
val batchStats: CollectionAccumulator[InternalRow] =
67+
child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
6568
extends logical.LeafNode with MultiInstanceRelation {
6669

6770
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -71,12 +74,21 @@ case class InMemoryRelation(
7174
@transient val partitionStatistics = new PartitionStatistics(output)
7275

7376
override lazy val statistics: Statistics = {
74-
if (batchStats.value == 0L) {
77+
if (batchStats.value.isEmpty) {
7578
// Underlying columnar RDD hasn't been materialized, no useful statistics information
7679
// available, return the default statistics.
7780
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
7881
} else {
79-
Statistics(sizeInBytes = batchStats.value.longValue)
82+
// Underlying columnar RDD has been materialized, required information has also been
83+
// collected via the `batchStats` accumulator.
84+
val sizeOfRow: Expression =
85+
BindReferences.bindReference(
86+
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
87+
partitionStatistics.schema)
88+
89+
val sizeInBytes =
90+
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
91+
Statistics(sizeInBytes = sizeInBytes)
8092
}
8193
}
8294

@@ -127,10 +139,10 @@ case class InMemoryRelation(
127139
rowCount += 1
128140
}
129141

130-
batchStats.add(totalSize)
131-
132142
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
133143
.flatMap(_.values))
144+
145+
batchStats.add(stats)
134146
CachedBatch(rowCount, columnBuilders.map { builder =>
135147
JavaUtils.bufferToArray(builder.build())
136148
}, stats)

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
232232
val columnTypes2 = List.fill(length2)(IntegerType)
233233
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
234234
}
235-
236-
test("SPARK-17549: cached table size should be correctly calculated") {
237-
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
238-
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
239-
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
240-
241-
// Materialize the data.
242-
val expectedAnswer = data.collect()
243-
checkAnswer(cached, expectedAnswer)
244-
245-
// Check that the right size was calculated.
246-
assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
247-
}
248-
249235
}

0 commit comments

Comments
 (0)