5 changes: 3 additions & 2 deletions R/pkg/tests/run-all.R
@@ -22,12 +22,13 @@ library(SparkR)
options("warn" = 2)

# Setup global test environment
# Install Spark first to set SPARK_HOME
install.spark()

sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))

install.spark()

test_package("SparkR")
2 changes: 1 addition & 1 deletion docs/monitoring.md
@@ -299,12 +299,12 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<tr>
<td><code>/applications/[app-id]/stages</code></td>
<td>A list of all stages for a given application.</td>
<br><code>?status=[active|complete|pending|failed]</code> list only stages in the state.
</tr>
<tr>
<td><code>/applications/[app-id]/stages/[stage-id]</code></td>
<td>
A list of all attempts for the given stage.
<br><code>?status=[active|complete|pending|failed]</code> list only stages in the state.
</td>
</tr>
<tr>
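For reference, the status filter documented in the monitoring.md hunk above can be exercised directly against the REST API. A minimal Python sketch, assuming a history server on the default local port 18080 and a hypothetical application id:

```python
import json
from urllib.request import urlopen

# Assumed endpoint: a history server running locally on the default port 18080.
# The application id is hypothetical; query /applications first to list real ids.
base = "http://localhost:18080/api/v1"
app_id = "app-20170401123456-0001"

# With ?status=..., only stages in the requested state are returned.
url = "{}/applications/{}/stages?status=complete".format(base, app_id)
stages = json.loads(urlopen(url).read().decode("utf-8"))

for stage in stages:
    print(stage["stageId"], stage["status"], stage["name"])
```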
16 changes: 13 additions & 3 deletions examples/src/main/r/ml/glm.R
@@ -27,7 +27,7 @@ sparkR.session(appName = "SparkR-ML-glm-example")
# $example on$
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
@@ -44,8 +44,9 @@ gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
summary(gaussianGLM2)

# Fit a generalized linear model of family "binomial" with spark.glm
training2 <- read.df("data/mllib/sample_binary_classification_data.txt", source = "libsvm")
df_list2 <- randomSplit(training2, c(7,3), 2)
training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")
@@ -56,6 +57,15 @@ summary(binomialGLM)
# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
head(binomialPredictions)

# Fit a generalized linear model of family "tweedie" with spark.glm
training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
var.power = 1.2, link.power = 0)

# Model summary
summary(tweedieGLM)
# $example off$

sparkR.session.stop()
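A rough PySpark counterpart of the tweedie example added above, assuming Spark 2.2+ where GeneralizedLinearRegression supports family="tweedie" with variancePower and linkPower; this is only a sketch, not part of the change:

```python
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, exp, randn

spark = SparkSession.builder.appName("tweedie-glm-sketch").getOrCreate()

# Same trick as the R example: perturb the multiclass labels so they behave like
# continuous, tweedie-ish responses.
training = spark.read.format("libsvm").load(
    "data/mllib/sample_multiclass_classification_data.txt")
tweedieDF = training.withColumn("label", col("label") * exp(randn(seed=10)))

# linkPower = 0 is the log link; variancePower = 1.2 sits between the Poisson (1.0)
# and gamma (2.0) variance functions.
glr = GeneralizedLinearRegression(family="tweedie", variancePower=1.2, linkPower=0.0)
model = glr.fit(tweedieDF)
print(model.coefficients)
print(model.intercept)

spark.stop()
```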
@@ -38,7 +38,7 @@ object Correlation {

/**
* :: Experimental ::
* Compute the correlation matrix for the input RDD of Vectors using the specified method.
* Compute the correlation matrix for the input Dataset of Vectors using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
* @param dataset A dataset or a dataframe
@@ -56,14 +56,14 @@ object Correlation {
* Here is how to access the correlation coefficient:
* {{{
* val data: Dataset[Vector] = ...
* val Row(coeff: Matrix) = Statistics.corr(data, "value").head
* val Row(coeff: Matrix) = Correlation.corr(data, "value").head
* // coeff now contains the Pearson correlation matrix.
* }}}
*
* @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
* and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
* which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
* avoid recomputing the common lineage.
* which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"`
* to avoid recomputing the common lineage.
*/
@Since("2.2.0")
def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
61 changes: 61 additions & 0 deletions python/pyspark/ml/stat.py
@@ -71,6 +71,67 @@ def test(dataset, featuresCol, labelCol):
return _java2py(sc, javaTestObj.test(*args))


class Correlation(object):
"""
.. note:: Experimental

Compute the correlation matrix for the input dataset of Vectors using the specified method.
Methods currently supported: `pearson` (default), `spearman`.

.. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
to avoid recomputing the common lineage.

:param dataset:
A dataset or a dataframe.
:param column:
The name of the column of vectors for which the correlation coefficient needs
to be computed. This must be a column of the dataset, and it must contain
Vector objects.
:param method:
String specifying the method to use for computing correlation.
Supported: `pearson` (default), `spearman`.
:return:
A dataframe that contains the correlation matrix of the column of vectors. This
dataframe contains a single row and a single column of name
'$METHODNAME($COLUMN)'.

>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.stat import Correlation
>>> dataset = [[Vectors.dense([1, 0, 0, -2])],
... [Vectors.dense([4, 5, 0, 3])],
... [Vectors.dense([6, 7, 0, 8])],
... [Vectors.dense([9, 0, 0, 1])]]
>>> dataset = spark.createDataFrame(dataset, ['features'])
>>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
>>> print(str(pearsonCorr).replace('nan', 'NaN'))
DenseMatrix([[ 1. , 0.0556..., NaN, 0.4004...],
[ 0.0556..., 1. , NaN, 0.9135...],
[ NaN, NaN, 1. , NaN],
[ 0.4004..., 0.9135..., NaN, 1. ]])
>>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
>>> print(str(spearmanCorr).replace('nan', 'NaN'))
DenseMatrix([[ 1. , 0.1054..., NaN, 0.4 ],
[ 0.1054..., 1. , NaN, 0.9486... ],
[ NaN, NaN, 1. , NaN],
[ 0.4 , 0.9486... , NaN, 1. ]])

.. versionadded:: 2.2.0

"""
@staticmethod
@since("2.2.0")
def corr(dataset, column, method="pearson"):
"""
Compute the correlation matrix with specified method using dataset.
"""
sc = SparkContext._active_spark_context
javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
args = [_py2java(sc, arg) for arg in (dataset, column, method)]
return _java2py(sc, javaCorrObj.corr(*args))


if __name__ == "__main__":
import doctest
import pyspark.ml.stat
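As the Spearman note in the new docstring suggests, caching the input DataFrame before requesting a rank correlation avoids recomputing its lineage. A small usage sketch, assuming an active SparkSession named spark:

```python
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

df = spark.createDataFrame(
    [(Vectors.dense([1.0, 2.0]),),
     (Vectors.dense([3.0, 1.0]),),
     (Vectors.dense([5.0, 4.0]),)],
    ["features"])

# Spearman ranks each column separately, so cache the input to avoid recomputing it.
df.cache()
spearmanMatrix = Correlation.corr(df, "features", method="spearman").head()[0]
print(spearmanMatrix)
df.unpersist()
```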
@@ -27,7 +27,7 @@ import com.google.common.base.Objects
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -403,14 +403,14 @@ object CatalogTypes {
*/
case class CatalogRelation(
tableMeta: CatalogTable,
dataCols: Seq[Attribute],
partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
dataCols: Seq[AttributeReference],
partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
assert(tableMeta.identifier.database.isDefined)
assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
assert(tableMeta.dataSchema.sameType(dataCols.toStructType))

// The partition column should always appear after data columns.
override def output: Seq[Attribute] = dataCols ++ partitionCols
override def output: Seq[AttributeReference] = dataCols ++ partitionCols

def isPartitioned: Boolean = partitionCols.nonEmpty

@@ -1153,7 +1153,7 @@ class SQLConf extends Serializable with Logging {
}

// For test only
private[spark] def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
val cloned = clone()
entries.foreach {
case (entry, value) => cloned.setConfString(entry.key, value.toString)
@@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption,
catalogTable = Some(table))

LogicalRelation(
dataSource.resolveRelation(checkFilesExist = false),
catalogTable = Some(table))
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
}).asInstanceOf[LogicalRelation]

// It's possible that the table schema is empty and need to be inferred at runtime. We should
// not specify expected outputs for this case.
val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
plan.copy(expectedOutputAttributes = expectedOutputs)
if (r.output.isEmpty) {
// It's possible that the table schema is empty and need to be inferred at runtime. For this
// case, we don't need to change the output of the cached plan.
plan
} else {
plan.copy(output = r.output)
}
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {