@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
-    child: SparkPlan,
+    @transient child: SparkPlan,
     tableName: Option[String])(
-    private var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    private var _statistics: Statistics = null,
+    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    @transient private var _statistics: Statistics = null,
     private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
   extends LogicalPlan with MultiInstanceRelation {

@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
     _batchStats
   }
 
-  val partitionStatistics = new PartitionStatistics(output)
+  @transient val partitionStatistics = new PartitionStatistics(output)
 
   private def computeSizeInBytes = {
     val sizeOfRow: Expression =
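
A note on what @transient buys in these hunks: Java serialization simply skips fields marked @transient, so the driver-side physical plan and the cached-buffer RDD handle are no longer dragged into every serialized copy of the relation; on deserialization the field just comes back as null. A minimal standalone sketch of that behaviour, using toy classes (HeavyPlan, Relation, and TransientDemo are made up for illustration, not Spark code):

import java.io._

// Toy stand-ins, not Spark classes.
case class HeavyPlan(description: String)

// Same idiom as the patch: @transient on a case-class parameter marks the
// generated field as transient for Java serialization.
case class Relation(name: String, @transient plan: HeavyPlan)

object TransientDemo {
  // Serialize and deserialize through an in-memory buffer.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(Relation("cached", HeavyPlan("big physical plan")))
    println(copy.name) // prints: cached
    println(copy.plan) // prints: null -- the @transient field was skipped
  }
}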
@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
 private[sql] case class InMemoryColumnarTableScan(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
-    relation: InMemoryRelation)
+    @transient relation: InMemoryRelation)
   extends LeafNode {
 
   override def output: Seq[Attribute] = attributes
@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
   // Returned filter predicate should return false iff it is impossible for the input expression
   // to evaluate to `true' based on statistics collected about this partition batch.
-  val buildFilter: PartialFunction[Expression, Expression] = {
+  @transient val buildFilter: PartialFunction[Expression, Expression] = {
     case And(lhs: Expression, rhs: Expression)
         if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
       (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
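
One subtlety worth spelling out: buildFilter refers to itself inside its own definition, which is legal because the guard and the case body only evaluate when the function is applied, not when the val is initialized. A toy sketch of the same self-referential PartialFunction pattern, with a made-up mini expression ADT (Expr, StatsCheck, and friends) standing in for Catalyst's Expression:

object BuildFilterSketch {
  sealed trait Expr
  final case class And(lhs: Expr, rhs: Expr) extends Expr
  final case class GreaterThan(column: String, value: Int) extends Expr
  final case class IsNull(column: String) extends Expr
  final case class StatsCheck(description: String) extends Expr

  // Self-reference is fine: the recursive calls only run at application time.
  val buildFilter: PartialFunction[Expr, Expr] = {
    case And(lhs, rhs) if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
      // Keep whichever side(s) translate into a statistics check.
      (buildFilter.lift(lhs).toList ++ buildFilter.lift(rhs).toList)
        .reduce((a, b) => And(a, b))
    case GreaterThan(col, v) =>
      // A batch whose maximum for `col` is <= v cannot possibly match.
      StatsCheck(s"max($col) > $v")
  }

  def main(args: Array[String]): Unit = {
    // IsNull has no translation, so only the GreaterThan side survives the And.
    println(buildFilter(And(GreaterThan("key", 10), IsNull("value"))))
  }
}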
@@ -268,24 +268,31 @@ private[sql] case class InMemoryColumnarTableScan(
       readBatches.setValue(0)
     }
 
-    relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
-      val partitionFilter = newPredicate(
-        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
-        relation.partitionStatistics.schema)
+    // Using these variables here to avoid serialization of entire objects (if referenced directly)
+    // within the map Partitions closure.
+    val schema = relation.partitionStatistics.schema
+    val schemaIndex = schema.zipWithIndex
+    val relOutput = relation.output
+    val buffers = relation.cachedColumnBuffers

Contributor comment on this hunk: Could you please add a comment above this hunk to indicate that these variables are added to avoid unnecessary serialization?

+    buffers.mapPartitions { cachedBatchIterator =>
+      val partitionFilter = newPredicate(
+        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+        schema)
 
       // Find the ordinals and data types of the requested columns. If none are requested, use the
       // narrowest (the field with minimum default element size).
       val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
         val (narrowestOrdinal, narrowestDataType) =
-          relation.output.zipWithIndex.map { case (a, ordinal) =>
+          relOutput.zipWithIndex.map { case (a, ordinal) =>
             ordinal -> a.dataType
           } minBy { case (_, dataType) =>
             ColumnType(dataType).defaultSize
           }
         Seq(narrowestOrdinal) -> Seq(narrowestDataType)
       } else {
         attributes.map { a =>
-          relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
+          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
         }.unzip
       }
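
The hoisting in this hunk is the standard fix for accidental closure capture: writing relation.partitionStatistics.schema inside mapPartitions would both serialize relation into every task and, with the fields above now @transient, read back null on the executor; copying what the closure needs into local vals beforehand avoids both problems. A sketch of the same idiom with a hypothetical Processor class (not from this patch):

import org.apache.spark.rdd.RDD

// Hypothetical class: `huge` is here to show that capturing `this` in a
// closure ships far more than the closure actually needs.
class Processor(val lookup: Map[Int, String], val huge: Array[Byte]) extends Serializable {

  def annotate(rdd: RDD[Int]): RDD[String] = {
    // Copy the one field the closure needs into a local val. The closure
    // below then captures `localLookup` only, not `this` (and so not `huge`).
    val localLookup = lookup
    rdd.mapPartitions { iter =>
      iter.map(i => localLookup.getOrElse(i, "unknown"))
    }
  }
}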

@@ -296,7 +303,7 @@ private[sql] case class InMemoryColumnarTableScan(
         // Build column accessors
         val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
           ColumnAccessor(
-            relation.output(batchColumnIndex).dataType,
+            relOutput(batchColumnIndex).dataType,
             ByteBuffer.wrap(cachedBatch.buffers(batchColumnIndex)))
         }
 
@@ -328,7 +335,7 @@ private[sql] case class InMemoryColumnarTableScan(
       if (inMemoryPartitionPruningEnabled) {
         cachedBatchIterator.filter { cachedBatch =>
           if (!partitionFilter(cachedBatch.stats)) {
-            def statsString: String = relation.partitionStatistics.schema.zipWithIndex.map {
+            def statsString: String = schemaIndex.map {
               case (a, i) =>
                 val value = cachedBatch.stats.get(i, a.dataType)
                 s"${a.name}: $value"