-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-7243][SQL] Contingency Tables for DataFrames #5842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
27a5a81
fd53b00
7f098bc
939b7c4
6805df8
a0cad97
a63ad00
bced829
9106585
ae9e01d
a07c01e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,11 +17,14 @@ | |
|
|
||
| package org.apache.spark.sql.execution.stat | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.Cast | ||
| import org.apache.spark.Logging | ||
| import org.apache.spark.sql.{Column, DataFrame} | ||
| import org.apache.spark.sql.types.{DoubleType, NumericType} | ||
| import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Cast} | ||
| import org.apache.spark.sql.catalyst.plans.logical.LocalRelation | ||
| import org.apache.spark.sql.functions._ | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
| private[sql] object StatFunctions { | ||
| private[sql] object StatFunctions extends Logging { | ||
|
|
||
| /** Calculate the Pearson Correlation Coefficient for the given columns */ | ||
| private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { | ||
|
|
@@ -95,4 +98,32 @@ private[sql] object StatFunctions { | |
| val counts = collectStatisticalData(df, cols) | ||
| counts.cov | ||
| } | ||
|
|
||
| /** Generate a table of frequencies for the elements of two columns. */ | ||
| private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = { | ||
| val tableName = s"${col1}_$col2" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my previous comment, I mean this minor: It would be good to check pandas' OR R's naming for this column and follow one of them.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pandas and R have the concept of row names, which we currently don't. We have to have the first column as the "row names". |
||
| val counts = df.groupBy(col1, col2).agg(col(col1), col(col2), count("*")).take(1e8.toInt) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check the size of
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @brkyvz can you submit a follow up pr to reduce 1e8 to 1e6? 1e8 is too large. |
||
| if (counts.length == 1e8.toInt) { | ||
| logWarning("The maximum limit of 1e8 pairs have been collected, which may not be all of " + | ||
| "the pairs. Please try reducing the amount of distinct items in your columns.") | ||
| } | ||
| // get the distinct values of column 2, so that we can make them the column names | ||
| val distinctCol2 = counts.map(_.get(1)).distinct.zipWithIndex.toMap | ||
| val columnSize = distinctCol2.size | ||
| require(columnSize < 1e4, s"The number of distinct values for $col2, can't " + | ||
| s"exceed 1e4. Currently $columnSize") | ||
| val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) => | ||
| val countsRow = new GenericMutableRow(columnSize + 1) | ||
| rows.foreach { row => | ||
| countsRow.setLong(distinctCol2.get(row.get(1)).get + 1, row.getLong(2)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| // the value of col1 is the first value, the rest are the counts | ||
| countsRow.setString(0, col1Item.toString) | ||
| countsRow | ||
| }.toSeq | ||
| val headerNames = distinctCol2.map(r => StructField(r._1.toString, LongType)).toSeq | ||
| val schema = StructType(StructField(tableName, StringType) +: headerNames) | ||
|
|
||
| new DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document the first column name.
1e5->1e4.