From 5afb9ebdf3ff2ae3321b89dd80f0207fe1e330a6 Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Sat, 5 Sep 2015 00:25:19 +0530
Subject: [PATCH 1/3] SPARK-10451: Prevent unnecessary serializations in
 InMemoryColumnarTableScan

---
 .../columnar/InMemoryColumnarTableScan.scala  | 32 +++++++++++--------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 66d429bc06198..8575371a2c276 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
-    child: SparkPlan,
+    @transient child: SparkPlan,
     tableName: Option[String])(
-    private var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    private var _statistics: Statistics = null,
+    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    @transient private var _statistics: Statistics = null,
     private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
     _batchStats
   }
 
-  val partitionStatistics = new PartitionStatistics(output)
+  @transient val partitionStatistics = new PartitionStatistics(output)
 
   private def computeSizeInBytes = {
     val sizeOfRow: Expression =
@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
 private[sql] case class InMemoryColumnarTableScan(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
-    relation: InMemoryRelation)
+    @transient relation: InMemoryRelation)
   extends LeafNode {
 
   override def output: Seq[Attribute] = attributes
@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
   // Returned filter predicate should return false iff it is impossible for the input expression
   // to evaluate to `true' based on statistics collected about this partition batch.
-  val buildFilter: PartialFunction[Expression, Expression] = {
+  @transient val buildFilter: PartialFunction[Expression, Expression] = {
     case And(lhs: Expression, rhs: Expression)
       if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
       (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
@@ -235,7 +235,7 @@ private[sql] case class InMemoryColumnarTableScan(
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
   }
 
-  val partitionFilters: Seq[Expression] = {
+  @transient val partitionFilters: Seq[Expression] = {
     predicates.flatMap { p =>
       val filter = buildFilter.lift(p)
       val boundFilter =
@@ -268,16 +268,20 @@ private[sql] case class InMemoryColumnarTableScan(
       readBatches.setValue(0)
     }
 
-    relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
-      val partitionFilter = newPredicate(
-        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
-        relation.partitionStatistics.schema)
+    val schemaIndex = relation.partitionStatistics.schema.zipWithIndex
+    val relOutput = relation.output
+    val buffers = relation.cachedColumnBuffers
+    val partitionFilter = newPredicate(
+      partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+      relation.partitionStatistics.schema)
+
+    buffers.mapPartitions { cachedBatchIterator =>
 
       // Find the ordinals and data types of the requested columns. If none are requested, use the
      // narrowest (the field with minimum default element size).
       val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
         val (narrowestOrdinal, narrowestDataType) =
-          relation.output.zipWithIndex.map { case (a, ordinal) =>
+          relOutput.zipWithIndex.map { case (a, ordinal) =>
             ordinal -> a.dataType
           } minBy { case (_, dataType) =>
             ColumnType(dataType).defaultSize
@@ -285,7 +289,7 @@
 
         Seq(narrowestOrdinal) -> Seq(narrowestDataType)
       } else {
         attributes.map { a =>
-          relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
+          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
         }.unzip
       }
@@ -328,7 +332,7 @@ private[sql] case class InMemoryColumnarTableScan(
       if (inMemoryPartitionPruningEnabled) {
         cachedBatchIterator.filter { cachedBatch =>
           if (!partitionFilter(cachedBatch.stats)) {
-            def statsString: String = relation.partitionStatistics.schema.zipWithIndex.map {
+            def statsString: String = schemaIndex.map {
               case (a, i) =>
                 val value = cachedBatch.stats.get(i, a.dataType)
                 s"${a.name}: $value"

From 1587a8bd0e1f4fbdd5f388a9a2e46fcc377f7f93 Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Sat, 5 Sep 2015 13:34:14 +0530
Subject: [PATCH 2/3] SPARK-10451: partitionFilters cannot be out of scope
 because of codegen SpecificPredicate class

---
 .../sql/columnar/InMemoryColumnarTableScan.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 8575371a2c276..6b6ada09ff3b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -235,7 +235,7 @@ private[sql] case class InMemoryColumnarTableScan(
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
   }
 
-  @transient val partitionFilters: Seq[Expression] = {
+  val partitionFilters: Seq[Expression] = {
     predicates.flatMap { p =>
       val filter = buildFilter.lift(p)
       val boundFilter =
@@ -268,15 +268,16 @@ private[sql] case class InMemoryColumnarTableScan(
       readBatches.setValue(0)
     }
 
-    val schemaIndex = relation.partitionStatistics.schema.zipWithIndex
+    val schema = relation.partitionStatistics.schema
+    val schemaIndex = schema.zipWithIndex
     val relOutput = relation.output
     val buffers = relation.cachedColumnBuffers
 
+    buffers.mapPartitions { cachedBatchIterator =>
       val partitionFilter = newPredicate(
         partitionFilters.reduceOption(And).getOrElse(Literal(true)),
-        relation.partitionStatistics.schema)
+        schema)
 
-    buffers.mapPartitions { cachedBatchIterator =>
       // Find the ordinals and data types of the requested columns. If none are requested, use the
       // narrowest (the field with minimum default element size).
       val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
@@ -300,7 +301,7 @@ private[sql] case class InMemoryColumnarTableScan(
         // Build column accessors
         val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
           ColumnAccessor(
-            relation.output(batchColumnIndex).dataType,
+            relOutput(batchColumnIndex).dataType,
             ByteBuffer.wrap(cachedBatch.buffers(batchColumnIndex)))
         }
 

From f01f989180c844ce660d96e4f38a5d270462d8fc Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Fri, 11 Sep 2015 14:43:20 +0530
Subject: [PATCH 3/3] SPARK-10451: Incorporate review comments

---
 .../apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 6b6ada09ff3b6..d7e145f9c2bb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -268,6 +268,8 @@ private[sql] case class InMemoryColumnarTableScan(
       readBatches.setValue(0)
     }
 
+    // Use these local variables to avoid serializing the entire enclosing objects
+    // (which referencing them directly inside the mapPartitions closure would force).
     val schema = relation.partitionStatistics.schema
     val schemaIndex = schema.zipWithIndex
     val relOutput = relation.output
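
A note on the pattern behind these patches: a closure passed to an RDD
transformation captures every object it references, so the operator's fields
are copied into local vals before the closure, and driver-only fields are
marked @transient. Below is a minimal, self-contained Scala sketch of the
same idea; the Relation and Scan classes and the heavyState field are
hypothetical stand-ins, not Spark's actual classes.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical stand-ins: `heavyState` models driver-side state that is
    // expensive or impossible to serialize (e.g. a child plan or an RDD handle).
    class Relation(val output: Seq[String], val heavyState: AnyRef)

    class Scan(@transient val relation: Relation) extends Serializable {
      def run(sc: SparkContext): Long = {
        // Copy only what the closure needs into a local val. Referencing
        // `relation` directly inside mapPartitions would capture `this`,
        // dragging the whole Scan (and, without @transient on the field,
        // the whole Relation) into the serialized task.
        val relOutput = relation.output
        sc.parallelize(1 to 4).mapPartitions { iter =>
          iter.map(i => relOutput(i % relOutput.length)) // captures only relOutput
        }.count()
      }
    }

    object SerializationDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
        println(new Scan(new Relation(Seq("a", "b"), new Object)).run(sc)) // prints 4
        sc.stop()
      }
    }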
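
The second patch adds a twist worth spelling out: newPredicate returns an
instance of a code-generated SpecificPredicate class, so rather than building
the predicate once on the driver and shipping that object, the serializable
expressions are shipped and the predicate is rebuilt inside mapPartitions, once
per partition. A sketch of that shape, with compilePredicate as a hypothetical
stand-in for the codegen step (not Spark's actual API):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object PredicateDemo {
      // Stand-in for runtime code generation: treat the returned function as
      // an instance of a freshly generated, non-serializable class.
      def compilePredicate(expr: String): Int => Boolean =
        if (expr == "even") (i: Int) => i % 2 == 0
        else (_: Int) => true

      def prunedScan(sc: SparkContext, expr: String): RDD[Int] = {
        // Ship only the serializable description (`expr`) in the closure and
        // rebuild the compiled predicate on the executor, once per partition,
        // mirroring how patch 2 moves newPredicate inside mapPartitions.
        sc.parallelize(1 to 10, numSlices = 2).mapPartitions { iter =>
          val pred = compilePredicate(expr)
          iter.filter(pred)
        }
      }
    }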