Conversation

@yucai
Contributor

@yucai yucai commented Aug 23, 2018

What changes were proposed in this pull request?

Currently, filter pushdown does not work if the Parquet schema and the Hive metastore schema differ in letter case, even when spark.sql.caseSensitive is false.

For example:

spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t")
sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'")
sql("select * from t where id < 100L").write.csv("/tmp/id")

Although the filter "ID < 100L" is generated by Spark, it is not actually pushed down into Parquet, and Spark still does a full table scan when reading.
This PR provides case-insensitive field resolution to make it work.
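The intended resolution behavior can be sketched standalone (a simplified sketch with plain types, not the actual Spark internals): match the filter's column name against the physical Parquet field names case-insensitively, and refuse ambiguous matches.

  object CaseInsensitiveResolution {
    // Resolve a filter column against the physical Parquet field names.
    def resolve(
        parquetFieldNames: Seq[String],
        filterColumn: String,
        caseSensitive: Boolean): Option[String] = {
      if (caseSensitive) {
        parquetFieldNames.find(_ == filterColumn)
      } else {
        parquetFieldNames.filter(_.equalsIgnoreCase(filterColumn)) match {
          case Seq(unique) => Some(unique) // unambiguous: push down using the physical name
          case _           => None         // missing or ambiguous: skip pushdown
        }
      }
    }

    def main(args: Array[String]): Unit = {
      // "ID" from the Hive metastore schema resolves to the physical field "id".
      println(resolve(Seq("id"), "ID", caseSensitive = false))    // Some(id)
      // Duplicate physical fields "a"/"A" make "A" ambiguous, so pushdown is skipped.
      println(resolve(Seq("a", "A"), "A", caseSensitive = false)) // None
    }
  }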

Before - "ID < 100L" fail to pushedown:
screen shot 2018-08-23 at 10 08 26 pm
After - "ID < 100L" pushedown sucessfully:
screen shot 2018-08-23 at 10 08 40 pm

How was this patch tested?

Added UTs.

@SparkQA

SparkQA commented Aug 23, 2018

Test build #95147 has finished for PR 22197 at commit 5902afe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2018

Test build #95149 has finished for PR 22197 at commit 2226eae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
  // is used here.
- .flatMap(parquetFilters.createFilter(parquetSchema, _))
+ .flatMap(parquetFilters.createFilter(parquetSchema, _, isCaseSensitive))
Contributor

can we pass this config when creating ParquetFilters?

Contributor Author

Yes, that way is better.
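A hedged sketch of that direction (the constructor parameters are taken from the signature that appears later in this thread):

  // Pass the case-sensitivity flag once, at construction time, instead of
  // threading it through every createFilter call.
  val parquetFilters = new ParquetFilters(
    conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
    conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
    conf.parquetFilterPushDownInFilterThreshold, isCaseSensitive)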

@SparkQA

SparkQA commented Aug 25, 2018

Test build #95247 has finished for PR 22197 at commit c76189d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assertResult(None) {
  caseInsensitiveParquetFilters.createFilter(
    dupParquetSchema, sources.EqualTo("CINT", 1000))
}
Member

Can we add one negative test that has duplicate names in case-insensitive mode, for example, cInt and CINT, and check if that throws an exception?

Contributor Author

Added, thanks!


/**
- * Returns a map from name of the column to the data type, if predicate push down applies.
+ * Returns nameMap and typeMap based on different case sensitive mode, if predicate push
+ * down applies.
Contributor

instead of returning 2 maps, can we just add an originalName field to ParquetSchemaType?

Member

+1 for avoiding returning 2 maps if possible.

Contributor Author

Great idea!

@yucai
Contributor Author

yucai commented Aug 25, 2018

@cloud-fan @HyukjinKwon It seems we cannot simply add originalName into ParquetSchemaType, because we need the exact ParquetSchemaType values for type matching, like:

  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
  ...
  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
    case ParquetByteType | ParquetShortType | ParquetIntegerType =>

Instead, I use a new case class ParquetField to collapse the two maps into one.

  private case class ParquetField(
      name: String,
      schema: ParquetSchemaType)

Let me know if you are OK with this way.
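For illustration, a standalone sketch of the single-map idea (ParquetSchemaType is stubbed here; the real one wraps Parquet's type objects):

  import java.util.Locale

  case class ParquetSchemaType(originalType: String, primitiveType: String) // stub
  case class ParquetField(name: String, schema: ParquetSchemaType)

  // Key the map by the lowercased name in case-insensitive mode, but keep the
  // original parquet name inside ParquetField so the pushed-down filter uses it.
  def getFieldMap(
      fields: Seq[(String, ParquetSchemaType)],
      caseSensitive: Boolean): Map[String, ParquetField] = {
    fields.map { case (name, tpe) =>
      val key = if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
      key -> ParquetField(name, tpe)
    }.toMap
  }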

@SparkQA

SparkQA commented Aug 25, 2018

Test build #95253 has finished for PR 22197 at commit 10cd89f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} else {
// Don't consider ambiguity here, i.e. when more than one field is matched in
// case-insensitive mode, just skip pushdown for these fields; they will trigger
// an exception when reading. See SPARK-25132.
Member

If we don't need to consider ambiguity, can't we just lowercase f.getName above instead of doing dedup here?

Contributor Author

It is a good question!

Consider the scenario below:

  1. The parquet file has duplicate fields "a INT, A INT".
  2. The user wants to push down "A > 0".

Without dedup, we might push down "a > 0" instead of "A > 0". Although that is wrong, it will still trigger the exception eventually when reading the parquet file, so with or without dedup we get the same result.
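The dedup shape under discussion could look roughly like this (a simplified sketch with plain types, not the merged code):

  import java.util.Locale

  // Group physical fields by lowercased name and keep only unambiguous ones;
  // ambiguous names drop out of the map, so pushdown is silently skipped for them.
  def dedupFields(fields: Seq[(String, String)]): Map[String, String] = {
    fields
      .groupBy { case (name, _) => name.toLowerCase(Locale.ROOT) }
      .collect { case (key, Seq((_, tpe))) => key -> tpe }
  }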

@cloud-fan , @gatorsmile any idea?

Contributor

can we do the dedup before parquet filter pushdown and parquet column pruning? Then we can simplify the code in both cases.

Contributor

ping @yucai

Contributor Author

@cloud-fan, it is a great idea, thanks!
Strictly speaking, it is not a "dedup" before pushdown and pruning; rather, we should clip the parquet schema before pushdown and pruning.
If duplicate fields are detected, throw an exception.
If not, pass the clipped parquet schema to the parquet lib via the Hadoop conf.

    catalystRequestedSchema = {
      val conf = context.getConfiguration
      val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
      assert(schemaString != null, "Parquet requested schema not set.")
      StructType.fromString(schemaString)
    }

    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
      SQLConf.CASE_SENSITIVE.defaultValue.get)
    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
      context.getFileSchema, catalystRequestedSchema, caseSensitive)

I am trying this way, will update soon.

val caseInsensitiveParquetFilters =
  new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
    conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
    conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false)
Member

nit: add a method like:

def createParquetFilter(caseSensitive: Boolean) = {
  new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
    conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
    conf.parquetFilterPushDownInFilterThreshold, caseSensitive = caseSensitive)
}

Contributor Author

Good idea, thanks!

testCaseInsensitiveResolution(
  schema,
  FilterApi.gtEq(intColumn("cint"), 1000: Integer),
  sources.GreaterThanOrEqual("CINT", 1000))
Member

nit: maybe we don't need to test against so many predicates. We just want to make sure case-insensitive resolution works.

Contributor Author

Each test corresponds to one line of code changed in createFilter. For example:

      case sources.IsNull(name) if canMakeFilterOn(name, null) =>
        makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, null))

All tests together cover all my changes in createFilter.

}

test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet" +
" - exception when duplicate fields in case-insensitive mode") {
Member

nit: We can have just "exception when duplicate fields in case-insensitive mode" as the test title. The original one is too verbose.

caseSensitive: Boolean) {

private case class ParquetField(
name: String,
Member

resolvedName? This name and the name in the schema look confusing in the following code.

@gatorsmile
Member

@dongjoon-hyun Do you think we face the same issue in ORC?

 */
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
- val nameToType = getFieldMap(schema)
+ val nameToParquet = getFieldMap(schema)
Member

-> nameToParquetField

@gatorsmile
Member

This PR is basically trying to resolve case sensitivity when the logical schema and the physical schema do not match. This sounds like a general issue in all the data sources. Could any of you do us a favor and check whether all the built-in data sources respect the conf spark.sql.caseSensitive in this case?

pushDownInFilterThreshold: Int,
caseSensitive: Boolean) {

private case class ParquetField(
Member

Add a description of these two fields? It is confusing for a future code maintainer what resolvedName is.

@yucai
Contributor Author

yucai commented Aug 26, 2018

@gatorsmile I can help check spark.sql.caseSensitive for all the built-in data sources.

@SparkQA

SparkQA commented Aug 26, 2018

Test build #95256 has finished for PR 22197 at commit 1ea94cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 26, 2018

Test build #95258 has finished for PR 22197 at commit 10c437e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 26, 2018

Test build #95257 has finished for PR 22197 at commit 90b8717.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yucai
Contributor Author

yucai commented Aug 26, 2018

retest this please

@SparkQA

SparkQA commented Aug 26, 2018

Test build #95264 has finished for PR 22197 at commit 10c437e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Aug 27, 2018

Thanks, @yucai . Could you rebase your code to master branch and update the PR description? Also please update SPARK-25207 together.

According to your example, this issue is a general regression introduced in Spark 2.4; it is not specific to the schema-mismatch case. For example, in the following schema-matched case, the input size was less than or equal to 8.0 MB in Spark 2.3.1, but current master seems to show the following.

spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.mode("overwrite").parquet("/tmp/t")
sql("CREATE TABLE t (id LONG) USING parquet LOCATION '/tmp/t'")
// It should be less than or equal to 8MB.
sql("select * from t where id < 100L").show()
// It's already less than or equal to 8MB.
sql("select * from t where id < 100L").write.mode("overwrite").csv("/tmp/id")

(screenshot of the input size metrics in current master)

Also, if you don't mind, could you update the PR description? This PR doesn't generate new filters; it only changes the field resolution logic. With and without this PR, the filters exist and filter push-down occurs. If I'm wrong, please correct me.

- No filter will be pushed down.
+ Wrong filters will be pushed down.

}

test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
withTempDir { dir =>
Member

nit: withTempPath

@dongjoon-hyun
Member

@gatorsmile . I don't think we have this regression in the ORC data source.
However, there was another JIRA report, SPARK-25175, 5 days ago. There were not many details in it, so I'm still monitoring that issue.

@yucai
Contributor Author

yucai commented Aug 27, 2018

@dongjoon-hyun In the schema-matched case you listed, it is the expected behavior in current master.

spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.mode("overwrite").parquet("/tmp/t")
sql("CREATE TABLE t (id LONG) USING parquet LOCATION '/tmp/t'")

// master and 2.3 have different plans for the top limit (see below); that's why 28.4 MB is read in master
sql("select * from t where id < 100L").show()

This difference was probably introduced by #21573. @cloud-fan, current master reads more data than 2.3 for the top limit, as in #22197 (comment); is it a regression or not?

Master: (screenshot of the query plan)

2.3 branch: (screenshot of the query plan)

@gatorsmile
Member

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Duplicate column name c1 in the table definition.
	at org.apache.hadoop.hive.ql.metadata.Table.validateColumns(Table.java:952)
	at org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:216)
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:495)
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:484)
	... 88 more

@cloud-fan
Contributor

Is it acceptable?

apparently not...

OK, let's just check duplicated field names twice: once in filter pushdown and once in column pruning. And clean it up in follow-up PRs.
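A shared helper for that duplicate check might look like this (a hypothetical sketch, not the PR's code):

  import java.util.Locale

  // Fail fast when two physical fields collide under case-insensitive resolution;
  // both the pushdown path and the pruning path could call this.
  def assertNoDuplicateFields(names: Seq[String], caseSensitive: Boolean): Unit = {
    val keys = if (caseSensitive) names else names.map(_.toLowerCase(Locale.ROOT))
    val dups = keys.groupBy(identity).collect { case (k, vs) if vs.size > 1 => k }
    require(dups.isEmpty,
      s"Found duplicate field(s) ${dups.mkString(", ")} in case-insensitive mode")
  }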

@yucai
Contributor Author

yucai commented Aug 30, 2018

@dongjoon-hyun Sorry for the late response; the description has been changed to:

Although the filter "ID < 100L" is generated by Spark, it is not actually pushed down into Parquet, and Spark still does a full table scan when reading.
This PR provides case-insensitive field resolution to make it work.

Let me know if you have any suggestion :).

@yucai
Contributor Author

yucai commented Aug 30, 2018

@cloud-fan I reverted to the previous version.

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95454 has finished for PR 22197 at commit 04b88c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95449 has finished for PR 22197 at commit cb03fb7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yucai
Contributor Author

yucai commented Aug 30, 2018

retest this please


withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
  val e = intercept[SparkException] {
    sql(s"select a from $tableName where b > 0").collect()
Contributor

can we read this table in case-sensitive mode?

Contributor Author

Yes, we can, see below.

val tableName = "test"
val tableDir = "/tmp/data"
spark.conf.set("spark.sql.caseSensitive", true)
spark.range(10).selectExpr("id as A", "2 * id as B", "3 * id as b").write.mode("overwrite").parquet(tableDir)
sql(s"DROP TABLE $tableName")
sql(s"CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir'")
scala> sql("select A from test where B > 0").show
+---+
|  A|
+---+
|  7|
|  8|
|  9|
|  2|
|  3|
|  4|
|  5|
|  6|
|  1|
+---+

Let me add one test case.
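A hedged sketch of that test, reusing this suite's helpers (withSQLConf, withTempPath, withTable, checkAnswer, and Row are assumed to come from the Spark test base classes and imports):

  withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      spark.range(10).selectExpr("id as A", "2 * id as B", "3 * id as b")
        .write.parquet(path)
      withTable("test") {
        sql(s"CREATE TABLE test (A LONG, B LONG) USING PARQUET LOCATION '$path'")
        // In case-sensitive mode the filter column B matches the physical field B
        // exactly, so the read succeeds even with the duplicate column b present.
        checkAnswer(sql("select A from test where B > 0"), (1L until 10L).map(Row(_)))
      }
    }
  }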

Member

nit: to be consistent with the following query, I'd make this query select A from $tableName where B > 0 too.

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95455 has finished for PR 22197 at commit 04b88c5.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95456 has finished for PR 22197 at commit 41a7b83.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95462 has finished for PR 22197 at commit 41a7b83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yucai
Contributor Author

yucai commented Aug 31, 2018

@cloud-fan, tests have passed. I will use a follow-up PR to make it cleaner.

caseSensitive: Boolean) {

private case class ParquetField(
// field name in parquet file
Member

I'd just move those into the doc for this case class above, for instance,

/**
 * blabla
 * @param blabla
 */
private case class ParquetField
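Filled in, that doc might read like this (wording assumed, not the merged text):

  /**
   * Holds a single field's resolution result.
   *
   * @param name   the field name as it appears in the parquet file; pushed-down
   *               filters must be built with this physical name
   * @param schema the field's ParquetSchemaType, used to select the filter factory
   */
  private case class ParquetField(
      name: String,
      schema: ParquetSchemaType)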

@HyukjinKwon
Member

Seems fine to me too.

@SparkQA

SparkQA commented Aug 31, 2018

Test build #95517 has finished for PR 22197 at commit e0d6196.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@viirya
Member

viirya commented Aug 31, 2018

One minor comment that can be addressed in a follow-up PR. LGTM.

@SparkQA

SparkQA commented Aug 31, 2018

Test build #95524 has finished for PR 22197 at commit e0d6196.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!
