@viirya
Member

@viirya viirya commented Jun 16, 2016

What changes were proposed in this pull request?

The base class SpecificParquetRecordReaderBase, used by the vectorized Parquet reader, tries to get pushed-down filters from the given configuration. These pushed-down filters are used for row-group-level filtering. However, we never set the filters in that configuration, so they are not actually pushed down and no row-group-level filtering happens. This patch fixes that by setting the filters to push down in the configuration used by the reader.
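
To illustrate what row-group-level filtering with a pushed-down filter means, here is a minimal sketch in plain Scala. It is only a toy model under stated assumptions: `RowGroupStats`, `canDropLessThan` and `pruneLessThan` are hypothetical names, not Spark's or Parquet's API, and the row counts are made up. The idea is that a row group whose column minimum is already at or above the bound of a `<` predicate cannot contain a match, so it can be skipped without being read.

```scala
object RowGroupPruningSketch {
  // Hypothetical per-row-group statistics for a single Int column.
  final case class RowGroupStats(min: Int, max: Int, rowCount: Long)

  // A "column < bound" predicate can never match a group whose minimum is >= bound.
  def canDropLessThan(stats: RowGroupStats, bound: Int): Boolean =
    stats.min >= bound

  // Keep only the row groups that might contain matching rows.
  def pruneLessThan(groups: Seq[RowGroupStats], bound: Int): Seq[RowGroupStats] =
    groups.filterNot(g => canDropLessThan(g, bound))

  // The benchmark stores the constant 101 in _1, so every group has min = max = 101.
  // The group count and row counts below are assumptions for illustration.
  val groups: Seq[RowGroupStats] = Seq.fill(4)(RowGroupStats(min = 101, max = 101, rowCount = 256L))

  // Filter "_1 < 100": no group can match, so nothing is left to read.
  val kept: Seq[RowGroupStats] = pruneLessThan(groups, bound = 100)
  val rowsToRead: Long = kept.map(_.rowCount).sum
}
```

With a constant value of 101 and the filter `_1 < 100`, every group in this model is dropped, which is in line with the "retrieved row groups: 0" and `totalRowCount = 0` figures reported in this description.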

The benchmark below excludes the time spent writing the Parquet file:

test("Benchmark for Parquet") {
  val N = 500 << 12
  withParquetTable((0 until N).map(i => (101, i)), "t") {
    val benchmark = new Benchmark("Parquet reader", N)
    benchmark.addCase("reading Parquet file", 10) { iter =>
      sql("SELECT _1 FROM t where t._1 < 100").collect()
    }
    benchmark.run()
  }
}

By default, withParquetTable runs tests for both the vectorized and non-vectorized readers. I only let it run the vectorized reader.

When we set the Parquet block size to 1024 to produce multiple row groups, the benchmark results are:

Before this patch:

The retrieved row groups: 8063

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                           825 / 1233          2.5         402.6       1.0X

After this patch:

The retrieved row groups: 0

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                           306 /  503          6.7         149.6       1.0X
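
As a sanity check on the two tables above, the Rate and Per Row columns can be roughly re-derived from N and the best times. This is a minimal sketch that assumes Rate is rows divided by best time and Per Row is best time divided by rows. The reported figures were presumably computed from unrounded timings, so the values only agree approximately. `BenchmarkArithmetic` and its members are names made up for this illustration.

```scala
object BenchmarkArithmetic {
  // N from the benchmark code: 500 << 12 = 2,048,000 rows.
  val n: Long = 500L << 12

  // Rate in millions of rows per second for a best time given in milliseconds.
  def rateMPerSec(bestMs: Double): Double = n / (bestMs / 1000.0) / 1e6

  // Time per row in nanoseconds for a best time given in milliseconds.
  def perRowNs(bestMs: Double): Double = bestMs * 1e6 / n

  // Best times (ms) taken from the tables above.
  val bestBeforeMs: Double = 825.0
  val bestAfterMs: Double = 306.0

  // Ratio of the best times with the small block size.
  val speedup: Double = bestBeforeMs / bestAfterMs
}
```

Under these assumptions the best time improves by a factor of about 2.7 with the small block size. Both tables show 1.0X in the Relative column, presumably because each run contains a single case that serves as its own baseline.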

Next, I ran the benchmark for the non-pushdown case, using the same benchmark code but with filter pushdown disabled in the configuration. This time the Parquet block size is the default value.

Before this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                           136 /  238         15.0          66.5       1.0X

After this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                           124 /  193         16.5          60.7       1.0X

For the non-pushdown case, the results suggest that this patch doesn't affect the normal code path.

I've manually printed the totalRowCount in SpecificParquetRecordReaderBase to see whether this patch actually filters the row groups. When running the above benchmark:

After this patch:
totalRowCount = 0

Before this patch:
totalRowCount = 1024000

How was this patch tested?

Existing tests should pass.

@viirya
Member Author

viirya commented Jun 16, 2016

@yhuai I am not sure the row-group info is exposed. But I've manually output the totalRowCount in SpecificParquetRecordReaderBase to check the total number of rows this RecordReader will eventually read. The results are shown in the PR description.

…-push-down-filter2

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@viirya viirya changed the title [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader Jun 16, 2016
@SparkQA

SparkQA commented Jun 16, 2016

Test build #60628 has finished for PR 13701 at commit 5711ae4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 16, 2016

Test build #60627 has finished for PR 13701 at commit 97ccacf.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]
    • class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister
    • public static class Prefix
    • abstract class ForeachWriter[T] extends Serializable
    • * case class Person(name: String, age: Long)
    • abstract class SparkStrategy extends GenericStrategy[SparkPlan]
    • class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister
    • case class RefreshResource(path: String)
    • abstract class TextBasedFileFormat extends FileFormat
    • class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister
    • class TextFileFormat extends TextBasedFileFormat with DataSourceRegister
    • class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable

@viirya
Member Author

viirya commented Jun 17, 2016

@liancheng I've updated the benchmark. Please take a look. Thanks.

@gatorsmile
Member

Just realized my PR #13728 is related to your PR, especially the description of the two configurations spark.sql.parquet.filterPushdown and spark.sql.parquet.enableVectorizedReader.

@viirya
Member Author

viirya commented Jun 17, 2016

@gatorsmile yea.

@gatorsmile
Member

gatorsmile commented Jun 17, 2016

Based on my understanding, this performance result is highly affected by the number of row groups, the number of pruned groups, the average number of rows per group, and more. Do you have any thoughts on how to control the number of rows per group? Based on the test cases in https://github.com/gatorsmile/spark/blob/9967cc72545324e7a542fcf1b49372d977c0011b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala#L519-L547, it sounds like each group has only one row... This is strange to me. Do you know how to control it?

@viirya
Member Author

viirya commented Jun 17, 2016

@gatorsmile No, I have no idea currently. To make sure the pushed-down filter is working, I've manually checked the totalRowCount as shown in the PR description.

@gatorsmile
Member

See the comment by @HyukjinKwon: the filter is applied more than once. That means it could be expensive if the filter does not prune a lot of rows. Not sure if my understanding is right.

@gatorsmile
Member

gatorsmile commented Jun 17, 2016

@rdblue Not sure if you can help us with more details on how to measure the performance benefits? Thank you very much!

@viirya
Member Author

viirya commented Jun 20, 2016

ping @liancheng @yhuai I've updated the benchmark results. Please see if you have other thoughts. Thanks!

@viirya
Member Author

viirya commented Jun 20, 2016

@liancheng Is it possible to merge this before the 2.0 release?

@viirya
Member Author

viirya commented Jun 20, 2016

@liancheng @yhuai How about this? Is it ready to merge? Thanks!

…-push-down-filter2

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@viirya
Member Author

viirya commented Jun 24, 2016

ping @liancheng @yhuai again...

@SparkQA

SparkQA commented Jun 24, 2016

Test build #61143 has finished for PR 13701 at commit 36fd059.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jun 24, 2016

retest this please.

case _ => c
}
}.getOrElse(c)
}
Contributor

Do we need this?

Contributor

I guess a better question is if it is part of the bug fix?

Member Author

We use metadata in the merged schema to mark optional fields (those not existing in all partitions), but the metadata is lost after resolving. If we don't add it back, the pushed-down filters will fail with a non-existent field error.

Contributor

@viirya Could you add a regression test for this (it's easy to forget)?

Contributor

@viirya Should we fix that in l.resolve() ? cc @marmbrus

Member Author

@viirya viirya Aug 3, 2016

When adding a regression test, I found that this change is no longer needed. I am sure that it didn't work without this change when I submitted this PR. I'm not sure which recent commit helps keep the metadata after resolving. I will still add the regression test.

@yhuai
Contributor

yhuai commented Jun 24, 2016

Thank you for the testing. Can you also test the case where a file contains multiple row groups and we can avoid scanning the unneeded ones?

Also since it is not fixing a critical bug, let's not merge it into branch-2.0.

@viirya
Member Author

viirya commented Jun 24, 2016

@yhuai As I mentioned in the description, I am not sure if we can manipulate row groups as we want, but I have manually tested it to show the number of rows actually scanned.

@SparkQA

SparkQA commented Jun 24, 2016

Test build #61147 has finished for PR 13701 at commit 36fd059.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jun 24, 2016

Sorry. I am not sure I get it. We can set the row group size to a small size. Then, it will not be hard to create a parquet file having multiple row groups.

@viirya
Member Author

viirya commented Jun 24, 2016

@yhuai OK. I will give it a try. But it looks like I can only test it manually?

@rdblue
Contributor

rdblue commented Jun 24, 2016

@gatorsmile, sorry for the delay, I was evidently not getting notifications until I changed some settings yesterday.

There are a few tests in Parquet that generate files with test data that would be appropriate. The simplest one is TestReadWriteEncodingStats where you can see how to build a file with at least one column that has stats you can verify filters with. To get it working for row groups, you'd just need to set the row group size to something small, like 8k, and bump up the number of records until you get a few groups.

Unfortunately, we don't have any pre-built test cases you can use for a performance baseline, though. I think the best you can do with the data generation approach is validate that row groups are being filtered using an approach like you already have, counting the number of rows.
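
The suggestion above (a small row group size plus enough records to get a few groups) can be reasoned about with a rough estimate. The sketch below is plain Scala under a simplifying assumption: a row group is flushed as soon as the buffered bytes reach the target size. Real Parquet writers check the buffered size less often and measure encoded data, so actual counts will differ. `estimateRowGroups` and the input numbers are hypothetical.

```scala
object RowGroupCountSketch {
  // Estimates the number of row groups, assuming a flush happens as soon as
  // the buffered bytes reach rowGroupBytes. This is a rough model only.
  def estimateRowGroups(numRecords: Long, bytesPerRecord: Long, rowGroupBytes: Long): Long = {
    require(numRecords >= 0, "record count must be non-negative")
    require(bytesPerRecord > 0 && rowGroupBytes > 0, "sizes must be positive")
    // Records needed to reach the target size (ceiling division), at least one.
    val recordsPerGroup = math.max(1L, (rowGroupBytes + bytesPerRecord - 1) / bytesPerRecord)
    // Groups needed to hold all records (ceiling division).
    (numRecords + recordsPerGroup - 1) / recordsPerGroup
  }

  // Assumed inputs: 8-byte records and an 8 KiB row group target.
  val withFewRecords: Long =
    estimateRowGroups(numRecords = 500L, bytesPerRecord = 8L, rowGroupBytes = 8L * 1024)
  val withMoreRecords: Long =
    estimateRowGroups(numRecords = 5000L, bytesPerRecord = 8L, rowGroupBytes = 8L * 1024)
}
```

In this model 500 records still fit in a single group, while 5000 records spill into several, which is the effect of bumping up the record count. The real number of groups should be confirmed by counting them, as discussed in this thread.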

@SparkQA

SparkQA commented Aug 8, 2016

Test build #63337 has finished for PR 13701 at commit 462edc7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Aug 8, 2016

ping @davies Please review the latest changes. Thanks.

"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
// Only for test purpose.
"numRowGroups" -> SQLMetrics.createMetric(sparkContext, "numRowGroups"))
Contributor

Can we only create this for unit test (manually create in test case)?

@SparkQA

SparkQA commented Aug 9, 2016

Test build #63410 has finished for PR 13701 at commit 0b38ba1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 9, 2016

Test build #63412 has finished for PR 13701 at commit cee74b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Aug 9, 2016

LGTM, could you fix the conflict (should be trivial)?

…-push-down-filter2

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@viirya
Member Author

viirya commented Aug 10, 2016

@davies Thanks. I've fixed it. Waiting for jenkins tests.

@SparkQA

SparkQA commented Aug 10, 2016

Test build #63492 has finished for PR 13701 at commit bbc5f7b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
    • class CacheManager extends Logging
    • trait DataSourceScanExec extends LeafExecNode with CodegenSupport
    • case class RowDataSourceScanExec(
    • case class FileSourceScanExec(
    • case class ExternalRDD[T](
    • case class ExternalRDDScanExec[T](
    • case class LogicalRDD(
    • case class RDDScanExec(
    • trait FileRelation
    • case class LocalTableScanExec(
    • abstract class RowIterator
    • trait LeafExecNode extends SparkPlan
    • trait UnaryExecNode extends SparkPlan
    • trait BinaryExecNode extends SparkPlan
    • case class PlanLater(plan: LogicalPlan) extends LeafExecNode
    • abstract class SparkStrategies extends QueryPlanner[SparkPlan]
    • class UnsafeRowSerializer(
    • class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
    • class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]
    • class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long]
    • class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double]
    • case class ScalaUDAF(
    • case class InMemoryRelation(
    • case class InMemoryTableScanExec(
    • trait RunnableCommand extends LogicalPlan with logical.Command
    • case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan
    • case class AlterTableRecoverPartitionsCommand(
    • case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan]
    • class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
    • case class InsertIntoDataSourceCommand(
    • case class InsertIntoHadoopFsRelationCommand(
    • case class PartitionDirectory(values: InternalRow, path: Path)
    • case class PartitionSpec(
    • case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
    • case class JDBCPartition(whereClause: String, idx: Int) extends Partition
    • class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan]
    • case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan]
    • case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan]
    • case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
    • case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport
    • class ExchangeCoordinator(
    • case class MapPartitionsRWrapper(
    • class IncrementalExecution(
    • class ExecutionPage(parent: SQLTab) extends WebUIPage(\"execution\") with Logging
    • class SQLHistoryListenerFactory extends SparkHistoryListenerFactory
    • class SQLListener(conf: SparkConf) extends SparkListener with Logging
    • class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
    • class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
    • case class SparkPlanGraph(

@viirya
Member Author

viirya commented Aug 10, 2016

@davies The accumulator could be released by the JVM early as an optimization. I made a new change to prevent that.

@SparkQA

SparkQA commented Aug 10, 2016

Test build #63503 has finished for PR 13701 at commit a2ba343.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Aug 10, 2016

an unrelated test failed...

@viirya
Member Author

viirya commented Aug 10, 2016

retest this please.

@SparkQA

SparkQA commented Aug 10, 2016

Test build #63512 has finished for PR 13701 at commit a2ba343.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Aug 10, 2016

@davies Because the accumulators for the vectorized settings "true" and "false" have the same name, "numRowGroups", the API that looks up the accumulator returns one of them. So the test becomes unstable. Removing the accumulator from AccumulatorContext at the end of the test should solve this problem.
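
The name collision described above can be reproduced with a toy registry. This is a sketch in plain Scala, not Spark's AccumulatorContext: `Acc`, `register`, `remove`, `findByName` and `runTest` are hypothetical names. In this model the lookup returns the first registered match, whereas in the real case which accumulator comes back is not something the test can rely on.

```scala
import scala.collection.mutable

object AccumulatorRegistrySketch {
  // Toy stand-in for a named accumulator held in a global registry.
  final case class Acc(id: Long, name: String, var value: Long = 0L)

  private val registry = mutable.LinkedHashMap.empty[Long, Acc]

  def register(acc: Acc): Unit = registry(acc.id) = acc
  def remove(id: Long): Unit = registry -= id

  // Lookup by name returns the first registered entry with that name.
  def findByName(name: String): Option[Acc] = registry.values.find(_.name == name)

  // Simulates one test run: register an accumulator, let the "reader" update
  // whatever it finds under the shared name, then optionally clean up.
  def runTest(id: Long, rowGroups: Long, cleanUp: Boolean): Long = {
    val acc = Acc(id, "numRowGroups")
    register(acc)
    findByName("numRowGroups").foreach(found => found.value += rowGroups)
    if (cleanUp) remove(id)
    acc.value
  }
}
```

When a run leaves its accumulator registered, the next run's update can land on the stale entry and the new accumulator stays at zero. Removing the entry at the end of each run keeps the lookup unambiguous.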

@SparkQA

SparkQA commented Aug 10, 2016

Test build #63532 has finished for PR 13701 at commit ca074f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Aug 10, 2016

Merging this into master and 2.0, thanks!

@asfgit asfgit closed this in 19af298 Aug 10, 2016
asfgit pushed a commit that referenced this pull request Aug 10, 2016
… for parquet reader

Author: Liang-Chi Hsieh <[email protected]>

Closes #13701 from viirya/vectorized-reader-push-down-filter2.

(cherry picked from commit 19af298)
Signed-off-by: Davies Liu <[email protected]>
accu.register(sparkContext, Some("numRowGroups"))

val df = spark.read.parquet(path).filter("a < 100")
df.foreachPartition(_.foreach(v => accu.add(0)))
Contributor

what does this test? shouldn't accu always be zero?

Member Author

@viirya viirya Apr 14, 2017

SpecificParquetRecordReaderBase looks for an accumulator named numRowGroups, if any, and updates it with the number of row groups to read. It is for test purposes only.

Here we force this trivial foreach function to refer to the accumulator without changing it, so that the executor side can see it.

asfgit pushed a commit that referenced this pull request Sep 7, 2017
…f and docs

## What changes were proposed in this pull request?

Since [SPARK-15639](#13701), `spark.sql.parquet.cacheMetadata` and `PARQUET_CACHE_METADATA` are not used. This PR removes them from SQLConf and docs.

## How was this patch tested?

Pass the existing Jenkins.

Author: Dongjoon Hyun <[email protected]>

Closes #19129 from dongjoon-hyun/SPARK-13656.
@viirya viirya deleted the vectorized-reader-push-down-filter2 branch December 27, 2023 18:19