
Conversation

@heary-cao
Contributor

heary-cao commented Aug 17, 2017

What changes were proposed in this pull request?

This is a follow-up of #18892.
Currently the optimizer does a lot of special handling for non-deterministic projects, but it is still not good enough. This patch adds a new special case for non-deterministic projects. The application logic is as follows: in my spark-shell, I simply create a 22-column ORC table.

val df = (0 until 50).map(i => (
    i % 2, i % 3, i % 4, i % 5, i % 6, i % 7, i % 8, i % 9, i % 10, i % 11,
    (i % 2).toString, (i % 3).toString, (i % 4).toString, (i % 5).toString, (i % 6).toString, (i % 7).toString,
    (i % 2).toDouble, (i % 3).toDouble, (i % 4).toDouble, (i % 5).toDouble, (i % 6).toDouble, (i % 7).toDouble))
  .toDF("i2", "i3", "i4", "i5", "i6", "i7", "i8", "i9", "i10", "i11",
    "s2", "s3", "s4", "s5", "s6", "s7", "d2", "d3", "d4", "d5", "d6", "d7")
df.write.format("orc")
  .partitionBy("i2")
  .bucketBy(8, "i3", "i4", "i5", "i6", "i7", "i8", "i9", "i10", "i11",
    "s2", "s3", "s4", "s5", "s6", "s7", "d2", "d3", "d4", "d5", "d6", "d7")
  .saveAsTable("tableorc")

SQL query statement:

val df_sql = spark.sql("SELECT i2,i3,i4,CASE WHEN i5 < 3 then i5 else ceil(rand(100)*100) end as i5 from tableorc")
println("executed Plan:" + df_sql.queryExecution.executedPlan)
df_sql.show(500)

Before the modification, the executed plan:

*Project [i2#100, i3#79, i4#80, CASE WHEN (i5#81 < 3) THEN cast(i5#81 as bigint) ELSE CEIL((rand(100) * 100.0)) END AS i5#78L]
+- *FileScan orc default.tableorc[i3#79,i4#80,i5#81,i6#82,i7#83,i8#84,i9#85,i10#86,i11#87,s2#88,s3#89,s4#90,s5#91,s6#92,s7#93,d2#94,d3#95,d4#96,d5#97,d6#98,d7#99,i2#100] Batched: false, Format: ORC, Location: CatalogFileIndex[file:/home/cxw/spark/bin/spark-warehouse/tableorc], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i3:int,i4:int,i5:int,i6:int,i7:int,i8:int,i9:int,i10:int,i11:int,s2:string,s3:string,s4:st...

FileScanRDD reads a row of user data: [0,0,2,3,0,4,2,0,8,7,b800000001,c000000001,c800000001,d000000001,d800000001,e000000001,0,0,4000000000000000,4008000000000000,0,4010000000000000,0,30,30,32,33,30,34]
We read all columns of user data from the ORC table. This happens because the project list is non-deterministic (it contains the nondeterministic rand function), and PhysicalOperation excludes projects whose fields are non-deterministic, so column pruning is not applied; see the simplified sketch of the guard below.
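For reference, the guard lives in PhysicalOperation (org.apache.spark.sql.catalyst.planning). Below is a simplified paraphrase of collectProjectsAndFilters, not the exact source (the real method also tracks aliases), shown only to highlight the deterministic check:

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

def collectProjectsAndFilters(plan: LogicalPlan)
    : (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan) = plan match {
  // A Project is only collapsed into the scan when EVERY project expression
  // is deterministic; the rand()-based CASE WHEN fails this guard, so column
  // pruning stops here and the scan reads all 22 columns.
  case Project(fields, child) if fields.forall(_.deterministic) =>
    val (_, filters, other) = collectProjectsAndFilters(child)
    (Some(fields), filters, other)
  case Filter(condition, child) if condition.deterministic =>
    val (fields, filters, other) = collectProjectsAndFilters(child)
    (fields, filters :+ condition, other)
  case other =>
    (None, Nil, other)
}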

After the modification, the executed plan:

*Project [i2#87, i3#66, i4#67, CASE WHEN (i5#68 < 3) THEN cast(i5#68 as bigint) ELSE CEIL((rand(100) * 100.0)) END AS i5#65L]
+- *FileScan orc default.tableorc[i3#66,i4#67,i5#68,i2#87] Batched: false, Format: ORC, Location: CatalogFileIndex[file:/home/cxw/spark/bin/spark-warehouse/tableorc], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i3:int,i4:int,i5:int>

FileScanRDD reads a row of user data: [0,0,2,3,0]
Now we read only the needed columns of user data from the ORC table.

In addition, HiveTableScan also scans more columns than necessary, as the execution plan shows:

*HashAggregate(keys=[k#403L], functions=[partial_sum(cast(id#402 as bigint))], output=[k#403L, sum#800L])
+- Project [d004#607 AS id#402, FLOOR((rand(8828525941469309371) * 10000.0)) AS k#403L]
   +- HiveTableScan [c030#606L, d004#607, d005#608, d025#609, c002#610, d023#611, d024#612, c005#613L, c008#614, c009#615, c010#616, d021#617, d022#618, c017#619, c018#620, c019#621, c020#622, c021#623, c022#624, c023#625, c024#626, c025#627, c026#628, c027#629, ... 169 more fields], MetastoreRelation XXX_database, XXX_table

and this extra scanning hurts task performance.
A similar query statement is as follows:
select k,k,sum(id) from (select d004 as id, floor(rand() * 10000) as k, ceil(c010) as cceila from t054) a group by k,k;
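To make the shape of this plan reproducible without the production table, here is a roughly equivalent query against the tableorc table created above (my stand-in, with hypothetical column substitutions i3 for d004 and d2 for c010):

val df_agg = spark.sql(
  """SELECT k, sum(id) AS s
    |FROM (SELECT i3 AS id, floor(rand() * 10000) AS k, ceil(d2) AS cceila FROM tableorc) a
    |GROUP BY k""".stripMargin)
println("executed Plan:" + df_agg.queryExecution.executedPlan)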

In a Spark cluster environment with 1 master and 2 worker nodes, the performance before and after the modification is 556s vs. 5996s.

This PR describes a way to solve the problem.

How was this patch tested?

Covered by existing test cases, and new test cases are added.

@AmplabJenkins

Can one of the admins verify this patch?

@heary-cao
Contributor Author

heary-cao commented Aug 17, 2017

cc @gatorsmile @cloud-fan
Could you take a look again?

Member

We should still consider whether the fields are non-deterministic. It makes sense only when the non-deterministic fields are not referencing any attribute. Thus, your use case is pretty rare.

Contributor Author

Thanks,
I have modified it.

Contributor Author

Hi, @gatorsmile .
Could you review again?

@heary-cao heary-cao force-pushed the followup-non-deterministic branch from e84425f to 6ab7f15 on August 22, 2017 07:37
Member

What does this mean? :)

Contributor Author

aha, :(
I misunderstood you.
Isn't it that we have added a condition:
// the project must reference at least one attribute of the leaf node
case p @ Project(fields, child: LeafNode) if p.references.nonEmpty =>

Member

I think this might not be worth fixing. It only covers a rare case.

Contributor Author

This might be a rare case, but in business scenarios there are still a lot of scenes that use this rare case. Similar business scenarios (a minimal sketch of scenario 1 follows this list):
1. Random grouping: add a random factor to each row of data before grouping.
2. Fill a field with a random value, to make follow-up algorithmic calculations easier or to prevent anomalies when querying the data.
3. Data skew: the data are discretized (salted) using random values.
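A minimal sketch of scenario 1 (random salting before grouping, to spread a hot key), assuming a DataFrame df with columns key and value; the names are mine, not from this PR:

import org.apache.spark.sql.functions.{col, floor, rand, sum}

val salted = df.withColumn("salt", floor(rand() * 10))  // random factor per row
val partial = salted
  .groupBy(col("key"), col("salt"))                     // a hot key is spread over 10 groups
  .agg(sum("value").as("partial_sum"))
val result = partial
  .groupBy("key")                                       // second pass merges the salted groups
  .agg(sum("partial_sum").as("total"))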
thanks.

@heary-cao heary-cao force-pushed the followup-non-deterministic branch from 6ab7f15 to 28a7ffa on August 24, 2017 07:09
@heary-cao heary-cao force-pushed the followup-non-deterministic branch 2 times, most recently from 7b53c97 to 55af92b on September 12, 2017 07:15
@gatorsmile
Member

@heary-cao Maybe you can close this PR first? @jiangxb1987 will handle it in the previous PR.

@heary-cao heary-cao closed this Oct 7, 2017