diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala index 917861309c57..37f173bc2046 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala @@ -27,17 +27,7 @@ import org.apache.spark.sql.types._ */ private[spark] class VectorUDT extends UserDefinedType[Vector] { - override def sqlType: StructType = { - // type: 0 = sparse, 1 = dense - // We only use "values" for dense vectors, and "size", "indices", and "values" for sparse - // vectors. The "values" field is nullable because we might want to add binary vectors later, - // which uses "size" and "indices", but not "values". - StructType(Seq( - StructField("type", ByteType, nullable = false), - StructField("size", IntegerType, nullable = true), - StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true), - StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true))) - } + override final def sqlType: StructType = _sqlType override def serialize(obj: Vector): InternalRow = { obj match { @@ -94,4 +84,16 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] { override def typeName: String = "vector" private[spark] override def asNullable: VectorUDT = this + + private[this] val _sqlType = { + // type: 0 = sparse, 1 = dense + // We only use "values" for dense vectors, and "size", "indices", and "values" for sparse + // vectors. The "values" field is nullable because we might want to add binary vectors later, + // which uses "size" and "indices", but not "values". + StructType(Seq( + StructField("type", ByteType, nullable = false), + StructField("size", IntegerType, nullable = true), + StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true), + StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true))) + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala new file mode 100644 index 000000000000..23ee455b9a66 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala @@ -0,0 +1,799 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.ml.stat
+
+import breeze.{linalg => la}
+import breeze.linalg.{Vector => BV}
+import breeze.numerics
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors, VectorUDT}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, TypedImperativeAggregate}
+import org.apache.spark.sql.types._
+
+
+/**
+ * A builder object that provides summary statistics about a given column.
+ *
+ * Users should not directly create such builders, but instead use one of the methods in
+ * [[Summarizer]].
+ */
+@Since("2.2.0")
+abstract class SummaryBuilder {
+  /**
+   * Returns an aggregate object that contains the summary of the column with the requested
+   * metrics.
+   * @param column a column that contains Vector objects.
+   * @return an aggregate column that contains the statistics. The exact content of this
+   *         structure is determined during the creation of the builder.
+   */
+  @Since("2.2.0")
+  def summary(column: Column): Column
+}
+
+/**
+ * Tools for vectorized statistics on MLlib Vectors.
+ *
+ * The methods in this package provide various statistics for Vectors contained inside DataFrames.
+ *
+ * This object lets users pick the statistics they would like to extract for a given column. Here
+ * is an example in Scala:
+ * {{{
+ *   val dataframe = ... // Some dataframe containing a feature column
+ *   val allStats = dataframe.select(Summarizer.metrics("min", "max").summary($"features"))
+ *   val Row(min_, max_) = allStats.first()
+ * }}}
+ *
+ * If one wants to get a single metric, shortcuts are also available:
+ * {{{
+ *   val meanDF = dataframe.select(Summarizer.mean($"features"))
+ *   val Row(mean_) = meanDF.first()
+ * }}}
+ */
+@Since("2.2.0")
+object Summarizer extends Logging {
+
+  import SummaryBuilderImpl._
+
+  /**
+   * Given a list of metrics, provides a builder that in turn computes those metrics from a
+   * column.
+   *
+   * See the documentation of [[Summarizer]] for an example.
+   *
+   * The following metrics are accepted (case sensitive):
+   *  - mean: a vector that contains the coefficient-wise mean.
+   *  - variance: a vector that contains the coefficient-wise variance.
+   *  - count: the count of all vectors seen.
+   *  - numNonZeros: a vector with the number of non-zeros for each coefficient.
+   *  - max: the maximum for each coefficient.
+   *  - min: the minimum for each coefficient.
+   *  - normL2: the Euclidean norm for each coefficient.
+   *  - normL1: the L1 norm of each coefficient (sum of the absolute values).
+   * @param firstMetric the metric being provided
+   * @param metrics additional metrics that can be provided.
+   * @return a builder.
+   * @throws IllegalArgumentException if one of the metric names is not understood.
+   */
+  @Since("2.2.0")
+  def metrics(firstMetric: String, metrics: String*): SummaryBuilder = {
+    val (typedMetrics, computeMetrics) = getRelevantMetrics(Seq(firstMetric) ++ metrics)
+    new SummaryBuilderImpl(typedMetrics, computeMetrics)
+  }
+
+  def mean(col: Column): Column = getSingleMetric(col, "mean")
+
+  def variance(col: Column): Column = getSingleMetric(col, "variance")
+
+  def count(col: Column): Column = getSingleMetric(col, "count")
+
+  def numNonZeros(col: Column): Column = getSingleMetric(col, "numNonZeros")
+
+  def max(col: Column): Column = getSingleMetric(col, "max")
+
+  def min(col: Column): Column = getSingleMetric(col, "min")
+
+  def normL1(col: Column): Column = getSingleMetric(col, "normL1")
+
+  def normL2(col: Column): Column = getSingleMetric(col, "normL2")
+
+  private def getSingleMetric(col: Column, metric: String): Column = {
+    val c1 = metrics(metric).summary(col)
+    c1.getField(metric).as(s"$metric($col)")
+  }
+}
+
+private[ml] class SummaryBuilderImpl(
+    requestedMetrics: Seq[SummaryBuilderImpl.Metrics],
+    requestedCompMetrics: Seq[SummaryBuilderImpl.ComputeMetrics]) extends SummaryBuilder {
+
+  override def summary(column: Column): Column = {
+    val start = SummaryBuilderImpl.Buffer.fromMetrics(requestedCompMetrics)
+    val agg = SummaryBuilderImpl.MetricsAggregate(
+      requestedMetrics,
+      start,
+      column.expr,
+      mutableAggBufferOffset = 0,
+      inputAggBufferOffset = 0)
+    new Column(AggregateExpression(agg, mode = Complete, isDistinct = false))
+  }
+}
+
+private[ml]
+object SummaryBuilderImpl extends Logging {
+
+  def implementedMetrics: Seq[String] = allMetrics.map(_._1).sorted
+
+  @throws[IllegalArgumentException]("When the list is empty or not a subset of known metrics")
+  def getRelevantMetrics(requested: Seq[String]): (Seq[Metrics], Seq[ComputeMetrics]) = {
+    val all = requested.map { req =>
+      val (_, metric, _, deps) = allMetrics.find(tup => tup._1 == req).getOrElse {
+        throw new IllegalArgumentException(s"Metric $req cannot be found." +
+          s" Valid metrics are $implementedMetrics")
+      }
+      metric -> deps
+    }
+    // Do not sort, otherwise the user has to look at the schema to see the order in which
+    // the metrics are going to be returned.
+    val metrics = all.map(_._1)
+    val computeMetrics = all.flatMap(_._2).distinct.sortBy(_.toString)
+    metrics -> computeMetrics
+  }
+
+  def structureForMetrics(metrics: Seq[Metrics]): StructType = {
+    val dct = allMetrics.map { case (n, m, dt, _) => m -> (n, dt) }.toMap
+    val fields = metrics.map(dct.apply).map { case (n, dt) =>
+      StructField(n, dt, nullable = false)
+    }
+    StructType(fields)
+  }
+
+  private val arrayDType = ArrayType(DoubleType, containsNull = false)
+  private val arrayLType = ArrayType(LongType, containsNull = false)
+
+  /**
+   * All the metrics that can be currently computed by Spark for vectors.
+   *
+   * This list associates the user-facing name, the internal (typed) name, and the list of
+   * computation metrics that need to be computed internally to get the final result.
+ */ + private val allMetrics: Seq[(String, Metrics, DataType, Seq[ComputeMetrics])] = Seq( + ("mean", Mean, arrayDType, Seq(ComputeMean, ComputeWeightSum)), + ("variance", Variance, arrayDType, Seq(ComputeWeightSum, ComputeMean, ComputeM2n)), + ("count", Count, LongType, Seq()), + ("numNonZeros", NumNonZeros, arrayLType, Seq(ComputeNNZ)), + ("max", Max, arrayDType, Seq(ComputeMax)), + ("min", Min, arrayDType, Seq(ComputeMin)), + ("normL2", NormL2, arrayDType, Seq(ComputeM2)), + ("normL1", NormL1, arrayDType, Seq(ComputeL1)) + ) + + /** + * The metrics that are currently implemented. + */ + sealed trait Metrics + case object Mean extends Metrics + case object Variance extends Metrics + case object Count extends Metrics + case object NumNonZeros extends Metrics + case object Max extends Metrics + case object Min extends Metrics + case object NormL2 extends Metrics + case object NormL1 extends Metrics + + /** + * The running metrics that are going to be computed. + * + * There is a bipartite graph between the metrics and the computed metrics. + */ + sealed trait ComputeMetrics + case object ComputeMean extends ComputeMetrics + case object ComputeM2n extends ComputeMetrics + case object ComputeM2 extends ComputeMetrics + case object ComputeL1 extends ComputeMetrics + case object ComputeWeightSum extends ComputeMetrics + case object ComputeNNZ extends ComputeMetrics + case object ComputeMax extends ComputeMetrics + case object ComputeMin extends ComputeMetrics + + /** + * The buffer that contains all the summary statistics. If the value is null, it is considered + * to be not required. + * + * If it is required but the size of the vectors (n) is not yet know, it is initialized to + * an empty array. + */ + case class Buffer private ( + var n: Int = -1, // 0 + var mean: Array[Double] = null, // 1 + var m2n: Array[Double] = null, // 2 + var m2: Array[Double] = null, // 3 + var l1: Array[Double] = null, // 4 + var totalCount: Long = 0, // 5 + var totalWeightSum: Double = 0.0, // 6 + var totalWeightSquareSum: Double = 0.0, // 7 + var weightSum: Array[Double] = null, // 8 + var nnz: Array[Long] = null, // 9 + var max: Array[Double] = null, // 10 + var min: Array[Double] = null // 11 + ) { + override def toString: String = { + def v(x: Array[Double]) = if (x==null) "null" else x.toSeq.mkString("[", " ", "]") + def vl(x: Array[Long]) = if (x==null) "null" else x.toSeq.mkString("[", " ", "]") + + s"Buffer(n=$n mean=${v(mean)} m2n=${v(m2n)} m2=${v(m2)} l1=${v(l1)}" + + s" totalCount=$totalCount totalWeightSum=$totalWeightSum" + + s" totalWeightSquareSum=$totalWeightSquareSum weightSum=${v(weightSum)} nnz=${vl(nnz)}" + + s" max=${v(max)} min=${v(min)})" + } + } + + object Buffer extends Logging { + // Recursive function, but the number of cases is really small. + def fromMetrics(requested: Seq[ComputeMetrics]): Buffer = { + if (requested.isEmpty) { + new Buffer() + } else { + val b = fromMetrics(requested.tail) + requested.head match { + case ComputeMean => b.copy(mean = Array.empty) + case ComputeM2n => b.copy(m2n = Array.empty) + case ComputeM2 => b.copy(m2 = Array.empty) + case ComputeL1 => b.copy(l1 = Array.empty) + case ComputeWeightSum => b.copy(weightSum = Array.empty) + case ComputeNNZ => b.copy(nnz = Array.empty) + case ComputeMax => b.copy(max = Array.empty) + case ComputeMin => b.copy(min = Array.empty) + case _ => b // These cases are already being computed + } + } + } + + /** + * (testing only). Makes a buffer with all the metrics enabled. 
+ */ + def allMetrics(): Buffer = { + fromMetrics(Seq(ComputeMean, ComputeM2n, ComputeM2, ComputeL1, + ComputeWeightSum, ComputeNNZ, ComputeMax, + ComputeMin)) + } + + val bufferSchema: StructType = { + val fields = Seq( + "n" -> IntegerType, + "mean" -> arrayDType, + "m2n" -> arrayDType, + "m2" -> arrayDType, + "l1" -> arrayDType, + "totalCount" -> LongType, + "totalWeightSum" -> DoubleType, + "totalWeightSquareSum" -> DoubleType, + "weightSum" -> arrayDType, + "nnz" -> arrayLType, + "max" -> arrayDType, + "min" -> arrayDType + ) + StructType(fields.map { case (name, t) => StructField(name, t, nullable = true)}) + } + + val numFields = bufferSchema.fields.length + + def updateInPlace(buffer: Buffer, v: Vector, w: Double): Unit = { + val startN = buffer.n + if (startN == -1) { + // The buffer was not initialized, we initialize it with the incoming row. + fillBufferWithRow(buffer, v, w) + return + } else { + require(startN == v.size, + s"Trying to insert a vector of size $v into a buffer that " + + s"has been sized with $startN") + } + val n = buffer.n + assert(n > 0, n) + // Always update the following fields. + buffer.totalWeightSum += w + buffer.totalCount += 1 + buffer.totalWeightSquareSum += w * w + + v match { + case dv: DenseVector => updateInPlaceDense(buffer, dv, w) + case sv: SparseVector => updateInPlaceSparse(buffer, sv, w) + } + } + + /** + * Updates 'buffer' with the content of 'other', and returns 'buffer'. + */ + @throws[SparkException]("When the buffers are not compatible") + def mergeBuffers(buffer: Buffer, other: Buffer): Buffer = { + if (buffer.n == -1) { + // buffer is not initialized. + if (other.n == -1) { + // Both are not initialized. + buffer + } else { + // other is initialized + other + } + } else { + // Buffer is initialized. + if (other.n == -1) { + buffer + } else { + mergeInitializedBuffers(buffer, other) + buffer + } + } + } + + /** + * Reads a buffer from a serialized form, using the row object as an assistant. 
+ */ + def read(bytes: Array[Byte], backingRow: UnsafeRow): Buffer = { + backingRow.pointTo(bytes.clone(), bytes.length) + val row = backingRow.getStruct(0, numFields) + new Buffer( + n = row.getInt(0), + mean = nullableArrayD(row, 1), + m2n = nullableArrayD(row, 2), + m2 = nullableArrayD(row, 3), + l1 = nullableArrayD(row, 4), + totalCount = row.getLong(5), + totalWeightSum = row.getDouble(6), + totalWeightSquareSum = row.getDouble(7), + weightSum = nullableArrayD(row, 8), + nnz = nullableArrayL(row, 9), + max = nullableArrayD(row, 10), + min = nullableArrayD(row, 11) + ) + } + + + def write(buffer: Buffer, project: UnsafeProjection): Array[Byte] = { + val ir = InternalRow.apply( + buffer.n, + gadD(buffer.mean), + gadD(buffer.m2n), + gadD(buffer.m2), + gadD(buffer.l1), + buffer.totalCount, + buffer.totalWeightSum, + buffer.totalWeightSquareSum, + gadD(buffer.weightSum), + gadL(buffer.nnz), + gadD(buffer.max), + gadD(buffer.min) + ) + project.apply(ir).getBytes + } + + def mean(buffer: Buffer): Array[Double] = { + require(buffer.totalWeightSum > 0) + require(buffer.mean != null) + require(buffer.weightSum != null) + val res = b(buffer.mean) :* b(buffer.weightSum) :/ buffer.totalWeightSum + res.toArray + } + + def variance(buffer: Buffer): Array[Double] = { + import buffer._ + require(n >= 0, n) + require(totalWeightSum > 0, totalWeightSum) + require(totalWeightSquareSum > 0, totalWeightSquareSum) + require(buffer.mean != null) + require(m2n != null) + require(weightSum != null) + + val denom = totalWeightSum - (totalWeightSquareSum / totalWeightSum) + if (denom > 0.0) { + val normWs = b(weightSum) :/ totalWeightSum + val x = b(buffer.mean) :* b(buffer.mean) :* b(weightSum) :* (- normWs :+ 1.0) + val res = (b(m2n) :+ x) :/ denom + res.toArray + } else { + Array.ofDim(n) // Return 0.0 instead. + } + } + + def totalCount(buffer: Buffer): Long = buffer.totalCount + + def nnz(buffer: Buffer): Array[Long] = { + require(buffer.nnz != null) + buffer.nnz + } + + def max(buffer: Buffer): Array[Double] = { + require(buffer.max != null) + buffer.max + } + + def min(buffer: Buffer): Array[Double] = { + require(buffer.min != null) + buffer.min + } + + def l2(buffer: Buffer): Array[Double] = { + import buffer._ + require(totalWeightSum > 0.0) + require(m2 != null) + numerics.sqrt(b(m2)).toArray + } + + def l1(buffer: Buffer): Array[Double] = { + require(buffer.l1 != null) + buffer.l1 + } + + + private def gadD(arr: Array[Double]): UnsafeArrayData = { + if (arr == null) { + null + } else { + UnsafeArrayData.fromPrimitiveArray(arr) + } + } + + private def gadL(arr: Array[Long]): UnsafeArrayData = { + if (arr == null) { + null + } else { + UnsafeArrayData.fromPrimitiveArray(arr) + } + } + + // Returns the array at a given index, or null if the array is null. + private def nullableArrayD(row: UnsafeRow, ordinal: Int): Array[Double] = { + if (row.isNullAt(ordinal)) { + null + } else { + row.getArray(ordinal).toDoubleArray + } + } + + // Returns the array at a given index, or null if the array is null. + private def nullableArrayL(row: UnsafeRow, ordinal: Int): Array[Long] = { + if (row.isNullAt(ordinal)) { + null + } else { + row.getArray(ordinal).toLongArray + } + } + + private def b(x: Array[Double]): BV[Double] = Vectors.dense(x).asBreeze + + private def bl(x: Array[Long]): BV[Long] = BV.apply(x) + + /** + * Sets the content of a buffer based on a single row (initialization). + * + * The buffer must be uninitialized first. 
+ */ + private def fillBufferWithRow(buffer: Buffer, v: Vector, w: Double): Unit = { + require(buffer.n == -1, (buffer.n, buffer)) + val n = v.size + buffer.n = n + buffer.totalCount = 1L + buffer.totalWeightSum = w + buffer.totalWeightSquareSum = w * w + + val arr = v.toArray + assert(arr.length == n, (arr.toSeq, n)) + if (buffer.mean != null) { + buffer.mean = arr.clone() + } + if (buffer.m2n != null) { + buffer.m2n = Array.ofDim(n) + } + if (buffer.max != null) { + buffer.max = arr.clone() + } + if (buffer.min != null) { + buffer.min = arr.clone() + } + + // The rest of these operations have efficient bulk versions. + v match { + case dv: DenseVector => + if (buffer.m2 != null) { + buffer.m2 = Array.ofDim(n) + b(buffer.m2) := w * (b(arr) :* b(arr)) + } + if (buffer.l1 != null) { + buffer.l1 = Array.ofDim(n) + b(buffer.l1) := numerics.abs(b(arr)) + } + + case sv: SparseVector => + if (buffer.m2 != null) { + buffer.m2 = Array.ofDim(n) + v.foreachActive { (index, value) => + buffer.m2(index) = w * value * value + } + } + + if (buffer.l1 != null) { + buffer.l1 = Array.ofDim(n) + v.foreachActive { (index, value) => + buffer.l1(index) = w * math.abs(value) + } + } + } + + // In the case of the weightSum and NNZ, we also have to account for the value of + // the elements. + // TODO It would be nice to vectorize these operations too. + if (buffer.weightSum != null) { + buffer.weightSum = Array.ofDim(n) + v.foreachActive { (index, value) => + if (value != 0.0) { + buffer.weightSum(index) = w + } + } + } + + if (buffer.nnz != null) { + buffer.nnz = Array.ofDim(n) + v.foreachActive { (index, value) => + if (value != 0.0) { + buffer.nnz(index) = 1L + } + } + } + } + + private def updateInPlaceDense(buffer: Buffer, v: DenseVector, w: Double): Unit = { + val epsi = Double.MinPositiveValue + lazy val value = v.asBreeze + // The mask is zero for all the zero values, and one otherwise. + lazy val mask = numerics.ceil(la.min(numerics.abs(value), epsi)) + lazy val maskWeight = w * mask + + if (buffer.max != null) { + val x = b(buffer.max) + x := la.max(x, value) + } + + if (buffer.min != null) { + val x = b(buffer.min) + x := la.min(x, value) + } + + if (buffer.mean != null) { + assert(buffer.weightSum != null) + val prevMean = b(buffer.mean).copy + val diff = value :- prevMean + // Adding an epsilon to ensure that the denominator is always positive. + // This epsilon is not going to have impact since numerator(i) == 0 => denominator(i) == 0. 
+ val denom = la.max(b(buffer.weightSum) :+ maskWeight, epsi) + b(buffer.mean) :+= (maskWeight :* diff) :/ denom + if (buffer.m2n != null) { + b(buffer.m2n) :+= maskWeight :* ((value :- b(buffer.mean)) :* diff) + } + } + + if (buffer.m2 != null) { + b(buffer.m2) :+= maskWeight :* (value :* value) + } + + if (buffer.l1 != null) { + b(buffer.l1) :+= maskWeight :* numerics.abs(value) + } + + + if (buffer.weightSum != null) { + b(buffer.weightSum) :+= maskWeight + } + + if (buffer.nnz != null) { + bl(buffer.nnz) :+= la.convert(maskWeight, Long) + } + } + + + private def updateInPlaceSparse(buffer: Buffer, v: SparseVector, w: Double): Unit = { + v.foreachActive { (index, value) => + if (value != 0.0) { + if (buffer.max != null && buffer.max(index) < value) { + buffer.max(index) = value + } + if (buffer.min != null && buffer.min(index) > value) { + buffer.min(index) = value + } + + if (buffer.mean != null) { + assert(buffer.weightSum != null) + val prevMean = buffer.mean(index) + val diff = value - prevMean + buffer.mean(index) += w * diff / (buffer.weightSum(index) + w) + if (buffer.m2n != null) { + buffer.m2n(index) += w * (value - buffer.mean(index)) * diff + } + } + if (buffer.m2 != null) { + buffer.m2(index) += w * value * value + } + if (buffer.l1 != null) { + buffer.l1(index) += w * math.abs(value) + } + if (buffer.weightSum != null) { + buffer.weightSum(index) += w + } + if (buffer.nnz != null) { + buffer.nnz(index) += 1 + } + } + } + + } + + /** + * Merges other into buffer. + */ + private def mergeInitializedBuffers(buffer: Buffer, other: Buffer): Unit = { + // Each buffer needs to be properly initialized. + require(buffer.n > 0 && other.n > 0, (buffer.n, other.n)) + require(buffer.n == other.n, (buffer.n, other.n)) + // Mandatory scalar values + buffer.totalWeightSquareSum += other.totalWeightSquareSum + buffer.totalWeightSum += other.totalWeightSum + buffer.totalCount += other.totalCount + // Keep the original weight sums. + val weightSum1 = if (buffer.weightSum == null) null else { buffer.weightSum.clone() } + val weightSum2 = if (other.weightSum == null) null else { other.weightSum.clone() } + + // This sum is going to be used as a denominator. In order to guarantee that the + // division is well-defined, we add an epsilon to the zero coefficients. + // This is not going to change the value of the resul since the numerator will also be zero. + val weightSum: BV[Double] = if (weightSum1 == null) null else { + require(weightSum2 != null, s"buffer=$buffer other=$other") + val x = b(weightSum1) :+ b(weightSum2) + la.max(x, Double.MinPositiveValue) + } + + + // Since the operations are dense, we can directly use BLAS calls here. 
+ val deltaMean: BV[Double] = if (buffer.mean != null) { + require(other.mean != null) + b(other.mean) :- b(buffer.mean) + } else { null } + + if (buffer.mean != null) { + require(other.mean != null) + require(weightSum != null) + b(buffer.mean) :+= deltaMean :* (b(weightSum2) :/ weightSum) + } + + if (buffer.m2n != null) { + require(other.m2n != null) + val w = (b(weightSum1) :* b(weightSum2)) :/ weightSum + val z = (deltaMean :* deltaMean) :* w + b(buffer.m2n) :+= b(other.m2n) :+ z + } + + if (buffer.m2 != null) { + require(other.m2 != null) + b(buffer.m2) :+= b(other.m2) + } + + if (buffer.l1 != null) { + require(other.l1 != null) + b(buffer.l1) :+= b(other.l1) + } + + if (buffer.max != null) { + require(other.max != null) + b(buffer.max) := la.max(b(buffer.max), b(other.max)) + } + + if (buffer.min != null) { + require(other.min != null) + b(buffer.min) := la.min(b(buffer.min), b(other.min)) + } + + if (buffer.nnz != null) { + require(other.nnz != null) + bl(buffer.nnz) :+= bl(other.nnz) + } + + if (buffer.weightSum != null) { + require(other.weightSum != null) + b(buffer.weightSum) :+= b(other.weightSum) + } + } + } + + private case class MetricsAggregate( + requested: Seq[Metrics], + startBuffer: Buffer, + child: Expression, + mutableAggBufferOffset: Int, + inputAggBufferOffset: Int) + extends TypedImperativeAggregate[Buffer] { + + // These objects are not thread-safe, allocate them in the aggregator. + private[this] lazy val row = new UnsafeRow(Buffer.numFields) + private[this] lazy val projection = UnsafeProjection.create(Buffer.bufferSchema) + + + override def eval(buff: Buffer): InternalRow = { + val metrics = requested.map({ + case Mean => UnsafeArrayData.fromPrimitiveArray(Buffer.mean(buff)) + case Variance => UnsafeArrayData.fromPrimitiveArray(Buffer.variance(buff)) + case Count => Buffer.totalCount(buff) + case NumNonZeros => UnsafeArrayData.fromPrimitiveArray(Buffer.nnz(buff)) + case Max => UnsafeArrayData.fromPrimitiveArray(Buffer.max(buff)) + case Min => UnsafeArrayData.fromPrimitiveArray(Buffer.min(buff)) + case NormL2 => UnsafeArrayData.fromPrimitiveArray(Buffer.l2(buff)) + case NormL1 => UnsafeArrayData.fromPrimitiveArray(Buffer.l1(buff)) + }) + InternalRow.apply(metrics: _*) + } + + override def children: Seq[Expression] = child :: Nil + + override def update(buff: Buffer, row: InternalRow): Buffer = { + // Unsafe rows do not play well with UDTs, it seems. + // Directly call the deserializer. + val v = udt.deserialize(row.getStruct(0, udt.sqlType.size)) + + val w = row.numFields match { + case 1 => 1.0 + case 2 => row.getDouble(1) + case x => throw new SparkException(s"Expected 1 or 2 fields, got $x fields.") + } + Buffer.updateInPlace(buff, v, w) + buff + } + + override def merge(buff: Buffer, other: Buffer): Buffer = { + Buffer.mergeBuffers(buff, other) + } + + override def nullable: Boolean = false + + // Make a copy of the start buffer so that the current aggregator can be safely copied around. 
+ override def createAggregationBuffer(): Buffer = startBuffer.copy() + + override def serialize(buff: Buffer): Array[Byte] = { + Buffer.write(buff, projection) + } + + override def deserialize(bytes: Array[Byte]): Buffer = { + Buffer.read(bytes, row) + } + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): MetricsAggregate = { + copy(mutableAggBufferOffset = newMutableAggBufferOffset) + } + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): MetricsAggregate = { + copy(inputAggBufferOffset = newInputAggBufferOffset) + } + + override lazy val dataType: DataType = structureForMetrics(requested) + + override def prettyName: String = "aggregate_metrics" + } + + private[this] val udt = new VectorUDT + +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala new file mode 100644 index 000000000000..b7d0c5635bbc --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.stat.SummaryBuilderImpl.Buffer +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, Statistics} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema + +class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext { + + import testImplicits._ + import Summarizer._ + + private case class ExpectedMetrics( + mean: Seq[Double], + variance: Seq[Double], + count: Long, + numNonZeros: Seq[Long], + max: Seq[Double], + min: Seq[Double], + normL2: Seq[Double], + normL1: Seq[Double]) + + // The input is expected to be either a sparse vector, a dense vector or an array of doubles + // (which will be converted to a dense vector) + // The expected is the list of all the known metrics. + // + // The tests take an list of input vectors and a list of all the summary values that + // are expected for this input. They currently test against some fixed subset of the + // metrics, but should be made fuzzy in the future. 
+ + private def testExample(name: String, input: Seq[Any], exp: ExpectedMetrics): Unit = { + def inputVec: Seq[Vector] = input.map { + case x: Array[Double @unchecked] => Vectors.dense(x) + case x: Seq[Double @unchecked] => Vectors.dense(x.toArray) + case x: Vector => x + case x => throw new Exception(x.toString) + } + + val s = { + val s2 = new MultivariateOnlineSummarizer + inputVec.foreach(v => s2.add(OldVectors.fromML(v))) + s2 + } + + // Because the Spark context is reset between tests, we cannot hold a reference onto it. + def wrapped() = { + val df = sc.parallelize(inputVec).map(Tuple1.apply).toDF("features") + val c = df.col("features") + (df, c) + } + + registerTest(s"$name - mean only") { + val (df, c) = wrapped() + compare(df.select(metrics("mean").summary(c), mean(c)), Seq(Row(exp.mean), s.mean)) + } + + registerTest(s"$name - mean only (direct)") { + val (df, c) = wrapped() + compare(df.select(mean(c)), Seq(exp.mean)) + } + + registerTest(s"$name - variance only") { + val (df, c) = wrapped() + compare(df.select(metrics("variance").summary(c), variance(c)), + Seq(Row(exp.variance), s.variance)) + } + + registerTest(s"$name - variance only (direct)") { + val (df, c) = wrapped() + compare(df.select(variance(c)), Seq(s.variance)) + } + + registerTest(s"$name - count only") { + val (df, c) = wrapped() + compare(df.select(metrics("count").summary(c), count(c)), + Seq(Row(exp.count), exp.count)) + } + + registerTest(s"$name - count only (direct)") { + val (df, c) = wrapped() + compare(df.select(count(c)), + Seq(exp.count)) + } + + registerTest(s"$name - numNonZeros only") { + val (df, c) = wrapped() + compare(df.select(metrics("numNonZeros").summary(c), numNonZeros(c)), + Seq(Row(exp.numNonZeros), exp.numNonZeros)) + } + + registerTest(s"$name - numNonZeros only (direct)") { + val (df, c) = wrapped() + compare(df.select(numNonZeros(c)), + Seq(exp.numNonZeros)) + } + + registerTest(s"$name - min only") { + val (df, c) = wrapped() + compare(df.select(metrics("min").summary(c), min(c)), + Seq(Row(exp.min), exp.min)) + } + + registerTest(s"$name - max only") { + val (df, c) = wrapped() + compare(df.select(metrics("max").summary(c), max(c)), + Seq(Row(exp.max), exp.max)) + } + + registerTest(s"$name - normL1 only") { + val (df, c) = wrapped() + compare(df.select(metrics("normL1").summary(c), normL1(c)), + Seq(Row(exp.normL1), exp.normL1)) + } + + registerTest(s"$name - normL2 only") { + val (df, c) = wrapped() + compare(df.select(metrics("normL2").summary(c), normL2(c)), + Seq(Row(exp.normL2), exp.normL2)) + } + + registerTest(s"$name - all metrics at once") { + val (df, c) = wrapped() + compare(df.select( + metrics("mean", "variance", "count", "numNonZeros").summary(c), + mean(c), variance(c), count(c), numNonZeros(c)), + Seq(Row(exp.mean, exp.variance, exp.count, exp.numNonZeros), + exp.mean, exp.variance, exp.count, exp.numNonZeros)) + } + } + + private def denseData(input: Seq[Seq[Double]]): DataFrame = { + val data = input.map(_.toArray).map(Vectors.dense).map(Tuple1.apply) + sc.parallelize(data).toDF("features") + } + + private def compare(df: DataFrame, exp: Seq[Any]): Unit = { + val coll = df.collect().toSeq + val Seq(row) = coll + val res = row.toSeq + val names = df.schema.fieldNames.zipWithIndex.map { case (n, idx) => s"$n ($idx)" } + assert(res.size === exp.size, (res.size, exp.size)) + for (((x1, x2), name) <- res.zip(exp).zip(names)) { + compareStructures(x1, x2, name) + } + } + + // Compares structured content. 
+ private def compareStructures(x1: Any, x2: Any, name: String): Unit = (x1, x2) match { + case (y1: Seq[Double @unchecked], v1: OldVector) => + compareStructures(y1, v1.toArray.toSeq, name) + case (d1: Double, d2: Double) => + assert2(Vectors.dense(d1) ~== Vectors.dense(d2) absTol 1e-4, name) + case (r1: GenericRowWithSchema, r2: Row) => + assert(r1.size === r2.size, (r1, r2)) + for (((fname, x1), x2) <- r1.schema.fieldNames.zip(r1.toSeq).zip(r2.toSeq)) { + compareStructures(x1, x2, s"$name.$fname") + } + case (r1: Row, r2: Row) => + assert(r1.size === r2.size, (r1, r2)) + for ((x1, x2) <- r1.toSeq.zip(r2.toSeq)) { compareStructures(x1, x2, name) } + case (v1: Vector, v2: Vector) => + assert2(v1 ~== v2 absTol 1e-4, name) + case (l1: Long, l2: Long) => assert(l1 === l2) + case (s1: Seq[_], s2: Seq[_]) => + assert(s1.size === s2.size, s"$name ${(s1, s2)}") + for (((x1, idx), x2) <- s1.zipWithIndex.zip(s2)) { + compareStructures(x1, x2, s"$name.$idx") + } + case (arr1: Array[_], arr2: Array[_]) => + assert(arr1.toSeq === arr2.toSeq) + case _ => throw new Exception(s"$name: ${x1.getClass} ${x2.getClass} $x1 $x2") + } + + private def assert2(x: => Boolean, hint: String): Unit = { + try { + assert(x, hint) + } catch { + case tfe: TestFailedException => + throw new TestFailedException(Some(s"Failure with hint $hint"), Some(tfe), 1) + } + } + + private def makeBuffer(vecs: Seq[Vector]): Buffer = { + val b = Buffer.allMetrics() + for (v <- vecs) { Buffer.updateInPlace(b, v, 1.0) } + b + } + + private def b(x: Array[Double]): Vector = Vectors.dense(x) + + private def l(x: Array[Long]): Vector = b(x.map(_.toDouble)) + + test("debugging test") { + val df = denseData(Nil) + val c = df.col("features") + val c1 = metrics("mean").summary(c) + val res = df.select(c1) + intercept[SparkException] { + compare(res, Seq.empty) + } + } + + test("basic error handling") { + val df = denseData(Nil) + val c = df.col("features") + val res = df.select(metrics("mean").summary(c), mean(c)) + intercept[SparkException] { + compare(res, Seq.empty) + } + } + + test("no element, working metrics") { + val df = denseData(Nil) + val c = df.col("features") + val res = df.select(metrics("count").summary(c), count(c)) + compare(res, Seq(Row(0L), 0L)) + } + + { + val x = Seq(0.0, 1.0, 2.0) + testExample("single element", Seq(x), ExpectedMetrics( + mean = x, + variance = Seq(0.0, 0.0, 0.0), + count = 1, + numNonZeros = Seq(0, 1, 1), + max = x, + min = x, + normL1 = x, + normL2 = x + )) + } + + testExample("two elements", Seq(Seq(0.0, 1.0, 2.0), Seq(0.0, -1.0, -2.0)), ExpectedMetrics( + mean = Seq(0.0, 0.0, 0.0), + // TODO: I have a doubt about these values, they are not normalized. 
+ variance = Seq(0.0, 2.0, 8.0), + count = 2, + numNonZeros = Seq(0, 2, 2), + max = Seq(0.0, 1.0, 2.0), + min = Seq(0.0, -1.0, -2.0), + normL1 = Seq(0.0, 2.0, 4.0), + normL2 = Seq(0.0, math.sqrt(2.0), math.sqrt(2.0) * 2.0) + )) + + testExample("dense vector input", + Seq(Seq(-1.0, 0.0, 6.0), Seq(3.0, -3.0, 0.0)), + ExpectedMetrics( + mean = Seq(1.0, -1.5, 3.0), + variance = Seq(8.0, 4.5, 18.0), + count = 2, + numNonZeros = Seq(2, 1, 1), + max = Seq(3.0, 0.0, 6.0), + min = Seq(-1.0, -3, 0.0), + normL1 = Seq(4.0, 3.0, 6.0), + normL2 = Seq(math.sqrt(10), 3, 6.0) + )) + + test("mixing dense and sparse vector input") { + val summarizer = makeBuffer(Seq( + Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), + Vectors.dense(0.0, -1.0, -3.0), + Vectors.sparse(3, Seq((1, -5.1))), + Vectors.dense(3.8, 0.0, 1.9), + Vectors.dense(1.7, -0.6, 0.0), + Vectors.sparse(3, Seq((1, 1.9), (2, 0.0))))) + + assert(b(Buffer.mean(summarizer)) ~== + Vectors.dense(0.583333333333, -0.416666666666, -0.183333333333) absTol 1E-5, "mean mismatch") + + assert(b(Buffer.min(summarizer)) ~== Vectors.dense(-2.0, -5.1, -3) absTol 1E-5, "min " + + "mismatch") + + assert(b(Buffer.max(summarizer)) ~== Vectors.dense(3.8, 2.3, 1.9) absTol 1E-5, "max mismatch") + + assert(l(Buffer.nnz(summarizer)) ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch") + + assert(b(Buffer.variance(summarizer)) ~== + Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, + "variance mismatch") + + assert(Buffer.totalCount(summarizer) === 6) + } + + + test("merging two summarizers") { + val summarizer1 = makeBuffer(Seq( + Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), + Vectors.dense(0.0, -1.0, -3.0))) + + val summarizer2 = makeBuffer(Seq( + Vectors.sparse(3, Seq((1, -5.1))), + Vectors.dense(3.8, 0.0, 1.9), + Vectors.dense(1.7, -0.6, 0.0), + Vectors.sparse(3, Seq((1, 1.9), (2, 0.0))))) + + val summarizer = Buffer.mergeBuffers(summarizer1, summarizer2) + + assert(b(Buffer.mean(summarizer)) ~== + Vectors.dense(0.583333333333, -0.416666666666, -0.183333333333) absTol 1E-5, "mean mismatch") + + assert(b(Buffer.min(summarizer)) ~== Vectors.dense(-2.0, -5.1, -3) absTol 1E-5, "min mismatch") + + assert(b(Buffer.max(summarizer)) ~== Vectors.dense(3.8, 2.3, 1.9) absTol 1E-5, "max mismatch") + + assert(l(Buffer.nnz(summarizer)) ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch") + + assert(b(Buffer.variance(summarizer)) ~== + Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, + "variance mismatch") + + assert(Buffer.totalCount(summarizer) === 6) + } + + // TODO: this test should not be committed. It is here to isolate some performance hotspots. 
+ test("perf test") { + val n = 10000000 + val rdd1 = sc.parallelize(1 to n).map { idx => + OldVectors.dense(idx.toDouble) + } + val trieouts = 100 + rdd1.cache() + rdd1.count() + val rdd2 = sc.parallelize(1 to n).map { idx => + Vectors.dense(idx.toDouble) + } + rdd2.cache() + rdd2.count() + val df = rdd2.map(Tuple1.apply).toDF("features") + df.cache() + df.count() + val x = df.select( + metrics("mean", "variance", "count", "numNonZeros", "max", "min", "normL1", + "normL2").summary($"features")) + val x1 = df.select(metrics("variance").summary($"features")) + + def print(name: String, l: List[Long]): Unit = { + def f(z: Long) = (1e6 * n.toDouble) / z + val min = f(l.max) + val max = f(l.min) + val med = f(l.sorted.drop(l.size / 2).head) + + // scalastyle:off println + println(s"$name = [$min ~ $med ~ $max] records / milli") + } + + + var times_df: List[Long] = Nil + for (_ <- 1 to trieouts) { + System.gc() + val t21 = System.nanoTime() + x.head() + val t22 = System.nanoTime() + val dt = t22 - t21 + times_df ::= dt + // scalastyle:off + print("Dataframes", times_df) + // scalastyle:on + } + + var times_rdd: List[Long] = Nil +// for (_ <- 1 to trieouts) { +// val t21 = System.nanoTime() +// Statistics.colStats(rdd1) +// val t22 = System.nanoTime() +// times_rdd ::= (t22 - t21) +// } + + var times_df_variance: List[Long] = Nil +// for (_ <- 1 to trieouts) { +// val t21 = System.nanoTime() +// x1.head() +// val t22 = System.nanoTime() +// times_df_variance ::= (t22 - t21) +// } + + + print("Dataframes", times_df) + print("RDD", times_rdd) + print("Dataframes (variance only)", times_df_variance) + } + +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 7c57025f995d..64b94f0a2c10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -101,6 +101,8 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu /** * A projection that returns UnsafeRow. + * + * CAUTION: the returned projection object should *not* be assumed to be thread-safe. */ abstract class UnsafeProjection extends Projection { override def apply(row: InternalRow): UnsafeRow @@ -110,11 +112,15 @@ object UnsafeProjection { /** * Returns an UnsafeProjection for given StructType. + * + * CAUTION: the returned projection object is *not* thread-safe. */ def create(schema: StructType): UnsafeProjection = create(schema.fields.map(_.dataType)) /** * Returns an UnsafeProjection for given Array of DataTypes. + * + * CAUTION: the returned projection object is *not* thread-safe. 
   */
  def create(fields: Array[DataType]): UnsafeProjection = {
    create(fields.zipWithIndex.map(x => BoundReference(x._2, x._1, true)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 80c25d0b0fb7..3c4c9690f378 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -495,6 +495,12 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
   * Generates the final aggregation result value for current key group with the aggregation buffer
   * object.
   *
+  * Developer note: the only return types accepted by Spark are:
+  *  - primitive types
+  *  - InternalRow and subclasses
+  *  - ArrayData
+  *  - MapData
+  *
   * @param buffer aggregation buffer object.
   * @return The aggregation result of current key group
   */
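
Reviewer note (not part of the patch): the sketch below shows how the API introduced in Summarizer.scala would be exercised from a DataFrame, following the usage documented in the scaladoc above. It assumes a SparkSession named `spark` built with this change; the DataFrame contents and the "features" column name are illustrative only.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.stat.Summarizer

import spark.implicits._

// A toy DataFrame with a single vector column named "features" (name is illustrative).
val df = Seq(
  Tuple1(Vectors.dense(1.0, 2.0, 3.0)),
  Tuple1(Vectors.dense(4.0, 5.0, 6.0))
).toDF("features")

// Request several metrics at once; they are returned as one struct column whose fields
// follow the order in which the metrics were requested.
val allStats = df.select(Summarizer.metrics("mean", "max", "count").summary($"features"))
allStats.show(truncate = false)

// Single-metric shortcuts are also available.
df.select(Summarizer.mean($"features"), Summarizer.count($"features")).show(truncate = false)
```

Because all requested metrics share a single aggregation buffer (one TypedImperativeAggregate), the first select above should only need one pass over the data.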