
Conversation

@viirya
Member

@viirya viirya commented May 28, 2016

What changes were proposed in this pull request?

The base class SpecificParquetRecordReaderBase, used for the vectorized Parquet reader, tries to get pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never actually put the filters into that configuration, so the filters are not pushed down to perform RowGroups-level filtering. This patch fixes this by setting up the filters to push down in the configuration handed to the reader.
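Roughly, the fix amounts to serializing the pushed-down predicate into the Hadoop configuration that the reader receives. A minimal sketch of that idea (not the exact patch), assuming Parquet's FilterApi/ParquetInputFormat and a hypothetical integer column _1:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat

// Hypothetical predicate corresponding to a query filter like "_1 < 100".
val hadoopConf = new Configuration()
val predicate = FilterApi.lt(FilterApi.intColumn("_1"), java.lang.Integer.valueOf(100))

// Store the predicate in the configuration; the reader side (SpecificParquetRecordReaderBase)
// retrieves it via ParquetInputFormat.getFilter and can use it to skip whole row groups.
ParquetInputFormat.setFilterPredicate(hadoopConf, predicate)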

How was this patch tested?

Existing tests should pass.

@SparkQA

SparkQA commented May 28, 2016

Test build #59549 has finished for PR 13371 at commit 5687a3b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented May 28, 2016

retest this please.

@SparkQA

SparkQA commented May 28, 2016

Test build #59550 has finished for PR 13371 at commit 5687a3b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented May 30, 2016

cc @nongli @liancheng

@viirya
Member Author

viirya commented May 30, 2016

also cc @yhuai

new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
Contributor

Can you provide a link to the doc saying it is row group level?

Contributor

(it is not obvious to know this is just for row group level)

Contributor

Also, does parquet support row group level predicate evaluation?

Member Author

We use org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups in SpecificParquetRecordReaderBase to do filtering.

The implementation of RowGroupFilter is at https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java.

From this, it looks like it does row group level filtering.
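For illustration, here is a minimal sketch of what that row-group filtering looks like, using a hypothetical file path and predicate (this is not code from the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical path and predicate, for illustration only.
val hadoopConf = new Configuration()
val filePath = new Path("/tmp/example.parquet")
val predicate = FilterApi.lt(FilterApi.intColumn("_1"), java.lang.Integer.valueOf(100))

// Read the footer and ask RowGroupFilter which row groups (blocks) can be kept,
// based on the min/max statistics stored per row group.
val footer = ParquetFileReader.readFooter(hadoopConf, filePath, ParquetMetadataConverter.NO_FILTER)
val keptBlocks = RowGroupFilter.filterRowGroups(
  FilterCompat.get(predicate), footer.getBlocks, footer.getFileMetaData.getSchema)
println(s"kept ${keptBlocks.size} of ${footer.getBlocks.size} row groups")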

Member Author

Besides, we use the metadata in the merged schema to figure out whether a field is optional (i.e. not present in all Parquet files) before deciding to push down a filter on it, but this info is currently ignored in FileSourceStrategy. Without the fix in this change, the pushed-down row-group level filtering would fail due to fields that don't exist in a given Parquet file.

@yhuai
Contributor

yhuai commented Jun 1, 2016

Can you provide a test case that shows the problem? Also, can you provide benchmark results of the performance benefit?

@viirya
Member Author

viirya commented Jun 1, 2016

@yhuai As you can see, this is not a fix for a bug/problem, so I think it might be hard to provide a test case for it. I will try to do the benchmark.

@viirya
Member Author

viirya commented Jun 1, 2016

BTW, I can't see any reason not to add a row-group level filter for parquet.

@yhuai
Contributor

yhuai commented Jun 1, 2016

It is a good idea to add it if Parquet supports it (I have an impression that Parquet does not support it, but maybe I am wrong). I think having benchmark results is a good practice, so we can make sure it doesn't hit any obvious issue.

@viirya
Member Author

viirya commented Jun 2, 2016

@yhuai I've run a simple benchmark as follows:

test("Benchmark for Parquet") {
  val N = 1 << 20

  val benchmark = new Benchmark("Parquet reader", N)
  benchmark.addCase("reading Parquet file", 1) { iter =>
    withParquetTable((0 until N).map(i => (101, i)), "t") {
      sql("SELECT _1 FROM t where t._1 < 100").collect()
    }
  }
  benchmark.run()
}

Before this patch:

Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                        34225 / 34225          0.0       32639.5       1.0X

After this patch:

Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                        31350 / 31350          0.0       29897.6       1.0X

@viirya
Member Author

viirya commented Jun 3, 2016

ping @yhuai I've addressed the comments. Please take a look again. Thanks!

@viirya
Member Author

viirya commented Jun 6, 2016

ping @yhuai again

@viirya
Member Author

viirya commented Jun 8, 2016

cc @rxin Can you also take a look at this? It has been sitting here for a while too. Thanks!

@viirya
Member Author

viirya commented Jun 8, 2016

cc @cloud-fan too.

…-push-down-filter

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@viirya
Member Author

viirya commented Jun 9, 2016

ping @yhuai @rxin @cloud-fan

@SparkQA

SparkQA commented Jun 9, 2016

Test build #60246 has finished for PR 13371 at commit 077f7f8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jun 9, 2016

Is this a bug fix or performance fix? Sorry I don't really understand after reading your description.

@viirya
Member Author

viirya commented Jun 9, 2016

It is not really a bug fix because without this filtering push-down, the thing still works. This should be a performance fix. I should modify the description.

@viirya
Member Author

viirya commented Jun 9, 2016

retest this please.

@viirya
Member Author

viirya commented Jun 9, 2016

The description is updated.

@SparkQA

SparkQA commented Jun 10, 2016

Test build #60256 has finished for PR 13371 at commit 077f7f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jun 10, 2016

@viirya I took a look at Parquet's code. It seems Parquet only evaluates row group level filters when generating splits (https://github.com/apache/parquet-mr/blob/apache-parquet-1.7.0/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java#L673). With FileSourceStrategy in Spark, I am not sure we actually evaluate the filters and skip unneeded row groups as expected. Can you take a look? Also, it would be great if you could add a test to make sure that we can actually skip unneeded row groups. This test could be created as follows (a rough sketch appears after the list).

  1. We first write a Parquet file containing multiple row groups. Also, let's say that there is a column c and those row groups have disjoint ranges of c's values.
  2. We write a query with a filter on c and make sure that this query only needs a subset of the row groups.
  3. We verify that we only create splits for the needed row groups.
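A rough sketch of such a test, assuming small row groups can be forced via parquet.block.size and that the column is named c (hypothetical names and sizes, not the actual test that was added):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("row group skipping").getOrCreate()
import spark.implicits._

// Force small row groups so one file contains several of them; since the data is
// written in increasing order of c, each row group covers a disjoint range of c.
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024)

val path = "/tmp/row_group_filter_test"
spark.range(0, 1000000).toDF("c").coalesce(1).write.mode("overwrite").parquet(path)

// A filter that only the first row group(s) can satisfy.
val matched = spark.read.parquet(path).filter($"c" < 1000).count()
assert(matched == 1000)
// Whether unneeded row groups were actually skipped could then be checked by
// inspecting the file footer with RowGroupFilter, as in the earlier sketch.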

@viirya
Member Author

viirya commented Jun 10, 2016

@yhuai Parquet also does this filtering at ParquetRecordReader (https://github.com/apache/parquet-mr/blob/apache-parquet-1.7.0/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java#L160) and ParquetReader(https://github.com/apache/parquet-mr/blob/apache-parquet-1.7.0/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L147).

In Spark, we also do this in SpecificParquetRecordReaderBase.

I've manually tested it as I mentioned above. But it would be good to have a formal test case for it as you said. I will try to add it later, maybe when I come back to work a few days later...

@viirya
Member Author

viirya commented Jun 10, 2016

@yhuai Your step 3 may not work. We filter the row groups for each Parquet file to read in VectorizedParquetRecordReader. I don't think we do anything regarding creating splits?

@liancheng
Contributor

@yhuai We used to support row group level filter push-down before refactoring HadoopFsRelation into FileFormat, but lost it (by accident I guess) after the refactoring. So now we only have row group level filtering when the vectorized reader is not used, see here.

And yes, both ParquetInputFormat and ParquetRecordReader do row group level filtering.

This LGTM. Thanks for fixing it! Merging to master and 2.0.

@asfgit asfgit closed this in bba5d79 Jun 11, 2016
asfgit pushed a commit that referenced this pull request Jun 11, 2016
…quet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used for the vectorized Parquet reader, tries to get pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never actually put the filters into that configuration, so the filters are not pushed down to perform RowGroups-level filtering. This patch fixes this by setting up the filters to push down in the configuration handed to the reader.

## How was this patch tested?
Existing tests should pass.

Author: Liang-Chi Hsieh <[email protected]>

Closes #13371 from viirya/vectorized-reader-push-down-filter.

(cherry picked from commit bba5d79)
Signed-off-by: Cheng Lian <[email protected]>
@rxin
Contributor

rxin commented Jun 11, 2016

I just talked to @liancheng offline. I don't think we should've merged this until we have verified there is no performance regression, and we definitely shouldn't have merged this in 2.0.

@liancheng can you revert this from both master and branch-2.0?

@viirya can you run some parquet scan benchmark and make sure this does not result in perf regression?

@rxin
Contributor

rxin commented Jun 11, 2016

To be more clear, please write a proper benchmark that reads data when filter push-down is not useful, to check whether this regresses performance for the non-push-down case. Also make sure the benchmark does not include the time it takes to write the Parquet data.

@rxin
Contributor

rxin commented Jun 11, 2016

And once we have more data, it might make sense to merge this in 2.0!

@viirya
Member Author

viirya commented Jun 11, 2016

@rxin One thing that needs to be explained is that, because we have just one configuration to control filter push-down, it affects both the row-based filter push-down and this row-group filter push-down.

The benchmark I posted above runs against this patch (so with both push-downs) and against the master branch (only row-based, without this patch) individually. It does include the time to write the Parquet data; I will change that. I want to confirm whether this kind of benchmark is enough?

@liancheng
Contributor

Reverted from master and branch-2.0.

@viirya For the benchmark, there are two things:

  1. The benchmark also counts Parquet file writing, so the real numbers should be much better than the posted ones.
  2. We should also benchmark cases where no filters are pushed down, to verify that this patch doesn't affect the normal code path.

@viirya
Member Author

viirya commented Jun 11, 2016

@liancheng Got it.

@viirya
Member Author

viirya commented Jun 14, 2016

@liancheng

I reran the benchmark, excluding the time spent writing the Parquet file:

test("Benchmark for Parquet") {
  val N = 1 << 50
    withParquetTable((0 until N).map(i => (101, i)), "t") {
      val benchmark = new Benchmark("Parquet reader", N)
      benchmark.addCase("reading Parquet file", 10) { iter =>
        sql("SELECT _1 FROM t where t._1 < 100").collect()
      }
      benchmark.run()
  }
}

withParquetTable by default runs the tests with both the vectorized and non-vectorized readers. I only let it run the vectorized reader.

After this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_25-b17 on Linux 3.13.0-57-generic
Westmere E56xx/L56xx/X56xx (Nehalem-C)
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            76 /   88          3.4         291.0       1.0X

Before this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_25-b17 on Linux 3.13.0-57-generic
Westmere E56xx/L56xx/X56xx (Nehalem-C)
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            81 /   91          3.2         310.2       1.0X

Next, I ran the benchmark for the non-pushdown case using the same benchmark code but with the pushdown configuration disabled.
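A minimal sketch of how the pushdown setting might be disabled for that run, assuming the benchmark lives in a suite that mixes in SQLTestUtils/ParquetTest (so withSQLConf and sql are available) and that the table t from the snippet above exists:

import org.apache.spark.sql.internal.SQLConf

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "false") {
  // Same scan as above, but with filter push-down disabled, so neither the row-based
  // nor the row-group level filter is handed to Parquet.
  sql("SELECT _1 FROM t where t._1 < 100").collect()
}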

After this patch:

Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            80 /   95          3.3         306.5       1.0X

Before this patch:

Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            80 /  103          3.3         306.7       1.0X

For the non-pushdown case, judging from the results, this patch doesn't affect the normal code path.

@yhuai
Contributor

yhuai commented Jun 14, 2016

Can you add results showing that there are skipped row groups with this change (and before this patch all row groups are loaded)?

For those results, let's also put them in the description of the new PR.

@viirya
Member Author

viirya commented Jun 16, 2016

@yhuai ok. Do you mean I need to create a new PR for this?

@yhuai
Contributor

yhuai commented Jun 16, 2016

Yea. Since this one was closed by asfgit, I am not sure you can reopen it.


@liancheng
Contributor

liancheng commented Jun 16, 2016

@viirya One problem in your new benchmark code is that 1 << 50 is actually very small since it's an Int:

scala> 1 << 50
res0: Int = 262144

Anyway, 1 << 50, which would be about 10^15 rows if interpreted as intended, might be too large a value for such a microbenchmark :)

So the generated Parquet file probably only contains a single row group. I guess that's why the numbers are so close to each other no matter whether row group filter push-down is enabled or not.

@viirya
Member Author

viirya commented Jun 17, 2016

@liancheng Thanks! I didn't notice that. I will rerun the benchmark. I've re-submitted this PR at #13701.

@viirya viirya deleted the vectorized-reader-push-down-filter branch December 27, 2023 18:33