
Conversation

@viirya (Member) commented Nov 13, 2019

What changes were proposed in this pull request?

This PR proposes to add an as API to RelationalGroupedDataset. It creates a KeyValueGroupedDataset instance using the given grouping expressions, instead of a typed function as in the groupByKey API. Because it leverages existing columns, it can reuse the existing data partitioning, if any, when doing operations like cogroup.

Why are the changes needed?

Currently, if users want to do cogroup on DataFrames, there is no good way to do it except through KeyValueGroupedDataset, which has two problems:

  1. KeyValueGroupedDataset ignores the existing data partitioning, if any.
  2. groupByKey calls a typed function to create additional key columns. You cannot reuse existing columns, even if you just need to group by them.
// df1 and df2 are both partitioned and sorted.
val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
// This groupBy.as.cogroup won't unnecessarily repartition the data.
val df3 = df1.groupBy("a").as[Int]
  .cogroup(df2.groupBy("a").as[Int]) { case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
  }
== Physical Plan ==
*(5) SerializeFromObject [input[0, int, false] AS value#11247]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4922/1206709281@6eec1b6f, a#11209: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11209], [a#11225], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11246: int
   :- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#11209, 5), false, [id=#10218]
   :     +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :        +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(4) Sort [a#11225 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#11225, 5), false, [id=#10223]
         +- *(3) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
            +- *(3) LocalTableScan [_1#11218, _2#11219, _3#11220]
// The current approach creates an additional AppendColumns node and repartitions the data again.
val df4 = df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) {
  case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
== Physical Plan ==
*(7) SerializeFromObject [input[0, int, false] AS value#11257]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4933/1381027007@37171997, value#11252: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11252], [value#11254], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11256: int
   :- *(3) Sort [value#11252 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(value#11252, 5), true, [id=#10302]
   :     +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4930/1952919534@7ce07f47, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11252]
   :        +- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(a#11209, 5), false, [id=#10297]
   :              +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :                 +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(6) Sort [value#11254 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#11254, 5), true, [id=#10312]
         +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4932/1526528849@1f0e0c1f, createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11254]
            +- *(5) Sort [a#11225 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(a#11225, 5), false, [id=#10307]
                  +- *(4) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
                     +- *(4) LocalTableScan [_1#11218, _2#11219, _3#11220]

Does this PR introduce any user-facing change?

Yes, this adds a new as API to RelationalGroupedDataset. Users can use it to create a KeyValueGroupedDataset and do cogroup.
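
For illustration, a minimal usage sketch of the new API (hedged: this assumes the two-type-parameter as[K, T] shape discussed later in this thread; the snippets above use a single-parameter as[K] from an earlier iteration of the patch):

import org.apache.spark.sql.SparkSession

object GroupByAsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("groupBy-as").getOrCreate()
    import spark.implicits._

    // Same shape as the df1 example above: partitioned and sorted by "a".
    val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
      .repartition($"a").sortWithinPartitions("a")

    // Key the grouped data by the existing column "a"; a later cogroup can then
    // reuse df1's partitioning instead of appending a new key column.
    val grouped = df1.groupBy($"a").as[Int, (Int, Int, Int)]

    spark.stop()
  }
}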

How was this patch tested?

Unit tests.

@viirya (Member, Author) commented Nov 13, 2019

cc @cloud-fan @HyukjinKwon @hagerf

@SparkQA commented Nov 14, 2019

Test build #113733 has finished for PR 26509 at commit 61b1947.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RelationalGroupedDataset[T] protected[sql](

@viirya (Member, Author) commented Nov 14, 2019

retest this please.

@SparkQA commented Nov 14, 2019

Test build #113744 has finished for PR 26509 at commit 61b1947.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RelationalGroupedDataset[T] protected[sql](

@SparkQA commented Nov 14, 2019

Test build #113748 has finished for PR 26509 at commit c5f2e26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Review thread on the diff:

  @Stable
 -class RelationalGroupedDataset protected[sql](
 -    private[sql] val df: DataFrame,
 +class RelationalGroupedDataset[T] protected[sql](
Contributor:

This is a stable API; is it OK to add a type parameter? @srowen @dongjoon-hyun

@dongjoon-hyun (Member) Nov 14, 2019:

Although the goal seems to be extending Dataset[Row] to Dataset[T] in 3.0.0, I'm not sure about this approach. Besides making the class generic, line 51 also changes df: DataFrame to ds: Dataset[T]. If there are third-party classes extending this with an override, the variable-name and type change will break them.

@viirya, do we need this change for your original goal, Add API ...?

Member:

BTW, it seems that MiMa doesn't complain about this change?

Contributor:

Another option is to add the new API as .as[(K, T)]. It's not ideal, since T would ideally already be known when we create the RelationalGroupedDataset, but it avoids changing the stable API.

Member:

I don't think it's a binary-incompatible change, because of type erasure, so MiMa doesn't flag it. However, it will indeed probably not be source-compatible. We should only do it if there's a pretty necessary reason in 3.0.

Member (Author):

@dongjoon-hyun > Do we need this change for your original goal, Add API ...?

The change from df: DataFrame to ds: Dataset[T] is because we need the type information. For a DataFrame, we have no idea about the original object type.

Member (Author):

.as[(K, T)] sounds good, so we don't need to change the stable API.

Contributor:

This would break source compatibility.

Member (Author):

If we just add as[K, T] and do not add a type parameter to the RelationalGroupedDataset class, does it still break source compatibility?
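
To see why the method-level form is gentler on source compatibility, here is a small self-contained sketch with stub classes (all names are hypothetical stand-ins, not Spark's):

// Stub stand-ins, only to show the compatibility point.
trait Encoder[A]
class KeyValueGroupedDataset[K, V]

class Grouped { // stands in for RelationalGroupedDataset, still non-generic
  // The new method is generic, but the class signature itself is unchanged.
  def as[K: Encoder, T: Encoder]: KeyValueGroupedDataset[K, T] =
    new KeyValueGroupedDataset[K, T]
}

object CompatCheck {
  // Pre-existing user code that names the type keeps compiling; a class-level
  // parameter (class Grouped[T]) would have broken this declaration.
  val g: Grouped = new Grouped
}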

Review thread on the diff:

val additionalCols = aliasedGrps.filter(g => !df.logicalPlan.outputSet.contains(g.toAttribute))
val qe = Dataset.ofRows(
  df.sparkSession,
  Project(df.logicalPlan.output ++ additionalCols, df.logicalPlan)).queryExecution
Contributor:

This seems inefficient. Can we make KeyValueGroupedDataset.groupingAttributes a Seq[Expression]?

Member (Author):

Will it also break source compatibility?

Contributor:

groupingAttributes is private in KeyValueGroupedDataset.

Member (Author):

KeyValueGroupedDataset does not produce grouping attributes; they still come from the given queryExecution.

If we change groupingAttributes to Seq[NamedExpression], we still need to add this Project to produce the grouping attributes.
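
To make the two cases that filter distinguishes concrete, a hedged user-level sketch (it assumes the as[K, T] form of the new API; the comments paraphrase the diff above):

import org.apache.spark.sql.SparkSession

object GroupingProjectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("grouping").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")

    // "a" is already an output attribute of df's plan, so the filter drops it
    // and no extra column is appended.
    val byColumn = df.groupBy($"a").as[Int, (Int, Int, Int)]

    // "a % 2" is a derived expression: it is not in df's outputSet, so it is
    // kept, and the Project materializes it as an attribute that can then
    // serve as the grouping attribute.
    val byParity = df.groupBy(($"a" % 2).as("parity")).as[Int, (Int, Int, Int)]

    spark.stop()
  }
}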

Contributor:

Why do we need the grouping attributes? AFAIK they're used to specify the required distribution, which doesn't have to be attributes.

@viirya (Member, Author) Nov 18, 2019:

For example, we use groupingAttributes to construct MapGroups in flatMapGroups. These should be attributes so the UnresolvedDeserializer can be resolved.
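
As a user-level illustration of that path (standard Dataset API, nothing specific to this PR; the comment paraphrases the resolution step described above):

import org.apache.spark.sql.SparkSession

object MapGroupsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("map-groups").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")

    // The Int key handed to the function is deserialized from the grouping
    // attributes of the plan; that deserializer starts out as an
    // UnresolvedDeserializer and is resolved against those attributes.
    val sizes = df.groupByKey(r => r.getInt(0))
      .flatMapGroups((key, rows) => Iterator((key, rows.size)))

    sizes.show()
    spark.stop()
  }
}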

Contributor:

We could make MapGroupsExec follow aggregate: compute the grouping columns ahead of time, put them in a buffer row, and then run the key/value deserializers on that buffer row.

But I admit it's a big change; we can do it in the future.

@rxin (Contributor) commented Nov 14, 2019

This is going to break source compatibility in major ways. Doesn't make sense to do it this way.

@SparkQA commented Nov 15, 2019

Test build #113856 has finished for PR 26509 at commit 576558d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RelationalGroupedDataset protected[sql](

@viirya (Member, Author) commented Nov 15, 2019

retest this please.

@SparkQA commented Nov 15, 2019

Test build #113860 has finished for PR 26509 at commit 576558d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RelationalGroupedDataset protected[sql](

@SparkQA commented Nov 20, 2019

Test build #114122 has finished for PR 26509 at commit 04aa387.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 20, 2019

Test build #114137 has finished for PR 26509 at commit 5b0923c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented:
retest this please

@SparkQA commented Nov 20, 2019

Test build #114144 has finished for PR 26509 at commit 5b0923c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment:

+1, LGTM. Merged to master.
Thank you all!

@hagerf commented Nov 22, 2019

Thanks a lot all for solving my issue, especially @viirya 🙌

@HyukjinKwon (Member) left a comment:

LGTM too. Sorry for the late response.

@viirya viirya deleted the SPARK-29427-2 branch December 27, 2023 18:23