
Conversation

@NarineK
Contributor

@NarineK NarineK commented May 4, 2016

What changes were proposed in this pull request?

Implement repartitionByColumn on DataFrame.
This will allow us to run R functions with the dapply() method on each partition identified by column groups.

How was this patch tested?

Unit tests

@SparkQA

SparkQA commented May 4, 2016

Test build #57712 has finished for PR 12887 at commit 87c0a58.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

#' df <- read.json(sqlContext, path)
#' newDF <- repartitionByColumn(df, df$col1, df$col2)
#'}
setMethod("repartitionByColumn",
Member

should this just be repartition with a Column parameter, instead of a different name?

Contributor Author

Hi @felixcheung,

thanks for your prompt response.
That was my first try too; however, there already exists a definition of repartition, and if I try the following:

setGeneric("repartition",
           function(x, col, ...) {
             standardGeneric("repartition")
           })

it fails with:
unused argument (numPartitions = c("numeric", ""))
Error : unable to load R code in package ‘SparkR’

Member

Basically you would need to remove that from the signature line and add a default value in the function definition, something like:

setMethod("repartition",
        signature(x = "SparkDataFrame"),
        function(x, numPartitions = NULL, col = NULL)

and then check which one of numPartitions or col is set, that it is the right type (since types are no longer specified in the signature), that they are not both set, and so on.
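A rough sketch of what such a combined method could look like (illustrative only — the branch bodies and checks here are placeholders sketched against SparkR's internal helpers, not the final implementation):

```r
setMethod("repartition",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            if (!is.null(numPartitions) && is.numeric(numPartitions)) {
              # a partition count was given; optional columns may follow in ...
              sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
            } else if (!is.null(col) && class(col) == "Column") {
              # columns only; Spark falls back to spark.sql.shuffle.partitions
              # for the partition count
              jcol <- lapply(list(col, ...), function(c) { c@jc })
              sdf <- callJMethod(x@sdf, "repartition", jcol)
            } else {
              stop("Please pass a numeric numPartitions and/or Column objects")
            }
            dataFrame(sdf)
          })
```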

Contributor Author

Yes, that is one of the possible options.
We would not be enforcing types through the signature, but we would have to do some checks instead.
Whichever you prefer, I'm fine with it too.

Contributor

I think @felixcheung's proposal is good - better not to introduce a new keyword if the existing one suffices.

@SparkQA

SparkQA commented May 4, 2016

Test build #57714 has finished for PR 12887 at commit 5752f2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 4, 2016

Test build #57715 has finished for PR 12887 at commit 3ee277a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 4, 2016

Test build #57790 has finished for PR 12887 at commit 8e9e34c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 4, 2016

Test build #57801 has finished for PR 12887 at commit 01beaba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

#'
#' Return a new SparkDataFrame that has exactly numPartitions partitions.
#' There are two different options for repartition
#' Option 1
Member

roxygen2 by default strips out all whitespace and newlines.
If you want, you could put these into \item entries or end each line with \cr:
http://stackoverflow.com/questions/9267584/when-documenting-in-roxygen-how-do-i-make-an-itemized-list-in-details
http://r-pkgs.had.co.nz/man.html
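For example, a sketch of the roxygen markup (wording illustrative, not the final doc text):

```r
#' The following options for repartitioning are possible:
#' \itemize{
#'  \item{"Option 1"} {Return a new SparkDataFrame partitioned by
#'                     the given columns into `numPartitions`.}
#'  \item{"Option 2"} {Return a new SparkDataFrame that has exactly
#'                     `numPartitions`.}
#' }
```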

@shivaram
Contributor

shivaram commented May 4, 2016

cc @davies to just confirm if this is what you proposed in #12836

# @seealso coalesce
# @export
setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
Contributor

Can this be function(x, numPartitions, ...) ?

Contributor

nvm, we want numPartitions to be optional.

@SparkQA

SparkQA commented May 4, 2016

Test build #57814 has finished for PR 12887 at commit 02f81db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 4, 2016

Test build #57816 has finished for PR 12887 at commit e48bb22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NarineK
Contributor Author

NarineK commented May 4, 2016

@shivaram, @felixcheung, are you fine with a default number of partitions (200), or do you prefer an error message?

@SparkQA

SparkQA commented May 4, 2016

Test build #57818 has finished for PR 12887 at commit 8852892.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented May 4, 2016

Let's do the same thing as the Scala / Python API.

@NarineK
Contributor Author

NarineK commented May 4, 2016

This is what I see in python:
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L434

They raise an error!

@SparkQA

SparkQA commented May 5, 2016

Test build #57821 has finished for PR 12887 at commit cf54f09.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented May 5, 2016

From the python comment and examples it looks like numPartitions is not required if the columns are specified.

Also made numPartitions optional if partitioning columns are specified.

Can we match that behavior ?

@NarineK
Contributor Author

NarineK commented May 5, 2016

@shivaram
Contributor

shivaram commented May 5, 2016

Ah yes - I missed that. I think the logic is fine and matches the Python API. LGTM. One minor thing: could we add test cases for all 3 scenarios detailed in the description? I think we only have the case where a column is specified right now.

@NarineK
Contributor Author

NarineK commented May 5, 2016

sure!

@SparkQA

SparkQA commented May 5, 2016

Test build #57833 has finished for PR 12887 at commit 9f85704.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented May 5, 2016

LGTM. Thanks @NarineK -- @davies Can you take one final look and then merge ?

@felixcheung
Member

LGTM

#' the given columns into `numPartitions`.}
#' \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions`.}
#' \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given columns,
#' preserving the existing number of partitions.}
Contributor

If numPartitions is not specified, the number of partitions will be spark.sql.shuffle.partitions. Could you double-check the docs for Scala and Python?

Contributor Author

@NarineK NarineK May 5, 2016

It seems that python is raising an error:
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L434

As far as I understand, Scala requires parameters by signature; I do not see a repartition overload with empty or default parameters.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2162

Contributor

We support:

repartition(N)
repartition(N, col1, col2)
repartition(col1, col2)

For the third case, the number of partitions is spark.sql.shuffle.partitions, not the existing number of partitions.

Have I misunderstood something?
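In SparkR terms, the three call patterns would look something like this (illustrative only; df is assumed to be an existing SparkDataFrame):

```r
# repartition(N): exactly 10 partitions
df2 <- repartition(df, numPartitions = 10)

# repartition(N, col1, col2): 10 partitions, hash-partitioned by the columns
df3 <- repartition(df, numPartitions = 10, col = df$col1, df$col2)

# repartition(col1, col2): partitioned by the columns;
# the partition count comes from spark.sql.shuffle.partitions
df4 <- repartition(df, col = df$col1, df$col2)
```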

Contributor

I think @NarineK is referring to the scala doc for repartition which says

  /**
   * Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving
   * the existing number of partitions. The resulting Dataset is hash partitioned.
...
   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
   */
  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {

Contributor

This one is not correct, please update it. See the doc for RepartitionByExpression:

/**
 * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`.
 * If `numPartitions` is not specified, the number of partitions will be the number set by
 * `spark.sql.shuffle.partitions`.
 */
>>> spark.range(0, 100, 1, 1).repartition(col("id")).rdd.getNumPartitions()
200

Contributor Author

@NarineK NarineK May 5, 2016

Yes, @shivaram, I did refer to the Scala doc in Dataset.scala.

In reality, for the repartition(col1, col2) case, spark.sql.shuffle.partitions is used internally in the logical plan:

/**
 * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`.
 * If `numPartitions` is not specified, the number of partitions will be the number set by
 * `spark.sql.shuffle.partitions`.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    numPartitions: Option[Int] = None) extends RedistributeData {
  numPartitions match {
    case Some(n) => require(n > 0, "numPartitions must be greater than 0.")
    case None => // Ok
  }
}

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala#L38

@NarineK
Contributor Author

NarineK commented May 5, 2016

So, do you want me to update Dataset.scala too, or only the doc in R?

@davies
Contributor

davies commented May 5, 2016

@NarineK It would be great if you could update that too.

@NarineK
Contributor Author

NarineK commented May 5, 2016

sure!

@NarineK
Contributor Author

NarineK commented May 5, 2016

Dear Jenkins, please test!

@shivaram
Contributor

shivaram commented May 5, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented May 5, 2016

Test build #57854 has finished for PR 12887 at commit 700f886.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

LGTM

@davies
Contributor

davies commented May 5, 2016

Merging this into master and 2.0 branch, thanks!

asfgit pushed a commit that referenced this pull request May 5, 2016
…rames

## What changes were proposed in this pull request?

Implement repartitionByColumn on DataFrame.
This will allow us to run R functions on each partition identified by column groups with dapply() method.

## How was this patch tested?

Unit tests

Author: NarineK <[email protected]>

Closes #12887 from NarineK/repartitionByColumns.

(cherry picked from commit 22226fc)
Signed-off-by: Davies Liu <[email protected]>
@asfgit asfgit closed this in 22226fc May 5, 2016
@SparkQA

SparkQA commented May 5, 2016

Test build #57906 has finished for PR 12887 at commit cad7ce8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

