InMemoryRelation.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.AccumulatorV2


object InMemoryRelation {
@@ -44,6 +44,70 @@ object InMemoryRelation {
    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
}

+  /**
+   * Accumulator for storing column stats. Summarizes the data in the driver to curb the amount of
+   * memory being used. Only "sizeInBytes" for each column is kept.
+   */
+  class ColStatsAccumulator(originalOutput: Seq[Attribute])
Contributor: Should we make the class name explicitly say that it is for sizeInBytes?

Contributor (author): I tried to leave it generic in case other stats need to be added later, but no worries, I can change the name.
+    extends AccumulatorV2[Seq[ColumnStats], Array[Long]] {
+
+    private var stats: Array[Long] = null
+
+    override def isZero: Boolean = stats == null
+
+    override def copy(): AccumulatorV2[Seq[ColumnStats], Array[Long]] = {
+      val newAcc = new ColStatsAccumulator(originalOutput)
+      newAcc.stats = stats
+      newAcc
+    }
+
+    override def reset(): Unit = {
+      stats = null
+    }
+
+    override def add(update: Seq[ColumnStats]): Unit = {
+      if (update != null) {
+        require(isZero || stats.length == update.size, "Input stats doesn't match expected size.")
+
+        val newStats = new Array[Long](update.size)
+
+        update.toIndexedSeq.zipWithIndex.foreach { case (colStats, idx) =>
+          val current = if (!isZero) stats(idx) else 0L
+          newStats(idx) = current + colStats.sizeInBytes
+        }
+
+        stats = newStats
+      }
+    }
+
+    override def merge(other: AccumulatorV2[Seq[ColumnStats], Array[Long]]): Unit = {
+      if (other.value != null) {
+        require(isZero || stats.length == other.value.length,
+          "Merging accumulators of different size.")
+
+        val newStats = new Array[Long](other.value.length)
+        for (i <- 0 until other.value.length) {
+          val current = if (!isZero) stats(i) else 0L
+          newStats(i) = current + other.value(i)
+        }
+        stats = newStats
+      }
+    }
+
+    override def value: Array[Long] = stats
+
+    /**
+     * Calculate the size of the relation for a given output. Adds up all the known column sizes
+     * that match the desired output.
+     */
+    def sizeForOutput(output: Seq[Attribute]): Long = {
+      originalOutput.toIndexedSeq.zipWithIndex.map { case (a, idx) =>
+        val count = output.count(a.semanticEquals)
+        stats(idx) * count
+      }.fold(0L)(_ + _)
+    }
+  }
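As an aside, here is a standalone sketch of the arithmetic this accumulator performs, using plain Array[Long] sizes in place of Spark's ColumnStats (all names are illustrative, not part of the patch): per-partition column sizes are summed element-wise, and sizeForOutput then multiplies each per-column total by the number of times that column appears in the requested output.

  object ColStatsSketch {
    // Element-wise sum of per-column byte counts, treating null as "empty".
    def add(current: Array[Long], update: Array[Long]): Array[Long] = {
      require(current == null || current.length == update.length)
      val base = if (current == null) Array.fill(update.length)(0L) else current
      base.zip(update).map { case (a, b) => a + b }
    }

    def main(args: Array[String]): Unit = {
      // Five partitions, each holding two rows of (Int, Long), mirroring the
      // SPARK-17549 test below: 2 * 4 = 8 bytes for col1 and 2 * 8 = 16 bytes
      // for col2 per partition.
      val total = (1 to 5).foldLeft(null: Array[Long])((acc, _) => add(acc, Array(8L, 16L)))
      println(total.mkString(", "))  // 40, 80
      // An output that selects col2 twice reports 2 * 80 = 160 bytes,
      // mirroring sizeForOutput's semanticEquals-based counting.
      println(2 * total(1))
    }
  }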

/**
 * CachedBatch is a cached batch of rows.
@@ -63,8 +127,7 @@ case class InMemoryRelation(
    @transient child: SparkPlan,
    tableName: Option[String])(
    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    _batchStats: ColStatsAccumulator = null)
  extends logical.LeafNode with MultiInstanceRelation {

  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -73,22 +136,23 @@

  @transient val partitionStatistics = new PartitionStatistics(output)

+  val batchStats = if (_batchStats != null) {
+    _batchStats
+  } else {
+    val _newStats = new ColStatsAccumulator(output)
+    child.sqlContext.sparkContext.register(_newStats)
+    _newStats
+  }

  override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.isZero) {
      // Underlying columnar RDD hasn't been materialized, no useful statistics information
      // available, return the default statistics.
      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
    } else {
      // Underlying columnar RDD has been materialized, required information has also been
      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.sizeForOutput(output))
    }
  }

@@ -139,13 +203,13 @@
          rowCount += 1
        }

-        val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
-          .flatMap(_.values))
+        val stats = columnBuilders.map(_.columnStats)
+        batchStats.add(stats)
+
+        val statsRow = InternalRow.fromSeq(stats.map(_.collectedStatistics).flatMap(_.values))
        CachedBatch(rowCount, columnBuilders.map { builder =>
          JavaUtils.bufferToArray(builder.build())
-        }, stats)
+        }, statsRow)
      }

      def hasNext: Boolean = rowIterator.hasNext
InMemoryColumnarQuerySuite.scala
@@ -232,4 +232,29 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
    val columnTypes2 = List.fill(length2)(IntegerType)
    val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
  }

test("SPARK-17549: cached table size should be correctly calculated") {
val data = spark.sparkContext.parallelize(1 to 10, 5).map { i => (i, i.toLong) }
.toDF("col1", "col2")
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)

// Materialize the data.
val expectedAnswer = data.collect()
checkAnswer(cached, expectedAnswer)

// Check that the right size was calculated.
val expectedColSizes = expectedAnswer.size * (INT.defaultSize + LONG.defaultSize)
assert(cached.statistics.sizeInBytes === expectedColSizes)

// Create a projection of the cached data and make sure the statistics are correct.
val projected = cached.withOutput(Seq(plan.output.last))
assert(projected.statistics.sizeInBytes === expectedAnswer.size * LONG.defaultSize)
Contributor: I am not sure I understand the last two parts. After we cache the dataset, can we actually change the number of output columns (this test) or the data types (the next one)?

If we do a project on the cached dataset, we will see a Project operator on top of the InMemoryRelation.

I am wondering what kinds of queries can cause this problem.

Contributor (author): OK, I looked again at a heap dump with a couple of cached relations and you're right; I had misinterpreted the previous data. I'll remove these tests and simplify the code.

Still, I'd be a little more comfortable if there were an assert in InMemoryRelation.withOutput that the new output is at least the same size as the previous one...

Contributor (author): Actually... in that case, isn't my previous patch correct? (#15112)

My worry about that patch was multiple cached relations with different outputs sharing the same accumulator. But if that doesn't happen, then that patch is enough.

Contributor (author): @yhuai given the above, is it OK if I just revert your revert of my previous patch?

Contributor: Oh, I see. Sorry, I may have missed something. How do we reproduce the problem that led us to revert the previous PR?

Contributor: Let me check with @liancheng.

Contributor: I double-checked the code. The output of an InMemoryRelation always represents the materialized dataset, so it should not be a subset of the underlying dataset's column set. When we scan this relation in InMemoryTableScanExec, we push the selection into the scan.

So even if we use withOutput in CacheManager's useCachedData, we are still fine to use the original stats because we are not changing the dataset. Look at the implementation of this method:

  def useCachedData(plan: LogicalPlan): LogicalPlan = {
    plan transformDown {
      case currentFragment =>
        lookupCachedData(currentFragment)
          .map(_.cachedRepresentation.withOutput(currentFragment.output))
          .getOrElse(currentFragment)
    }
  }

lookupCachedData is implemented using sameResult, so we are just applying an equivalent output (attributes in this output list may have cosmetic variations, but they are equivalent to the original attributes of the dataset).

Although we may have different outputs, they still represent the same dataset, so it seems fine for them to share the same accumulator.
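For illustration, a minimal sketch of that substitution (the session setup and queries here are assumptions for this example, not from the PR):

  import org.apache.spark.sql.SparkSession

  object SameResultSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
      // Cache one query and materialize it.
      val df1 = spark.range(10).selectExpr("id AS x")
      df1.cache()
      df1.count()

      // A logically equivalent plan with fresh expression IDs: lookupCachedData
      // matches it via sameResult, and withOutput rewires the cached relation
      // to this query's attributes. Same dataset, so the same accumulator.
      val df2 = spark.range(10).selectExpr("id AS x")
      df2.explain()  // the physical plan shows InMemoryTableScan
      spark.stop()
    }
  }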

Contributor (author): Thanks for confirming. So we should be fine with the previous patch.


+    // Create a silly projection that repeats columns of the first cached relation, and
+    // check that the size is calculated correctly.
+    val projected2 = cached.withOutput(Seq(plan.output.last, plan.output.last))
+    assert(projected2.statistics.sizeInBytes === 2 * expectedAnswer.size * LONG.defaultSize)
+  }

}