From c3f1a9bb4b53ceef8ad3fb31b3164665549f8bc9 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 01/36] improve the doc for "spark.memory.offHeap.size" --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index d3c358bb74173..09f77d700df56 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1055,7 +1055,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory in bytes which can be used for off-heap allocation. + The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 6a2b3ca56d26f5fb03d165466e9b6edadeb0adac Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 14:00:16 -0500 Subject: [PATCH 02/36] fix --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 09f77d700df56..d3c358bb74173 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1055,7 +1055,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. + The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 6e37fa2d06107c175cede4ff1b1a65dd14de7d5a Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 27 Oct 2017 10:36:08 -0700 Subject: [PATCH 03/36] add configuration for partition_metadata --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 21e4685fcc456..eeec6625c5394 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -137,10 +137,18 @@ object SQLConf { val IN_MEMORY_PARTITION_PRUNING = buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning") .internal() - .doc("When true, enable partition pruning for in-memory columnar tables.") + .doc("When true, enable partition batch pruning for in-memory columnar tables.") .booleanConf .createWithDefault(true) + val IN_MEMORY_PARTITION_METADATA = + buildConf("spark.sql.inMemoryColumnarStorage.partitionMetadata") + .internal() + .doc("When true, spark sql will collect partition level stats for in-memory columnar tables and" + + " do coarse-grained pruning") + .booleanConf + .createWithDefault(false) + val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin") .internal() .doc("When true, prefer sort merge join over shuffle hash join.") From aa7066038e23cea7c5eb720aa70cd1a84d6f751f Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 27 Oct 2017 12:49:33 -0700 Subject: [PATCH 04/36] framework of CachedColumnarRDD --- .../columnar/CachedColumnarRDD.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala new file mode 100644 index 0000000000000..6ddcf519873c7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import scala.reflect.ClassTag + +import org.apache.spark.{Dependency, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow + +private[columnar] class CachedColumnarRDDPartition( + partitionIndex: Int, + columnnStats: Array[InternalRow]) extends Partition { + + override def index: Int = partitionIndex + + def columnStats: Array[InternalRow] = columnnStats +} + +private[columnar] class CachedColumnarRDD[T: ClassTag]( + @transient private var _sc: SparkContext, + @transient private var deps: Seq[Dependency[_]], + dataRDD: RDD[T], + partitionStats: Array[Array[InternalRow]]) extends RDD[T](_sc, deps) { + + /** + * :: DeveloperApi :: + * Implemented by subclasses to compute a given partition. + */ + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + Iterator() + } + + /** + * Implemented by subclasses to return the set of partitions in this RDD. This method will only + * be called once, so it is safe to implement a time-consuming computation in it. + * + * The partitions in this array must satisfy the following property: + * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }` + */ + override protected def getPartitions: Array[Partition] = { + partitionStats.zipWithIndex.map { + case (statsRow, index) => + new CachedColumnarRDDPartition(index, statsRow) + } + } +} From d1380821d4a9cbb45f5b28c775939e0d130f0922 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 27 Oct 2017 15:53:38 -0700 Subject: [PATCH 05/36] code framework --- .../columnar/CachedColumnarRDD.scala | 14 +- .../execution/columnar/InMemoryRelation.scala | 196 ++++++++++++++---- .../columnar/InMemoryTableScanExec.scala | 59 ++++-- 3 files changed, 196 insertions(+), 73 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 6ddcf519873c7..5af2e26563fff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -25,25 +25,25 @@ import org.apache.spark.sql.catalyst.InternalRow private[columnar] class CachedColumnarRDDPartition( partitionIndex: Int, - columnnStats: Array[InternalRow]) extends Partition { + columnnStats: InternalRow) extends Partition { override def index: Int = partitionIndex - def columnStats: Array[InternalRow] = columnnStats + def columnStats: InternalRow = columnnStats } -private[columnar] class CachedColumnarRDD[T: ClassTag]( +private[columnar] class CachedColumnarRDD( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]], - dataRDD: RDD[T], - partitionStats: Array[Array[InternalRow]]) extends RDD[T](_sc, deps) { + dataRDD: RDD[CachedBatch], + partitionStats: Array[InternalRow]) extends RDD[CachedBatch](_sc, deps) { /** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */ - override def compute(split: Partition, context: TaskContext): Iterator[T] = { - Iterator() + override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { + dataRDD.iterator(split, context) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index a1c62a729900e..248ef9008d57a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -50,7 +50,138 @@ object InMemoryRelation { * @param stats The stat of columns */ private[columnar] -case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) +case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: Option[InternalRow]) + +private[columnar] class CachedPartitionIterator( + rowIterator: Iterator[InternalRow], + output: Seq[Attribute], + batchSize: Int, + useCompression: Boolean, + batchStats: LongAccumulator) extends Iterator[(CachedBatch, InternalRow)] { + + def next(): (CachedBatch, InternalRow) = { + val columnBuilders = output.map { attribute => + ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) + }.toArray + + var rowCount = 0 + var totalSize = 0L + while (rowIterator.hasNext) { + val row = rowIterator.next() + + // Added for SPARK-6082. This assertion can be useful for scenarios when something + // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM + // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat + // hard to decipher. + assert( + row.numFields == columnBuilders.length, + s"Row column number mismatch, expected ${output.size} columns, " + + s"but got ${row.numFields}." + + s"\nRow content: $row") + + var i = 0 + totalSize = 0 + while (i < row.numFields) { + columnBuilders(i).appendFrom(row, i) + totalSize += columnBuilders(i).columnStats.sizeInBytes + i += 1 + } + rowCount += 1 + } + + batchStats.add(totalSize) + val stats = InternalRow.fromSeq(columnBuilders.flatMap(_.columnStats.collectedStatistics)) + (CachedBatch(rowCount, columnBuilders.map { builder => + JavaUtils.bufferToArray(builder.build()) + }, None), stats) + /* + while (rowIterator.hasNext && rowCount < batchSize + && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { + val row = rowIterator.next() + + // Added for SPARK-6082. This assertion can be useful for scenarios when something + // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM + // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat + // hard to decipher. + assert( + row.numFields == columnBuilders.length, + s"Row column number mismatch, expected ${output.size} columns, " + + s"but got ${row.numFields}." + + s"\nRow content: $row") + + var i = 0 + totalSize = 0 + while (i < row.numFields) { + columnBuilders(i).appendFrom(row, i) + totalSize += columnBuilders(i).columnStats.sizeInBytes + i += 1 + } + rowCount += 1 + } + + batchStats.add(totalSize) + + val stats = InternalRow.fromSeq( + columnBuilders.flatMap(_.columnStats.collectedStatistics)) + CachedBatch(rowCount, columnBuilders.map { builder => + JavaUtils.bufferToArray(builder.build()) + }, stats) + */ + } + + def hasNext: Boolean = rowIterator.hasNext +} + +private[columnar] class CachedBatchIterator( + rowIterator: Iterator[InternalRow], + output: Seq[Attribute], + batchSize: Int, + useCompression: Boolean, + batchStats: LongAccumulator) extends Iterator[CachedBatch] { + + def next(): CachedBatch = { + val columnBuilders = output.map { attribute => + ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) + }.toArray + + var rowCount = 0 + var totalSize = 0L + + while (rowIterator.hasNext && rowCount < batchSize + && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { + val row = rowIterator.next() + + // Added for SPARK-6082. This assertion can be useful for scenarios when something + // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM + // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat + // hard to decipher. + assert( + row.numFields == columnBuilders.length, + s"Row column number mismatch, expected ${output.size} columns, " + + s"but got ${row.numFields}." + + s"\nRow content: $row") + + var i = 0 + totalSize = 0 + while (i < row.numFields) { + columnBuilders(i).appendFrom(row, i) + totalSize += columnBuilders(i).columnStats.sizeInBytes + i += 1 + } + rowCount += 1 + } + + batchStats.add(totalSize) + + val stats = InternalRow.fromSeq( + columnBuilders.flatMap(_.columnStats.collectedStatistics)) + CachedBatch(rowCount, columnBuilders.map { builder => + JavaUtils.bufferToArray(builder.build()) + }, Some(stats)) + } + + def hasNext: Boolean = rowIterator.hasNext +} case class InMemoryRelation( output: Seq[Attribute], @@ -69,6 +200,8 @@ case class InMemoryRelation( @transient val partitionStatistics = new PartitionStatistics(output) + private val usePartitionLevelMetadata = conf.inMemoryPartitionMetadata + override def computeStats(): Statistics = { if (batchStats.value == 0L) { // Underlying columnar RDD hasn't been materialized, no useful statistics information @@ -87,56 +220,29 @@ case class InMemoryRelation( private def buildBuffers(): Unit = { val output = child.output - val cached = child.execute().mapPartitionsInternal { rowIterator => - new Iterator[CachedBatch] { - def next(): CachedBatch = { - val columnBuilders = output.map { attribute => - ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) - }.toArray - - var rowCount = 0 - var totalSize = 0L - while (rowIterator.hasNext && rowCount < batchSize - && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { - val row = rowIterator.next() - - // Added for SPARK-6082. This assertion can be useful for scenarios when something - // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM - // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat - // hard to decipher. - assert( - row.numFields == columnBuilders.length, - s"Row column number mismatch, expected ${output.size} columns, " + - s"but got ${row.numFields}." + - s"\nRow content: $row") - - var i = 0 - totalSize = 0 - while (i < row.numFields) { - columnBuilders(i).appendFrom(row, i) - totalSize += columnBuilders(i).columnStats.sizeInBytes - i += 1 - } - rowCount += 1 - } - - batchStats.add(totalSize) - - val stats = InternalRow.fromSeq( - columnBuilders.flatMap(_.columnStats.collectedStatistics)) - CachedBatch(rowCount, columnBuilders.map { builder => - JavaUtils.bufferToArray(builder.build()) - }, stats) - } - def hasNext: Boolean = rowIterator.hasNext + // TODO: + val batchedRDD = child.execute().mapPartitionsInternal { rowIterator => + if (!usePartitionLevelMetadata) { + new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats) + } else { + new CachedPartitionIterator(rowIterator, output, batchSize, useCompression, batchStats) } - }.persist(storageLevel) + } + + val cached = if (!usePartitionLevelMetadata) { + batchedRDD.persist(storageLevel) + } else { + val r = batchedRDD.map(_.asInstanceOf[(CachedBatch, InternalRow)]) + val partitionLevelStats = r.map(_._2).collect() + new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD.dependencies, r.map(_._1), + partitionLevelStats) + } cached.setName( tableName.map(n => s"In-memory table $n") .getOrElse(StringUtils.abbreviate(child.toString, 1024))) - _cachedColumnBuffers = cached + _cachedColumnBuffers = cached.asInstanceOf[RDD[CachedBatch]] } def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 2ae3f35eb1da1..7b08bb6dae8dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -187,30 +188,46 @@ case class InMemoryTableScanExec( val schemaIndex = schema.zipWithIndex val buffers = relation.cachedColumnBuffers - buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) => - val partitionFilter = newPredicate( - partitionFilters.reduceOption(And).getOrElse(Literal(true)), - schema) - partitionFilter.initialize(index) - - // Do partition batch pruning if enabled - if (inMemoryPartitionPruningEnabled) { - cachedBatchIterator.filter { cachedBatch => - if (!partitionFilter.eval(cachedBatch.stats)) { - logDebug { - val statsString = schemaIndex.map { case (a, i) => - val value = cachedBatch.stats.get(i, a.dataType) - s"${a.name}: $value" - }.mkString(", ") - s"Skipping partition based on stats $statsString" + if (sqlContext.conf.inMemoryPartitionMetadata) { + val cachedColumnarRDD = buffers.asInstanceOf[CachedColumnarRDD] + val partitions = cachedColumnarRDD.partitions.map(_.asInstanceOf[CachedColumnarRDDPartition]) + buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) => + val partitionFilter = newPredicate( + partitionFilters.reduceOption(And).getOrElse(Literal(true)), + schema) + partitionFilter.initialize(index) + if (!partitionFilter.eval(partitions(index).columnStats)) { + Iterator() + } else { + cachedBatchIterator + } + } + } else { + buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) => + val partitionFilter = newPredicate( + partitionFilters.reduceOption(And).getOrElse(Literal(true)), + schema) + partitionFilter.initialize(index) + + // Do partition batch pruning if enabled + if (inMemoryPartitionPruningEnabled) { + cachedBatchIterator.filter { cachedBatch => + if (!partitionFilter.eval(cachedBatch.stats.get)) { + logDebug { + val statsString = schemaIndex.map { case (a, i) => + val value = cachedBatch.stats.get.get(i, a.dataType) + s"${a.name}: $value" + }.mkString(", ") + s"Skipping partition based on stats $statsString" + } + false + } else { + true } - false - } else { - true } + } else { + cachedBatchIterator } - } else { - cachedBatchIterator } } } From a72d7798b17000250eda0bcc8ae726b7dce7aa0c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 30 Oct 2017 13:41:18 -0700 Subject: [PATCH 06/36] remove cachedcolumnarbatchRDD --- .../apache/spark/sql/internal/SQLConf.scala | 2 + .../columnar/CachedColumnarRDD.scala | 62 --------------- .../execution/columnar/InMemoryRelation.scala | 71 ++++++----------- .../columnar/InMemoryTableScanExec.scala | 79 ++++++++++--------- 4 files changed, 66 insertions(+), 148 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index eeec6625c5394..b143c19529dda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1142,6 +1142,8 @@ class SQLConf extends Serializable with Logging { def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) + def inMemoryPartitionMetadata: Boolean = getConf(IN_MEMORY_PARTITION_METADATA) + def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD) def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala deleted file mode 100644 index 5af2e26563fff..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.columnar - -import scala.reflect.ClassTag - -import org.apache.spark.{Dependency, Partition, SparkContext, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow - -private[columnar] class CachedColumnarRDDPartition( - partitionIndex: Int, - columnnStats: InternalRow) extends Partition { - - override def index: Int = partitionIndex - - def columnStats: InternalRow = columnnStats -} - -private[columnar] class CachedColumnarRDD( - @transient private var _sc: SparkContext, - @transient private var deps: Seq[Dependency[_]], - dataRDD: RDD[CachedBatch], - partitionStats: Array[InternalRow]) extends RDD[CachedBatch](_sc, deps) { - - /** - * :: DeveloperApi :: - * Implemented by subclasses to compute a given partition. - */ - override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { - dataRDD.iterator(split, context) - } - - /** - * Implemented by subclasses to return the set of partitions in this RDD. This method will only - * be called once, so it is safe to implement a time-consuming computation in it. - * - * The partitions in this array must satisfy the following property: - * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }` - */ - override protected def getPartitions: Array[Partition] = { - partitionStats.zipWithIndex.map { - case (statsRow, index) => - new CachedColumnarRDDPartition(index, statsRow) - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 248ef9008d57a..2f5cddd3d8df1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -57,9 +57,13 @@ private[columnar] class CachedPartitionIterator( output: Seq[Attribute], batchSize: Int, useCompression: Boolean, - batchStats: LongAccumulator) extends Iterator[(CachedBatch, InternalRow)] { + batchStats: LongAccumulator) extends Iterator[AnyRef] { - def next(): (CachedBatch, InternalRow) = { + private var partitionStats: InternalRow = _ + + private var fetchingFirstElement = true + + private def buildCachedBatch(): Option[CachedBatch] = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) }.toArray @@ -88,45 +92,24 @@ private[columnar] class CachedPartitionIterator( } rowCount += 1 } - + partitionStats = InternalRow.fromSeq(columnBuilders.flatMap(_.columnStats.collectedStatistics)) batchStats.add(totalSize) - val stats = InternalRow.fromSeq(columnBuilders.flatMap(_.columnStats.collectedStatistics)) - (CachedBatch(rowCount, columnBuilders.map { builder => + Some(CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) - }, None), stats) - /* - while (rowIterator.hasNext && rowCount < batchSize - && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { - val row = rowIterator.next() - - // Added for SPARK-6082. This assertion can be useful for scenarios when something - // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM - // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat - // hard to decipher. - assert( - row.numFields == columnBuilders.length, - s"Row column number mismatch, expected ${output.size} columns, " + - s"but got ${row.numFields}." + - s"\nRow content: $row") + }, Some(partitionStats))) + } - var i = 0 - totalSize = 0 - while (i < row.numFields) { - columnBuilders(i).appendFrom(row, i) - totalSize += columnBuilders(i).columnStats.sizeInBytes - i += 1 + def next(): AnyRef = { + if (partitionStats == null) { + buildCachedBatch().get + } else { + if (fetchingFirstElement) { + fetchingFirstElement = false + partitionStats + } else { + buildCachedBatch() } - rowCount += 1 } - - batchStats.add(totalSize) - - val stats = InternalRow.fromSeq( - columnBuilders.flatMap(_.columnStats.collectedStatistics)) - CachedBatch(rowCount, columnBuilders.map { builder => - JavaUtils.bufferToArray(builder.build()) - }, stats) - */ } def hasNext: Boolean = rowIterator.hasNext @@ -175,6 +158,7 @@ private[columnar] class CachedBatchIterator( val stats = InternalRow.fromSeq( columnBuilders.flatMap(_.columnStats.collectedStatistics)) + CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, Some(stats)) @@ -190,7 +174,7 @@ case class InMemoryRelation( storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( - @transient var _cachedColumnBuffers: RDD[CachedBatch] = null, + @transient var _cachedColumnBuffers: RDD[AnyRef] = null, val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) extends logical.LeafNode with MultiInstanceRelation { @@ -230,19 +214,12 @@ case class InMemoryRelation( } } - val cached = if (!usePartitionLevelMetadata) { - batchedRDD.persist(storageLevel) - } else { - val r = batchedRDD.map(_.asInstanceOf[(CachedBatch, InternalRow)]) - val partitionLevelStats = r.map(_._2).collect() - new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD.dependencies, r.map(_._1), - partitionLevelStats) - } + val cached = batchedRDD.persist(storageLevel) cached.setName( tableName.map(n => s"In-memory table $n") .getOrElse(StringUtils.abbreviate(child.toString, 1024))) - _cachedColumnBuffers = cached.asInstanceOf[RDD[CachedBatch]] + _cachedColumnBuffers = cached } def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { @@ -263,7 +240,7 @@ case class InMemoryRelation( batchStats).asInstanceOf[this.type] } - def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers + def cachedColumnBuffers: RDD[AnyRef] = _cachedColumnBuffers override protected def otherCopyArgs: Seq[AnyRef] = Seq(_cachedColumnBuffers, batchStats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 7b08bb6dae8dd..acd057f3a644d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -21,11 +21,11 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => GenPredicate, _} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.vectorized._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -181,52 +181,53 @@ case class InMemoryTableScanExec( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning + private def filterCachedBatchesInternal( + cachedBatchIterator: Iterator[CachedBatch], + partitionStatsSchema: Seq[AttributeReference], + partitionFilter: GenPredicate): Iterator[CachedBatch] = { + val schemaIndex = partitionStatsSchema.zipWithIndex + cachedBatchIterator.filter { cachedBatch => + if (!partitionFilter.eval(cachedBatch.stats.get)) { + logDebug { + val statsString = schemaIndex.map { case (a, i) => + val value = cachedBatch.stats.get.get(i, a.dataType) + s"${a.name}: $value" + }.mkString(", ") + s"Skipping partition based on stats $statsString" + } + false + } else { + true + } + } + } + private def filteredCachedBatches(): RDD[CachedBatch] = { // Using these variables here to avoid serialization of entire objects (if referenced directly) // within the map Partitions closure. val schema = relation.partitionStatistics.schema - val schemaIndex = schema.zipWithIndex val buffers = relation.cachedColumnBuffers - if (sqlContext.conf.inMemoryPartitionMetadata) { - val cachedColumnarRDD = buffers.asInstanceOf[CachedColumnarRDD] - val partitions = cachedColumnarRDD.partitions.map(_.asInstanceOf[CachedColumnarRDDPartition]) - buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) => - val partitionFilter = newPredicate( - partitionFilters.reduceOption(And).getOrElse(Literal(true)), - schema) - partitionFilter.initialize(index) - if (!partitionFilter.eval(partitions(index).columnStats)) { - Iterator() - } else { - cachedBatchIterator - } - } - } else { - buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) => - val partitionFilter = newPredicate( - partitionFilters.reduceOption(And).getOrElse(Literal(true)), - schema) - partitionFilter.initialize(index) - - // Do partition batch pruning if enabled - if (inMemoryPartitionPruningEnabled) { - cachedBatchIterator.filter { cachedBatch => - if (!partitionFilter.eval(cachedBatch.stats.get)) { - logDebug { - val statsString = schemaIndex.map { case (a, i) => - val value = cachedBatch.stats.get.get(i, a.dataType) - s"${a.name}: $value" - }.mkString(", ") - s"Skipping partition based on stats $statsString" - } - false + buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) => + + val partitionFilter = newPredicate( + partitionFilters.reduceOption(And).getOrElse(Literal(true)), + schema) + partitionFilter.initialize(index) + val (iterForPartitionCase, iterForDefault) = cachedBatchIterator.duplicate + if (!iterForDefault.hasNext) { + Iterator[CachedBatch]() + } else { + iterForPartitionCase.next() match { + case partitionStats: InternalRow => + if (!partitionFilter.eval(partitionStats)) { + Iterator[CachedBatch]() } else { - true + filterCachedBatchesInternal(iterForPartitionCase.map(_.asInstanceOf[CachedBatch]), + schema, partitionFilter) } - } - } else { - cachedBatchIterator + case _: CachedBatch => + iterForDefault.map(_.asInstanceOf[CachedBatch]) } } } From 0fe35f82fcd9175b409429c03c9f7b33712df8ae Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 30 Oct 2017 14:12:24 -0700 Subject: [PATCH 07/36] fix styly error --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b143c19529dda..a079b2042c48b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -144,8 +144,8 @@ object SQLConf { val IN_MEMORY_PARTITION_METADATA = buildConf("spark.sql.inMemoryColumnarStorage.partitionMetadata") .internal() - .doc("When true, spark sql will collect partition level stats for in-memory columnar tables and" + - " do coarse-grained pruning") + .doc("When true, spark sql will collect partition level stats for in-memory columnar" + + " tables and do coarse-grained pruning") .booleanConf .createWithDefault(false) From 9e342432a82cbe325f338bd787be6427002981e1 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 30 Oct 2017 17:01:53 -0700 Subject: [PATCH 08/36] temp --- .../columnar/CachedColumnarRDD.scala | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala new file mode 100644 index 0000000000000..81488558d3895 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark._ +import org.apache.spark.rdd.RDD + +class CachedColumnarRDD( + @transient private var _sc: SparkContext, + private var dataRDD: RDD[CachedBatch]) + extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { + + override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { + null + } + + override protected def getPartitions = { + null + } + + override private[spark] def getOrCompute(partition: Partition, context: TaskContext): + Iterator[AnyRef] = { + null + } +} From 677ca81709fa34b3cad13f5843b8f408e401476b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 1 Nov 2017 16:01:49 -0700 Subject: [PATCH 09/36] 'CachedColumnarRDD' --- .../org/apache/spark/storage/BlockId.scala | 10 ++- .../columnar/CachedColumnarRDD.scala | 70 +++++++++++++++++-- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 7ac2c71c18eb3..e26a3c6aff035 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -31,7 +31,7 @@ import org.apache.spark.annotation.DeveloperApi * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method. */ @DeveloperApi -sealed abstract class BlockId { +abstract class BlockId { /** A globally unique identifier for this Block. Can be used for ser/de. */ def name: String @@ -49,6 +49,11 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { override def name: String = "rdd_" + rddId + "_" + splitIndex } +@DeveloperApi +case class RDDPartitionMetadataBlockId(rddId: Int, splitIndex: Int) extends BlockId { + override def name: String = "rdd_" + rddId + "_" + splitIndex + ".metadata" +} + // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi @@ -103,6 +108,7 @@ class UnrecognizedBlockId(name: String) @DeveloperApi object BlockId { val RDD = "rdd_([0-9]+)_([0-9]+)".r + val PARTITION_METADATA = "rdd_([0-9]+)_([0-9]+).metadata".r val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r @@ -116,6 +122,8 @@ object BlockId { def apply(name: String): BlockId = name match { case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt) + case PARTITION_METADATA(rddId, splitIndex) => + RDDPartitionMetadataBlockId(rddId.toInt, splitIndex.toInt) case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case SHUFFLE_DATA(shuffleId, mapId, reduceId) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 81488558d3895..f97a8313d4bd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.columnar import org.apache.spark._ import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.storage.{RDDBlockId, RDDPartitionMetadataBlockId} class CachedColumnarRDD( @transient private var _sc: SparkContext, @@ -26,15 +28,73 @@ class CachedColumnarRDD( extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { - null + val cachedBatch = dataRDD.iterator(split, context).next() + // put metadata to blockmanager + SparkEnv.get.blockManager.putSingle(RDDPartitionMetadataBlockId(id, split.index), + cachedBatch.stats.get, dataRDD.getStorageLevel) + Iterator(cachedBatch) } - override protected def getPartitions = { - null + override protected def getPartitions: Array[Partition] = dataRDD.partitions + + private def fetchOrComputeCachedBatch(partition: Partition, context: TaskContext): + Iterator[CachedBatch] = { + // metadata block can be evicted by BlockManagers but we may still keep CachedBatch in memory + // so that we still need to try to fetch it from Cache + val blockId = RDDBlockId(id, partition.index) + SparkEnv.get.blockManager.getOrElseUpdate(blockId, getStorageLevel, elementClassTag, () => { + computeOrReadCheckpoint(partition, context) + }) match { + case Left(blockResult) => + val existingMetrics = context.taskMetrics().inputMetrics + existingMetrics.incBytesRead(blockResult.bytes) + new InterruptibleIterator[CachedBatch](context, + blockResult.data.asInstanceOf[Iterator[CachedBatch]]) { + override def next(): CachedBatch = { + existingMetrics.incRecordsRead(1) + delegate.next() + } + } + case Right(iter) => + new InterruptibleIterator(context, iter.asInstanceOf[Iterator[CachedBatch]]) + } } - override private[spark] def getOrCompute(partition: Partition, context: TaskContext): + override private[spark] def getOrCompute(split: Partition, context: TaskContext): Iterator[AnyRef] = { - null + val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) + // if metadata block is not contained + val metadataBlockOpt = SparkEnv.get.blockManager.get[InternalRow](metadataBlockId) + if (metadataBlockOpt.isDefined) { + val metadataBlock = metadataBlockOpt.get + new InterruptibleIterator[AnyRef](context, new Iterator[AnyRef] { + + var fetchingFirstElement = true + + var delegate: Iterator[CachedBatch] = _ + + override def hasNext: Boolean = { + if (fetchingFirstElement) { + true + } else { + delegate = fetchOrComputeCachedBatch(split, context) + delegate.hasNext + } + } + + override def next(): AnyRef = { + if (fetchingFirstElement) { + fetchingFirstElement = false + val mb = metadataBlock.data.next() + mb.asInstanceOf[InternalRow] + } else { + delegate.next() + } + } + }) + } else { + fetchOrComputeCachedBatch(split, context) + } + } } From df1d79620f7c8073b6bc1a119a245c3d5413ec71 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 1 Nov 2017 16:11:52 -0700 Subject: [PATCH 10/36] change types --- .../execution/columnar/InMemoryRelation.scala | 37 +++++++------------ .../columnar/InMemoryTableScanExec.scala | 4 +- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 2f5cddd3d8df1..16082ae4e4446 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -50,20 +50,16 @@ object InMemoryRelation { * @param stats The stat of columns */ private[columnar] -case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: Option[InternalRow]) +case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) private[columnar] class CachedPartitionIterator( rowIterator: Iterator[InternalRow], output: Seq[Attribute], batchSize: Int, useCompression: Boolean, - batchStats: LongAccumulator) extends Iterator[AnyRef] { - - private var partitionStats: InternalRow = _ - - private var fetchingFirstElement = true + batchStats: LongAccumulator) extends Iterator[CachedBatch] { - private def buildCachedBatch(): Option[CachedBatch] = { + private def buildCachedBatch(): CachedBatch = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) }.toArray @@ -92,24 +88,17 @@ private[columnar] class CachedPartitionIterator( } rowCount += 1 } - partitionStats = InternalRow.fromSeq(columnBuilders.flatMap(_.columnStats.collectedStatistics)) batchStats.add(totalSize) - Some(CachedBatch(rowCount, columnBuilders.map { builder => - JavaUtils.bufferToArray(builder.build()) - }, Some(partitionStats))) + + val stats = InternalRow.fromSeq( + columnBuilders.flatMap(_.columnStats.collectedStatistics)) + + CachedBatch(rowCount, columnBuilders.map { builder => + JavaUtils.bufferToArray(builder.build())}, stats) } - def next(): AnyRef = { - if (partitionStats == null) { - buildCachedBatch().get - } else { - if (fetchingFirstElement) { - fetchingFirstElement = false - partitionStats - } else { - buildCachedBatch() - } - } + def next(): CachedBatch = { + buildCachedBatch() } def hasNext: Boolean = rowIterator.hasNext @@ -161,7 +150,7 @@ private[columnar] class CachedBatchIterator( CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) - }, Some(stats)) + }, stats) } def hasNext: Boolean = rowIterator.hasNext @@ -214,7 +203,7 @@ case class InMemoryRelation( } } - val cached = batchedRDD.persist(storageLevel) + val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD).persist(storageLevel) cached.setName( tableName.map(n => s"In-memory table $n") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index acd057f3a644d..dbaa5ab758b24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -187,10 +187,10 @@ case class InMemoryTableScanExec( partitionFilter: GenPredicate): Iterator[CachedBatch] = { val schemaIndex = partitionStatsSchema.zipWithIndex cachedBatchIterator.filter { cachedBatch => - if (!partitionFilter.eval(cachedBatch.stats.get)) { + if (!partitionFilter.eval(cachedBatch.stats)) { logDebug { val statsString = schemaIndex.map { case (a, i) => - val value = cachedBatch.stats.get.get(i, a.dataType) + val value = cachedBatch.stats.get(i, a.dataType) s"${a.name}: $value" }.mkString(", ") s"Skipping partition based on stats $statsString" From 08fd0857024a192307e66b6c3cffb19ae879000b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 1 Nov 2017 16:16:58 -0700 Subject: [PATCH 11/36] fix compilation error --- .../apache/spark/sql/execution/columnar/CachedColumnarRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index f97a8313d4bd5..523a44b1248d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -31,7 +31,7 @@ class CachedColumnarRDD( val cachedBatch = dataRDD.iterator(split, context).next() // put metadata to blockmanager SparkEnv.get.blockManager.putSingle(RDDPartitionMetadataBlockId(id, split.index), - cachedBatch.stats.get, dataRDD.getStorageLevel) + cachedBatch.stats, dataRDD.getStorageLevel) Iterator(cachedBatch) } From d4fc2b772b60161a24892133bdae2ff28233250a Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 10:13:15 -0700 Subject: [PATCH 12/36] update --- .../columnar/CachedColumnarRDD.scala | 26 ++++++++++++------- .../execution/columnar/InMemoryRelation.scala | 11 +++----- .../columnar/InMemoryTableScanExec.scala | 7 ++--- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 523a44b1248d3..cce25d1beed17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -24,15 +24,24 @@ import org.apache.spark.storage.{RDDBlockId, RDDPartitionMetadataBlockId} class CachedColumnarRDD( @transient private var _sc: SparkContext, - private var dataRDD: RDD[CachedBatch]) + private var dataRDD: RDD[CachedBatch], + containsPartitionMetadata: Boolean) extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { - val cachedBatch = dataRDD.iterator(split, context).next() - // put metadata to blockmanager - SparkEnv.get.blockManager.putSingle(RDDPartitionMetadataBlockId(id, split.index), - cachedBatch.stats, dataRDD.getStorageLevel) - Iterator(cachedBatch) + if (containsPartitionMetadata) { + val parentIterator = dataRDD.iterator(split, context) + if (!parentIterator.hasNext) { + Iterator() + } else { + val cachedBatch = parentIterator.next() + SparkEnv.get.blockManager.putSingle(RDDPartitionMetadataBlockId(id, split.index), + cachedBatch.stats, dataRDD.getStorageLevel) + Iterator(cachedBatch) + } + } else { + firstParent.iterator(split, context) + } } override protected def getPartitions: Array[Partition] = dataRDD.partitions @@ -63,15 +72,14 @@ class CachedColumnarRDD( override private[spark] def getOrCompute(split: Partition, context: TaskContext): Iterator[AnyRef] = { val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) - // if metadata block is not contained val metadataBlockOpt = SparkEnv.get.blockManager.get[InternalRow](metadataBlockId) if (metadataBlockOpt.isDefined) { val metadataBlock = metadataBlockOpt.get new InterruptibleIterator[AnyRef](context, new Iterator[AnyRef] { - var fetchingFirstElement = true + private var fetchingFirstElement = true - var delegate: Iterator[CachedBatch] = _ + private var delegate: Iterator[CachedBatch] = _ override def hasNext: Boolean = { if (fetchingFirstElement) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 16082ae4e4446..871d7f13f24c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -59,7 +59,7 @@ private[columnar] class CachedPartitionIterator( useCompression: Boolean, batchStats: LongAccumulator) extends Iterator[CachedBatch] { - private def buildCachedBatch(): CachedBatch = { + def next(): CachedBatch = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) }.toArray @@ -97,10 +97,6 @@ private[columnar] class CachedPartitionIterator( JavaUtils.bufferToArray(builder.build())}, stats) } - def next(): CachedBatch = { - buildCachedBatch() - } - def hasNext: Boolean = rowIterator.hasNext } @@ -111,7 +107,7 @@ private[columnar] class CachedBatchIterator( useCompression: Boolean, batchStats: LongAccumulator) extends Iterator[CachedBatch] { - def next(): CachedBatch = { + def next(): CachedBatch = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) }.toArray @@ -203,7 +199,8 @@ case class InMemoryRelation( } } - val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD).persist(storageLevel) + val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD, + usePartitionLevelMetadata).persist(storageLevel) cached.setName( tableName.map(n => s"In-memory table $n") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index dbaa5ab758b24..2b5851f53585d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -181,7 +181,7 @@ case class InMemoryTableScanExec( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - private def filterCachedBatchesInternal( + private def doFilterCachedBatches( cachedBatchIterator: Iterator[CachedBatch], partitionStatsSchema: Seq[AttributeReference], partitionFilter: GenPredicate): Iterator[CachedBatch] = { @@ -223,11 +223,12 @@ case class InMemoryTableScanExec( if (!partitionFilter.eval(partitionStats)) { Iterator[CachedBatch]() } else { - filterCachedBatchesInternal(iterForPartitionCase.map(_.asInstanceOf[CachedBatch]), + doFilterCachedBatches(iterForPartitionCase.map(_.asInstanceOf[CachedBatch]), schema, partitionFilter) } case _: CachedBatch => - iterForDefault.map(_.asInstanceOf[CachedBatch]) + doFilterCachedBatches(iterForDefault.map(_.asInstanceOf[CachedBatch]), schema, + partitionFilter) } } } From 97a63d6d1c1cd81bb97f0b998e716b13f5bd92d9 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 10:21:23 -0700 Subject: [PATCH 13/36] fix storage level --- .../sql/execution/columnar/CachedColumnarRDD.scala | 10 ++++++---- .../sql/execution/columnar/InMemoryRelation.scala | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index cce25d1beed17..bb9ec494a7fad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.columnar import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.storage.{RDDBlockId, RDDPartitionMetadataBlockId} +import org.apache.spark.storage.{RDDBlockId, RDDPartitionMetadataBlockId, StorageLevel} class CachedColumnarRDD( @transient private var _sc: SparkContext, private var dataRDD: RDD[CachedBatch], - containsPartitionMetadata: Boolean) + containsPartitionMetadata: Boolean, + expectedStorageLevel: StorageLevel) extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { @@ -36,7 +37,7 @@ class CachedColumnarRDD( } else { val cachedBatch = parentIterator.next() SparkEnv.get.blockManager.putSingle(RDDPartitionMetadataBlockId(id, split.index), - cachedBatch.stats, dataRDD.getStorageLevel) + cachedBatch.stats, expectedStorageLevel) Iterator(cachedBatch) } } else { @@ -51,7 +52,8 @@ class CachedColumnarRDD( // metadata block can be evicted by BlockManagers but we may still keep CachedBatch in memory // so that we still need to try to fetch it from Cache val blockId = RDDBlockId(id, partition.index) - SparkEnv.get.blockManager.getOrElseUpdate(blockId, getStorageLevel, elementClassTag, () => { + SparkEnv.get.blockManager.getOrElseUpdate(blockId, expectedStorageLevel, elementClassTag, + () => { computeOrReadCheckpoint(partition, context) }) match { case Left(blockResult) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 871d7f13f24c2..973117613ac2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -200,7 +200,7 @@ case class InMemoryRelation( } val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD, - usePartitionLevelMetadata).persist(storageLevel) + usePartitionLevelMetadata, storageLevel).persist(storageLevel) cached.setName( tableName.map(n => s"In-memory table $n") From a24b7bbfa6c393974d553ba703777934bed85ec1 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 10:42:25 -0700 Subject: [PATCH 14/36] fix getOrCompute --- .../columnar/CachedColumnarRDD.scala | 31 +++---------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index bb9ec494a7fad..b63cc2e1d8ac0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -47,47 +47,24 @@ class CachedColumnarRDD( override protected def getPartitions: Array[Partition] = dataRDD.partitions - private def fetchOrComputeCachedBatch(partition: Partition, context: TaskContext): - Iterator[CachedBatch] = { - // metadata block can be evicted by BlockManagers but we may still keep CachedBatch in memory - // so that we still need to try to fetch it from Cache - val blockId = RDDBlockId(id, partition.index) - SparkEnv.get.blockManager.getOrElseUpdate(blockId, expectedStorageLevel, elementClassTag, - () => { - computeOrReadCheckpoint(partition, context) - }) match { - case Left(blockResult) => - val existingMetrics = context.taskMetrics().inputMetrics - existingMetrics.incBytesRead(blockResult.bytes) - new InterruptibleIterator[CachedBatch](context, - blockResult.data.asInstanceOf[Iterator[CachedBatch]]) { - override def next(): CachedBatch = { - existingMetrics.incRecordsRead(1) - delegate.next() - } - } - case Right(iter) => - new InterruptibleIterator(context, iter.asInstanceOf[Iterator[CachedBatch]]) - } - } - override private[spark] def getOrCompute(split: Partition, context: TaskContext): Iterator[AnyRef] = { val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) val metadataBlockOpt = SparkEnv.get.blockManager.get[InternalRow](metadataBlockId) + val superGetOrCompute: (Partition, TaskContext) => Iterator[AnyRef] = super.getOrCompute if (metadataBlockOpt.isDefined) { val metadataBlock = metadataBlockOpt.get new InterruptibleIterator[AnyRef](context, new Iterator[AnyRef] { private var fetchingFirstElement = true - private var delegate: Iterator[CachedBatch] = _ + private var delegate: Iterator[AnyRef] = _ override def hasNext: Boolean = { if (fetchingFirstElement) { true } else { - delegate = fetchOrComputeCachedBatch(split, context) + delegate = superGetOrCompute(split, context) delegate.hasNext } } @@ -103,7 +80,7 @@ class CachedColumnarRDD( } }) } else { - fetchOrComputeCachedBatch(split, context) + superGetOrCompute(split, context) } } From 0e8e6395df97b4adeabb5a32ad441b96e5c19eb9 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 10:55:36 -0700 Subject: [PATCH 15/36] evaluate with partition metadata --- .../apache/spark/sql/execution/columnar/CachedColumnarRDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index b63cc2e1d8ac0..0d01be24d1cec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -31,6 +31,7 @@ class CachedColumnarRDD( override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { if (containsPartitionMetadata) { + println("evaluate with partition metadata") val parentIterator = dataRDD.iterator(split, context) if (!parentIterator.hasNext) { Iterator() From b89d58b26650ce9b63713a8a2371e280986720bb Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 11:21:19 -0700 Subject: [PATCH 16/36] fix getOrCompute --- .../execution/columnar/CachedColumnarRDD.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 0d01be24d1cec..706edcaf4d4f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -31,7 +31,6 @@ class CachedColumnarRDD( override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { if (containsPartitionMetadata) { - println("evaluate with partition metadata") val parentIterator = dataRDD.iterator(split, context) if (!parentIterator.hasNext) { Iterator() @@ -51,10 +50,8 @@ class CachedColumnarRDD( override private[spark] def getOrCompute(split: Partition, context: TaskContext): Iterator[AnyRef] = { val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) - val metadataBlockOpt = SparkEnv.get.blockManager.get[InternalRow](metadataBlockId) val superGetOrCompute: (Partition, TaskContext) => Iterator[AnyRef] = super.getOrCompute - if (metadataBlockOpt.isDefined) { - val metadataBlock = metadataBlockOpt.get + SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock => new InterruptibleIterator[AnyRef](context, new Iterator[AnyRef] { private var fetchingFirstElement = true @@ -65,7 +62,9 @@ class CachedColumnarRDD( if (fetchingFirstElement) { true } else { - delegate = superGetOrCompute(split, context) + if (delegate == null) { + delegate = superGetOrCompute(split, context) + } delegate.hasNext } } @@ -73,16 +72,13 @@ class CachedColumnarRDD( override def next(): AnyRef = { if (fetchingFirstElement) { fetchingFirstElement = false - val mb = metadataBlock.data.next() + val mb = metadataBlock mb.asInstanceOf[InternalRow] } else { delegate.next() } } }) - } else { - superGetOrCompute(split, context) - } - + ).getOrElse(superGetOrCompute(split, context)) } } From 3f2eae73cefd5676c799a6fd1e384556ee6c33a8 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 12:06:49 -0700 Subject: [PATCH 17/36] add logging --- .../apache/spark/sql/execution/columnar/CachedColumnarRDD.scala | 2 ++ .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 1 + 2 files changed, 3 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 706edcaf4d4f3..88669af40ec6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -29,6 +29,8 @@ class CachedColumnarRDD( expectedStorageLevel: StorageLevel) extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { + logInfo(s"Created CachedColumnarRDD with ${partitions.length} partitions") + override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { if (containsPartitionMetadata) { val parentIterator = dataRDD.iterator(split, context) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 2b5851f53585d..2412df3d5d327 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -223,6 +223,7 @@ case class InMemoryTableScanExec( if (!partitionFilter.eval(partitionStats)) { Iterator[CachedBatch]() } else { + logInfo(s"accept partition $index based on the stats") doFilterCachedBatches(iterForPartitionCase.map(_.asInstanceOf[CachedBatch]), schema, partitionFilter) } From 507c1a22da7de8b5bd24c066ec61a1c5f1c604dd Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 12:12:57 -0700 Subject: [PATCH 18/36] add logging for skipped partition --- .../apache/spark/sql/execution/columnar/CachedColumnarRDD.scala | 2 +- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 88669af40ec6d..66722978a954e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -29,7 +29,7 @@ class CachedColumnarRDD( expectedStorageLevel: StorageLevel) extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { - logInfo(s"Created CachedColumnarRDD with ${partitions.length} partitions") + logInfo(s"Created CachedColumnarRDD $id with ${partitions.length} partitions") override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { if (containsPartitionMetadata) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 2412df3d5d327..31e7acf06b3cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -221,6 +221,7 @@ case class InMemoryTableScanExec( iterForPartitionCase.next() match { case partitionStats: InternalRow => if (!partitionFilter.eval(partitionStats)) { + logInfo(s"skip partition $index based on the stats") Iterator[CachedBatch]() } else { logInfo(s"accept partition $index based on the stats") From 40d441cc6985aa995e5c7fe1387744381425e6ef Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 13:15:29 -0700 Subject: [PATCH 19/36] try to print stats --- .../spark/sql/execution/columnar/InMemoryRelation.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 973117613ac2b..49fb503b91ba6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -90,8 +90,13 @@ private[columnar] class CachedPartitionIterator( } batchStats.add(totalSize) - val stats = InternalRow.fromSeq( - columnBuilders.flatMap(_.columnStats.collectedStatistics)) + val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics) + + // scalastyle:off + println(s"stats ${statsInSeq.toSeq}") + // scalastyle:on + + val stats = InternalRow.fromSeq(statsInSeq) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build())}, stats) From 520e5aab3a06b9c1599af3984d4d2d028ced2ad6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 13:20:43 -0700 Subject: [PATCH 20/36] add logging for skipped partition --- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 49fb503b91ba6..958ebe5aaa8e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -93,7 +93,7 @@ private[columnar] class CachedPartitionIterator( val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics) // scalastyle:off - println(s"stats ${statsInSeq.toSeq}") + println(s"generate stats ${statsInSeq.toSeq}") // scalastyle:on val stats = InternalRow.fromSeq(statsInSeq) From 885808fc0c0f78d0ce7bbb5a5d06222bb06bf2cb Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 13:25:18 -0700 Subject: [PATCH 21/36] add logging for skipped partition --- .../spark/sql/execution/columnar/InMemoryRelation.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 958ebe5aaa8e7..f17de3240b049 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -146,8 +146,13 @@ private[columnar] class CachedBatchIterator( batchStats.add(totalSize) - val stats = InternalRow.fromSeq( - columnBuilders.flatMap(_.columnStats.collectedStatistics)) + val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics) + + val stats = InternalRow.fromSeq(statsInSeq) + + // scalastyle:off + println(s"generate stats ${statsInSeq.toSeq}") + // scalastyle:on CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) From 37b5971524c4bb7f42b23a2ea013564ce3f6015c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Nov 2017 14:14:57 -0700 Subject: [PATCH 22/36] add logging for skipped partition --- .../spark/sql/execution/columnar/InMemoryRelation.scala | 4 ---- .../sql/execution/columnar/InMemoryTableScanExec.scala | 6 ++++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index f17de3240b049..e5be062fbd891 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -150,10 +150,6 @@ private[columnar] class CachedBatchIterator( val stats = InternalRow.fromSeq(statsInSeq) - // scalastyle:off - println(s"generate stats ${statsInSeq.toSeq}") - // scalastyle:on - CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 31e7acf06b3cd..9533f630ebdf2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -220,14 +220,16 @@ case class InMemoryTableScanExec( } else { iterForPartitionCase.next() match { case partitionStats: InternalRow => + // scalastyle:off if (!partitionFilter.eval(partitionStats)) { - logInfo(s"skip partition $index based on the stats") + println(s"skip partition $index based on the stats") Iterator[CachedBatch]() } else { - logInfo(s"accept partition $index based on the stats") + println(s"accept partition $index based on the stats") doFilterCachedBatches(iterForPartitionCase.map(_.asInstanceOf[CachedBatch]), schema, partitionFilter) } + // scalastyle:on case _: CachedBatch => doFilterCachedBatches(iterForDefault.map(_.asInstanceOf[CachedBatch]), schema, partitionFilter) From 4dbfe37d6d719e7755bb6c9160262201908d36f4 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 13:23:33 -0800 Subject: [PATCH 23/36] refactor the code --- .../CachedColumnarPartitionIterator.scala | 27 ++++++++++++ .../columnar/CachedColumnarRDD.scala | 43 +++++-------------- .../execution/columnar/InMemoryRelation.scala | 6 +-- .../columnar/InMemoryTableScanExec.scala | 33 ++++++-------- 4 files changed, 54 insertions(+), 55 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala new file mode 100644 index 0000000000000..23c37c2e58fdd --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.{InterruptibleIterator, Partition, TaskContext} +import org.apache.spark.sql.catalyst.InternalRow + +private[columnar] class CachedColumnarPartitionIterator( + val metadataBlock: InternalRow, + context: TaskContext, + delegate: Iterator[CachedBatch]) + extends InterruptibleIterator[CachedBatch](context, delegate) {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 66722978a954e..348006ee50d79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -20,16 +20,14 @@ package org.apache.spark.sql.execution.columnar import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.storage.{RDDBlockId, RDDPartitionMetadataBlockId, StorageLevel} +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel} class CachedColumnarRDD( @transient private var _sc: SparkContext, private var dataRDD: RDD[CachedBatch], containsPartitionMetadata: Boolean, expectedStorageLevel: StorageLevel) - extends RDD[AnyRef](_sc, Seq(new OneToOneDependency(dataRDD))) { - - logInfo(s"Created CachedColumnarRDD $id with ${partitions.length} partitions") + extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) { override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { if (containsPartitionMetadata) { @@ -52,35 +50,16 @@ class CachedColumnarRDD( override private[spark] def getOrCompute(split: Partition, context: TaskContext): Iterator[AnyRef] = { val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) - val superGetOrCompute: (Partition, TaskContext) => Iterator[AnyRef] = super.getOrCompute + val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock => - new InterruptibleIterator[AnyRef](context, new Iterator[AnyRef] { - - private var fetchingFirstElement = true - - private var delegate: Iterator[AnyRef] = _ - - override def hasNext: Boolean = { - if (fetchingFirstElement) { - true - } else { - if (delegate == null) { - delegate = superGetOrCompute(split, context) - } - delegate.hasNext - } - } - - override def next(): AnyRef = { - if (fetchingFirstElement) { - fetchingFirstElement = false - val mb = metadataBlock - mb.asInstanceOf[InternalRow] - } else { - delegate.next() - } - } - }) + new CachedColumnarPartitionIterator(metadataBlock, context, superGetOrCompute(split, context)) ).getOrElse(superGetOrCompute(split, context)) } } + +private[columnar] class CachedColumnarPartitionIterator( + val metadataBlock: InternalRow, + split: Partition, + context: TaskContext, + delegate: Iterator[CachedBatch]) + extends InterruptibleIterator[CachedBatch](context, delegate) {} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index e5be062fbd891..74977cbe931ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -165,7 +165,7 @@ case class InMemoryRelation( storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( - @transient var _cachedColumnBuffers: RDD[AnyRef] = null, + @transient var _cachedColumnBuffers: RDD[CachedBatch] = null, val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) extends logical.LeafNode with MultiInstanceRelation { @@ -196,7 +196,7 @@ case class InMemoryRelation( private def buildBuffers(): Unit = { val output = child.output - // TODO: + // TODO: need better abstraction for two iterators here val batchedRDD = child.execute().mapPartitionsInternal { rowIterator => if (!usePartitionLevelMetadata) { new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats) @@ -232,7 +232,7 @@ case class InMemoryRelation( batchStats).asInstanceOf[this.type] } - def cachedColumnBuffers: RDD[AnyRef] = _cachedColumnBuffers + def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers override protected def otherCopyArgs: Seq[AnyRef] = Seq(_cachedColumnBuffers, batchStats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 9533f630ebdf2..7d3bf46d46bb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar +import org.apache.spark.SparkEnv import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -26,7 +27,9 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.storage.RDDPartitionMetadataBlockId case class InMemoryTableScanExec( @@ -214,26 +217,16 @@ case class InMemoryTableScanExec( partitionFilters.reduceOption(And).getOrElse(Literal(true)), schema) partitionFilter.initialize(index) - val (iterForPartitionCase, iterForDefault) = cachedBatchIterator.duplicate - if (!iterForDefault.hasNext) { - Iterator[CachedBatch]() - } else { - iterForPartitionCase.next() match { - case partitionStats: InternalRow => - // scalastyle:off - if (!partitionFilter.eval(partitionStats)) { - println(s"skip partition $index based on the stats") - Iterator[CachedBatch]() - } else { - println(s"accept partition $index based on the stats") - doFilterCachedBatches(iterForPartitionCase.map(_.asInstanceOf[CachedBatch]), - schema, partitionFilter) - } - // scalastyle:on - case _: CachedBatch => - doFilterCachedBatches(iterForDefault.map(_.asInstanceOf[CachedBatch]), schema, - partitionFilter) - } + + cachedBatchIterator match { + case cachedIter: CachedColumnarPartitionIterator + if !partitionFilter.eval(cachedIter.metadataBlock) => + // scalastyle:off + println(s"skipped partition $index") + // scalastyle:on + Iterator[CachedBatch]() + case _ => + cachedBatchIterator } } } From 6165838371bbe345f6bc9d462671ffb3f32cbb94 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 13:30:57 -0800 Subject: [PATCH 24/36] fix compilation issue --- .../CachedColumnarPartitionIterator.scala | 27 ------------------- .../columnar/CachedColumnarRDD.scala | 3 +-- 2 files changed, 1 insertion(+), 29 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala deleted file mode 100644 index 23c37c2e58fdd..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarPartitionIterator.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.columnar - -import org.apache.spark.{InterruptibleIterator, Partition, TaskContext} -import org.apache.spark.sql.catalyst.InternalRow - -private[columnar] class CachedColumnarPartitionIterator( - val metadataBlock: InternalRow, - context: TaskContext, - delegate: Iterator[CachedBatch]) - extends InterruptibleIterator[CachedBatch](context, delegate) {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 348006ee50d79..5f5b06fc07f55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -59,7 +59,6 @@ class CachedColumnarRDD( private[columnar] class CachedColumnarPartitionIterator( val metadataBlock: InternalRow, - split: Partition, context: TaskContext, delegate: Iterator[CachedBatch]) - extends InterruptibleIterator[CachedBatch](context, delegate) {} \ No newline at end of file + extends InterruptibleIterator[CachedBatch](context, delegate) {} From 05f226772e26ab81b764632a5d979eca4e4deffd Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 13:33:51 -0800 Subject: [PATCH 25/36] refactor the code --- .../spark/sql/execution/columnar/CachedColumnarRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 5f5b06fc07f55..1c531ad674015 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -29,7 +29,7 @@ class CachedColumnarRDD( expectedStorageLevel: StorageLevel) extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) { - override def compute(split: Partition, context: TaskContext): Iterator[AnyRef] = { + override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { if (containsPartitionMetadata) { val parentIterator = dataRDD.iterator(split, context) if (!parentIterator.hasNext) { @@ -48,7 +48,7 @@ class CachedColumnarRDD( override protected def getPartitions: Array[Partition] = dataRDD.partitions override private[spark] def getOrCompute(split: Partition, context: TaskContext): - Iterator[AnyRef] = { + Iterator[CachedBatch] = { val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock => From bcafe822a879bf48982f6d9f3255cbc7a5e8236c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 13:49:14 -0800 Subject: [PATCH 26/36] test --- .../sql/execution/columnar/CachedColumnarRDD.scala | 11 +++++++++-- .../execution/columnar/InMemoryTableScanExec.scala | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 1c531ad674015..196818b75501b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -58,7 +58,14 @@ class CachedColumnarRDD( } private[columnar] class CachedColumnarPartitionIterator( - val metadataBlock: InternalRow, + val partitionStats: InternalRow, context: TaskContext, delegate: Iterator[CachedBatch]) - extends InterruptibleIterator[CachedBatch](context, delegate) {} + extends InterruptibleIterator[CachedBatch](context, delegate) { + override def next(): CachedBatch = {\ + // scalastyle:off + println("next") + // scalastyle:on + super.next() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 7d3bf46d46bb8..191d3287a5022 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -220,7 +220,7 @@ case class InMemoryTableScanExec( cachedBatchIterator match { case cachedIter: CachedColumnarPartitionIterator - if !partitionFilter.eval(cachedIter.metadataBlock) => + if !partitionFilter.eval(cachedIter.partitionStats) => // scalastyle:off println(s"skipped partition $index") // scalastyle:on From 5b888d303a2b68fd36c293286e8bf6e67c06802c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 13:50:11 -0800 Subject: [PATCH 27/36] fix compilation issue --- .../apache/spark/sql/execution/columnar/CachedColumnarRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 196818b75501b..ce6d5aa31f74e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -62,7 +62,7 @@ private[columnar] class CachedColumnarPartitionIterator( context: TaskContext, delegate: Iterator[CachedBatch]) extends InterruptibleIterator[CachedBatch](context, delegate) { - override def next(): CachedBatch = {\ + override def next(): CachedBatch = { // scalastyle:off println("next") // scalastyle:on From 977b93f4b67ff5e57f1d0081b5aead70a90ac13f Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 14:11:04 -0800 Subject: [PATCH 28/36] add missing filtering --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 191d3287a5022..3182f33e62efc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -224,9 +224,9 @@ case class InMemoryTableScanExec( // scalastyle:off println(s"skipped partition $index") // scalastyle:on - Iterator[CachedBatch]() + Iterator() case _ => - cachedBatchIterator + doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) } } } From 9c9bcadf6f7f2ae90393f1ab26db595a0a836c99 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 14:16:43 -0800 Subject: [PATCH 29/36] test --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 3182f33e62efc..baa0a24f9923d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -218,6 +218,10 @@ case class InMemoryTableScanExec( schema) partitionFilter.initialize(index) + if (cachedBatchIterator.isInstanceOf[CachedColumnarPartitionIterator]) { + cachedBatchIterator.asInstanceOf[CachedColumnarPartitionIterator].partitionStats + } + cachedBatchIterator match { case cachedIter: CachedColumnarPartitionIterator if !partitionFilter.eval(cachedIter.partitionStats) => From 56a430722fc7b76b3f2365eee4d94e6ab62380a6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 14:22:23 -0800 Subject: [PATCH 30/36] test --- .../execution/columnar/InMemoryTableScanExec.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index baa0a24f9923d..bf909dd5f068b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -219,9 +219,20 @@ case class InMemoryTableScanExec( partitionFilter.initialize(index) if (cachedBatchIterator.isInstanceOf[CachedColumnarPartitionIterator]) { - cachedBatchIterator.asInstanceOf[CachedColumnarPartitionIterator].partitionStats + val cachedIter = cachedBatchIterator.asInstanceOf[CachedColumnarPartitionIterator] + if (!partitionFilter.eval(cachedIter.partitionStats)) { + // scalastyle:off + println(s"skipped partition $index") + // scalastyle:on + Iterator() + } else { + doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) + } + } else { + doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) } + /* cachedBatchIterator match { case cachedIter: CachedColumnarPartitionIterator if !partitionFilter.eval(cachedIter.partitionStats) => @@ -232,6 +243,7 @@ case class InMemoryTableScanExec( case _ => doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) } + */ } } From 7936033c54ce67ea3fe3de0737d63d06abba4478 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 15:02:03 -0800 Subject: [PATCH 31/36] fix rebundant read --- .../columnar/CachedColumnarRDD.scala | 23 +++++++++++++------ .../columnar/InMemoryTableScanExec.scala | 20 ++-------------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index ce6d5aa31f74e..022eaecd86ea7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -52,20 +52,29 @@ class CachedColumnarRDD( val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock => - new CachedColumnarPartitionIterator(metadataBlock, context, superGetOrCompute(split, context)) + new InterruptibleIterator[CachedBatch](context, + new CachedColumnarPartitionIterator(metadataBlock, split, context, superGetOrCompute)) ).getOrElse(superGetOrCompute(split, context)) } } private[columnar] class CachedColumnarPartitionIterator( val partitionStats: InternalRow, + partition: Partition, context: TaskContext, - delegate: Iterator[CachedBatch]) - extends InterruptibleIterator[CachedBatch](context, delegate) { + fetchRDDPartition: (Partition, TaskContext) => Iterator[CachedBatch]) + extends Iterator[CachedBatch] { + + private var delegate: Iterator[CachedBatch] = _ + + override def hasNext: Boolean = { + if (delegate == null) { + delegate = fetchRDDPartition(partition, context) + } + delegate.hasNext + } + override def next(): CachedBatch = { - // scalastyle:off - println("next") - // scalastyle:on - super.next() + delegate.next() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index bf909dd5f068b..1b3ef5ca8f2ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.columnar -import org.apache.spark.SparkEnv +import org.apache.spark.{InterruptibleIterator, SparkEnv} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -218,22 +218,7 @@ case class InMemoryTableScanExec( schema) partitionFilter.initialize(index) - if (cachedBatchIterator.isInstanceOf[CachedColumnarPartitionIterator]) { - val cachedIter = cachedBatchIterator.asInstanceOf[CachedColumnarPartitionIterator] - if (!partitionFilter.eval(cachedIter.partitionStats)) { - // scalastyle:off - println(s"skipped partition $index") - // scalastyle:on - Iterator() - } else { - doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) - } - } else { - doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) - } - - /* - cachedBatchIterator match { + cachedBatchIterator.asInstanceOf[InterruptibleIterator[_]].delegate match { case cachedIter: CachedColumnarPartitionIterator if !partitionFilter.eval(cachedIter.partitionStats) => // scalastyle:off @@ -243,7 +228,6 @@ case class InMemoryTableScanExec( case _ => doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) } - */ } } From 3b6bfa23384f2ade2842e6beb29b7bc9b5e7ea1e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 16:32:03 -0800 Subject: [PATCH 32/36] compact iterators --- .../execution/columnar/InMemoryRelation.scala | 85 +++++-------------- 1 file changed, 19 insertions(+), 66 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 74977cbe931ff..8570ecb1907b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -52,12 +52,13 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -private[columnar] class CachedPartitionIterator( +private[columnar] class CachedBatchIterator( rowIterator: Iterator[InternalRow], output: Seq[Attribute], batchSize: Int, useCompression: Boolean, - batchStats: LongAccumulator) extends Iterator[CachedBatch] { + batchStats: LongAccumulator, + singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] { def next(): CachedBatch = { val columnBuilders = output.map { attribute => @@ -66,7 +67,17 @@ private[columnar] class CachedPartitionIterator( var rowCount = 0 var totalSize = 0L - while (rowIterator.hasNext) { + + val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow], + rowCount: Int, size: Long) => { + if (!singleBatch) { + rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE + } else { + rowIter.hasNext + } + } + + while (terminateLoop(singleBatchPerPartition, rowIterator, rowCount, totalSize)) { val row = rowIterator.next() // Added for SPARK-6082. This assertion can be useful for scenarios when something @@ -88,76 +99,21 @@ private[columnar] class CachedPartitionIterator( } rowCount += 1 } + batchStats.add(totalSize) val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics) - // scalastyle:off - println(s"generate stats ${statsInSeq.toSeq}") - // scalastyle:on - val stats = InternalRow.fromSeq(statsInSeq) CachedBatch(rowCount, columnBuilders.map { builder => - JavaUtils.bufferToArray(builder.build())}, stats) + JavaUtils.bufferToArray(builder.build()) + }, stats) } def hasNext: Boolean = rowIterator.hasNext } -private[columnar] class CachedBatchIterator( - rowIterator: Iterator[InternalRow], - output: Seq[Attribute], - batchSize: Int, - useCompression: Boolean, - batchStats: LongAccumulator) extends Iterator[CachedBatch] { - - def next(): CachedBatch = { - val columnBuilders = output.map { attribute => - ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) - }.toArray - - var rowCount = 0 - var totalSize = 0L - - while (rowIterator.hasNext && rowCount < batchSize - && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { - val row = rowIterator.next() - - // Added for SPARK-6082. This assertion can be useful for scenarios when something - // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM - // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat - // hard to decipher. - assert( - row.numFields == columnBuilders.length, - s"Row column number mismatch, expected ${output.size} columns, " + - s"but got ${row.numFields}." + - s"\nRow content: $row") - - var i = 0 - totalSize = 0 - while (i < row.numFields) { - columnBuilders(i).appendFrom(row, i) - totalSize += columnBuilders(i).columnStats.sizeInBytes - i += 1 - } - rowCount += 1 - } - - batchStats.add(totalSize) - - val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics) - - val stats = InternalRow.fromSeq(statsInSeq) - - CachedBatch(rowCount, columnBuilders.map { builder => - JavaUtils.bufferToArray(builder.build()) - }, stats) - } - - def hasNext: Boolean = rowIterator.hasNext -} - case class InMemoryRelation( output: Seq[Attribute], useCompression: Boolean, @@ -198,11 +154,8 @@ case class InMemoryRelation( // TODO: need better abstraction for two iterators here val batchedRDD = child.execute().mapPartitionsInternal { rowIterator => - if (!usePartitionLevelMetadata) { - new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats) - } else { - new CachedPartitionIterator(rowIterator, output, batchSize, useCompression, batchStats) - } + new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats, + usePartitionLevelMetadata) } val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD, From 963ca0a92528ab9548955d23f7cc227b7b8347fa Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Nov 2017 16:39:00 -0800 Subject: [PATCH 33/36] update --- .../spark/sql/execution/columnar/CachedColumnarRDD.scala | 4 ++-- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 022eaecd86ea7..08ecbcd26109a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -53,12 +53,12 @@ class CachedColumnarRDD( val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock => new InterruptibleIterator[CachedBatch](context, - new CachedColumnarPartitionIterator(metadataBlock, split, context, superGetOrCompute)) + new CachedColumnarIterator(metadataBlock, split, context, superGetOrCompute)) ).getOrElse(superGetOrCompute(split, context)) } } -private[columnar] class CachedColumnarPartitionIterator( +private[columnar] class CachedColumnarIterator( val partitionStats: InternalRow, partition: Partition, context: TaskContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 1b3ef5ca8f2ac..b72fe15602c93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -219,7 +219,7 @@ case class InMemoryTableScanExec( partitionFilter.initialize(index) cachedBatchIterator.asInstanceOf[InterruptibleIterator[_]].delegate match { - case cachedIter: CachedColumnarPartitionIterator + case cachedIter: CachedColumnarIterator if !partitionFilter.eval(cachedIter.partitionStats) => // scalastyle:off println(s"skipped partition $index") From d4f12b18c4f33d2854c3585eccebc8cf87b627bf Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 10 Nov 2017 08:44:13 -0800 Subject: [PATCH 34/36] add first test case fix compilation of tests fix tests revise the test fix test revise the test add missing file revise the test revise the test revise the test revise the test revise the test revise the test revise the test revise the test --- .../execution/columnar/InMemoryRelation.scala | 1 - .../columnar/InMemoryTableScanExec.scala | 2 +- .../columnar/InMemoryColumnarQuerySuite.scala | 30 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 8570ecb1907b7..4b5323ec87c76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -152,7 +152,6 @@ case class InMemoryRelation( private def buildBuffers(): Unit = { val output = child.output - // TODO: need better abstraction for two iterators here val batchedRDD = child.execute().mapPartitionsInternal { rowIterator => new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats, usePartitionLevelMetadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index b72fe15602c93..dde61edbceef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -223,10 +223,10 @@ case class InMemoryTableScanExec( if !partitionFilter.eval(cachedIter.partitionStats) => // scalastyle:off println(s"skipped partition $index") - // scalastyle:on Iterator() case _ => doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter) + // scalastyle:on } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index e662e294228db..7e7692dfa3044 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning @@ -479,4 +480,33 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("table cache can prune unnecessary partitions correctly") { + // scalastyle:off + var bytesReadWithoutPruning = 0L + var bytesReadWithPruning = 0L + var inMemoryPartitionMetadata = false + sparkContext.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val metrics = taskEnd.taskMetrics + if (inMemoryPartitionMetadata) { + bytesReadWithPruning += metrics.inputMetrics.bytesRead + } else { + bytesReadWithoutPruning += metrics.inputMetrics.bytesRead + } + } + }) + Seq("true", "false").foreach { enabled => + withSQLConf(SQLConf.IN_MEMORY_PARTITION_METADATA.key -> enabled) { + inMemoryPartitionMetadata = conf.inMemoryPartitionMetadata + val df1 = (0 until 1000000).toDF("value").repartition(4).cache() + df1.where("value >= 999999").collect() + val resultArr = df1.where("value >= 999999").collect() + assert(resultArr.length == 1) + assert(resultArr.head.getInt(0) == 999999) + df1.unpersist(true) + } + } + assert(bytesReadWithoutPruning > bytesReadWithPruning * 3) + } } From 46d68dba9794c8e721814894ac04aedbaf72a4d3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 10 Nov 2017 11:20:18 -0800 Subject: [PATCH 35/36] test for remove metadata block test for remove metadata block fix the test fix the test fix the test --- .../columnar/CachedColumnarRDD.scala | 26 +++++---- .../columnar/InMemoryColumnarQuerySuite.scala | 53 +++++++++++++++++-- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala index 08ecbcd26109a..2cd26f72ae0f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala @@ -30,19 +30,7 @@ class CachedColumnarRDD( extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) { override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { - if (containsPartitionMetadata) { - val parentIterator = dataRDD.iterator(split, context) - if (!parentIterator.hasNext) { - Iterator() - } else { - val cachedBatch = parentIterator.next() - SparkEnv.get.blockManager.putSingle(RDDPartitionMetadataBlockId(id, split.index), - cachedBatch.stats, expectedStorageLevel) - Iterator(cachedBatch) - } - } else { - firstParent.iterator(split, context) - } + firstParent.iterator(split, context) } override protected def getPartitions: Array[Partition] = dataRDD.partitions @@ -54,7 +42,17 @@ class CachedColumnarRDD( SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock => new InterruptibleIterator[CachedBatch](context, new CachedColumnarIterator(metadataBlock, split, context, superGetOrCompute)) - ).getOrElse(superGetOrCompute(split, context)) + ).getOrElse { + val batchIter = superGetOrCompute(split, context) + if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) { + val cachedBatch = batchIter.next() + SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats, + expectedStorageLevel) + new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch)) + } else { + batchIter + } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 7e7692dfa3044..5e4966a9f5156 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import org.apache.spark.SparkEnv import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In} @@ -30,6 +31,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ +import org.apache.spark.storage.RDDPartitionMetadataBlockId import org.apache.spark.storage.StorageLevel._ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { @@ -482,7 +484,6 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } test("table cache can prune unnecessary partitions correctly") { - // scalastyle:off var bytesReadWithoutPruning = 0L var bytesReadWithPruning = 0L var inMemoryPartitionMetadata = false @@ -499,14 +500,56 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { Seq("true", "false").foreach { enabled => withSQLConf(SQLConf.IN_MEMORY_PARTITION_METADATA.key -> enabled) { inMemoryPartitionMetadata = conf.inMemoryPartitionMetadata - val df1 = (0 until 1000000).toDF("value").repartition(4).cache() - df1.where("value >= 999999").collect() - val resultArr = df1.where("value >= 999999").collect() + val df1 = (0 until 100000).toDF("value").repartition(4).cache() + df1.where("value >= 99999").collect() + val resultArr = df1.where("value >= 99999").collect() assert(resultArr.length == 1) - assert(resultArr.head.getInt(0) == 999999) + assert(resultArr.head.getInt(0) == 99999) df1.unpersist(true) } } + assert(bytesReadWithoutPruning > 0) + assert(bytesReadWithPruning > 0) assert(bytesReadWithoutPruning > bytesReadWithPruning * 3) } + + test("generate correct results when metadata block is removed") { + var bytesReadWithMetadata = 0L + var bytesReadWithoutMetadata = 0L + @volatile var removePartitionMetadata = false + sparkContext.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val metrics = taskEnd.taskMetrics + if (removePartitionMetadata) { + bytesReadWithoutMetadata += metrics.inputMetrics.bytesRead + } else { + bytesReadWithMetadata += metrics.inputMetrics.bytesRead + } + } + }) + Seq("true").foreach { enabled => + withSQLConf(SQLConf.IN_MEMORY_PARTITION_METADATA.key -> enabled) { + removePartitionMetadata = true + val df1 = (0 until 100000).toDF("value").repartition(4).cache() + val inMemoryRelation = df1.queryExecution.optimizedPlan.collect { + case m: InMemoryRelation => m + } + df1.where("value >= 99999").collect() + (0 until 4).foreach(partitionId => SparkEnv.get.blockManager.removeBlock( + RDDPartitionMetadataBlockId(inMemoryRelation.head.cachedColumnBuffers.id, partitionId))) + var resultArr = df1.where("value >= 99999").collect() + assert(resultArr.length === 1) + assert(resultArr.head.getInt(0) === 99999) + // scalastyle:off + removePartitionMetadata = false + resultArr = df1.where("value >= 99999").collect() + assert(resultArr.length === 1) + assert(resultArr.head.getInt(0) === 99999) + df1.unpersist(blocking = true) + assert(bytesReadWithMetadata > 0) + assert(bytesReadWithoutMetadata > 0) + assert(bytesReadWithoutMetadata > bytesReadWithMetadata * 3) + } + } + } } From 77cf789d8b2b123258f1873801a6fe3a5e4d7aa0 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 10 Nov 2017 20:40:11 -0800 Subject: [PATCH 36/36] generate correct results when data block is removed --- .../columnar/InMemoryColumnarQuerySuite.scala | 43 ++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 5e4966a9f5156..4de87d56b92db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ -import org.apache.spark.storage.RDDPartitionMetadataBlockId +import org.apache.spark.storage.{RDDBlockId, RDDPartitionMetadataBlockId} import org.apache.spark.storage.StorageLevel._ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { @@ -486,7 +486,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("table cache can prune unnecessary partitions correctly") { var bytesReadWithoutPruning = 0L var bytesReadWithPruning = 0L - var inMemoryPartitionMetadata = false + @volatile var inMemoryPartitionMetadata = false sparkContext.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val metrics = taskEnd.taskMetrics @@ -552,4 +552,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("generate correct results when data block is removed") { + var bytesReadWithCachedBlock = 0L + var bytesReadWithoutCachedBlock = 0L + @volatile var removeCachedBlock = false + sparkContext.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val metrics = taskEnd.taskMetrics + if (removeCachedBlock) { + bytesReadWithoutCachedBlock += metrics.inputMetrics.bytesRead + } else { + bytesReadWithCachedBlock += metrics.inputMetrics.bytesRead + } + } + }) + Seq("true").foreach { enabled => + withSQLConf(SQLConf.IN_MEMORY_PARTITION_METADATA.key -> enabled) { + removeCachedBlock = true + val df1 = (0 until 100000).toDF("value").repartition(4).cache() + val inMemoryRelation = df1.queryExecution.optimizedPlan.collect { + case m: InMemoryRelation => m + } + df1.where("value >= 99999").collect() + (0 until 4).foreach(partitionId => SparkEnv.get.blockManager.removeBlock( + RDDBlockId(inMemoryRelation.head.cachedColumnBuffers.id, partitionId))) + var resultArr = df1.where("value >= 99999").collect() + assert(resultArr.length === 1) + assert(resultArr.head.getInt(0) === 99999) + // scalastyle:off + removeCachedBlock = false + resultArr = df1.where("value >= 99999").collect() + assert(resultArr.length === 1) + assert(resultArr.head.getInt(0) === 99999) + df1.unpersist(blocking = true) + assert(bytesReadWithCachedBlock > 0) + assert(bytesReadWithoutCachedBlock == 0) + } + } + } }