sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.CollectionAccumulator
 
 
 object InMemoryRelation {
@@ -61,7 +63,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: CollectionAccumulator[InternalRow] =
+      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
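The signature change above swaps a single running sum (LongAccumulator) for a CollectionAccumulator, which keeps one element per add() call. A minimal standalone sketch of the difference, assuming a local SparkSession (the master setting and names like "acc-demo" are illustrative only, not part of the patch):

import org.apache.spark.sql.SparkSession

object CollectionAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("acc-demo").getOrCreate()
    val sc = spark.sparkContext

    // A LongAccumulator folds every add() into one running total...
    val totalSize = sc.longAccumulator("total size")
    // ...while a CollectionAccumulator keeps each added element, so the driver
    // can re-derive statistics per cached batch instead of only a global sum.
    val perBatch = sc.collectionAccumulator[Long]("per-batch sizes")

    sc.parallelize(1 to 100, 4).foreachPartition { iter =>
      val batchBytes = iter.size.toLong * 4L // pretend: 4 bytes per INT value
      totalSize.add(batchBytes)
      perBatch.add(batchBytes)
    }

    import scala.collection.JavaConverters._
    println(totalSize.value)               // 400
    println(perBatch.value.asScala.toList) // List(100, 100, 100, 100)
    spark.stop()
  }
}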
@@ -71,12 +74,21 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value == 0L) {
+    if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      Statistics(sizeInBytes = batchStats.value.longValue)
+      // Underlying columnar RDD has been materialized, required information has also been
+      // collected via the `batchStats` accumulator.
+      val sizeOfRow: Expression =
+        BindReferences.bindReference(
+          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+          partitionStatistics.schema)
+
+      val sizeInBytes =
+        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+      Statistics(sizeInBytes = sizeInBytes)
     }
   }
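With per-column statistics now collected per batch, the new `statistics` derives the relation size by summing, over all batches, the per-column `sizeInBytes` fields. Stripped of the Catalyst expression machinery (`BindReferences` and `Add`), the computation reduces to the following plain-Scala sketch; the case classes are hypothetical stand-ins for the real `InternalRow` layout:

// Hypothetical model: one BatchStats per CachedBatch, one ColumnStats per column.
case class ColumnStats(sizeInBytes: Long) // other fields (bounds, null count) elided
case class BatchStats(columns: Seq[ColumnStats])

def totalSizeInBytes(batches: Seq[BatchStats]): Long =
  batches.map(_.columns.map(_.sizeInBytes).sum).sum

// Example: two batches over three columns.
val batches = Seq(
  BatchStats(Seq(ColumnStats(40), ColumnStats(80), ColumnStats(16))),
  BatchStats(Seq(ColumnStats(36), ColumnStats(72), ColumnStats(12))))
assert(totalSizeInBytes(batches) == 256)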

@@ -127,10 +139,10 @@ case class InMemoryRelation(
           rowCount += 1
         }
 
-        batchStats.add(totalSize)
-
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
+
+        batchStats.add(stats)
         CachedBatch(rowCount, columnBuilders.map { builder =>
          JavaUtils.bufferToArray(builder.build())
        }, stats)
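Note that `batchStats.add(stats)` ships the same flattened statistics row that is stored in the `CachedBatch` itself: each column contributes its `collectedStatistics` fields in order, which is what lets `partitionStatistics.forAttribute(a).sizeInBytes` above bind to a fixed ordinal. A sketch of that layout, assuming the usual five-field ColumnStats order (lower bound, upper bound, null count, row count, size in bytes); the concrete values are invented for illustration:

import org.apache.spark.sql.catalyst.InternalRow

// Two INT columns, five statistics fields each -> one flat ten-field row per batch.
val statsColA = Seq(1, 5, 0, 5, 20L)   // lowerBound, upperBound, nullCount, count, sizeInBytes
val statsColB = Seq(10, 50, 0, 5, 20L)
val row: InternalRow = InternalRow.fromSeq(statsColA ++ statsColB)

// sizeInBytes sits at ordinal 4 for the first column and 9 for the second;
// BindReferences resolves exactly these ordinals from partitionStatistics.schema.
val batchSize = row.getLong(4) + row.getLong(9) // 40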
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
-
-  test("SPARK-17549: cached table size should be correctly calculated") {
-    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
-    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
-
-    // Materialize the data.
-    val expectedAnswer = data.collect()
-    checkAnswer(cached, expectedAnswer)
-
-    // Check that the right size was calculated.
-    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
-  }
-
 }
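Removing the SPARK-17549 test follows from the signature change: its assertion `batchStats.value === expectedAnswer.size * INT.defaultSize` no longer typechecks once `batchStats.value` is a list of statistics rows rather than a Long. A hedged sketch of how an equivalent check could be phrased against the public `statistics` instead, reusing the removed test's setup and the suite's harness (QueryTest, SharedSQLContext, testImplicits); the expected value assumes ten INT values at INT.defaultSize bytes each:

  test("cached relation reports its materialized size via statistics") {
    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)

    // Materialize the cached RDD so batchStats is populated.
    checkAnswer(cached, data.collect())

    // The size derived from the collected per-batch statistics rows.
    assert(cached.statistics.sizeInBytes === 10 * INT.defaultSize)
  }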