@@ -187,17 +187,15 @@ class LinearSVC @Since("2.2.0") (
val instances = extractInstances(dataset)
.setName("training instances")

- val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
-   if (dataset.storageLevel == StorageLevel.NONE) {
-     instances.persist(StorageLevel.MEMORY_AND_DISK)
-   }
-   Summarizer.getClassificationSummarizers(instances, $(aggregationDepth))
- } else {
-   // instances will be standardized and converted to blocks, so no need to cache instances.
-   Summarizer.getClassificationSummarizers(instances, $(aggregationDepth),
-     Seq("mean", "std", "count", "numNonZeros"))
- }
+ if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) {
+   instances.persist(StorageLevel.MEMORY_AND_DISK)
+ }
+
+ var requestedMetrics = Seq("mean", "std", "count")
+ if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
+ val (summarizer, labelSummarizer) = Summarizer
+   .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)

val histogram = labelSummarizer.histogram
val numInvalid = labelSummarizer.countInvalid
val numFeatures = summarizer.mean.size
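
The change above collapses two nearly identical getClassificationSummarizers calls into one call whose metric list is assembled up front: "mean", "std" and "count" are always requested, and "numNonZeros" is prepended only when blockSize != 1, i.e. only on the blocked path that standardizes and blockifies the instances. Caching is likewise folded into a single guard: instances are persisted only when they are not already cached and blockSize == 1. A minimal standalone Scala sketch of the prepend idiom (the object and method names are illustrative, not part of the PR):

object RequestedMetricsSketch {
  // `xs +:= x` on a var Seq desugars to `xs = x +: xs`, prepending the element.
  def metricsFor(blockSize: Int): Seq[String] = {
    var requestedMetrics = Seq("mean", "std", "count")
    if (blockSize != 1) requestedMetrics +:= "numNonZeros"
    requestedMetrics
  }

  def main(args: Array[String]): Unit = {
    println(metricsFor(1))      // List(mean, std, count)
    println(metricsFor(1024))   // List(numNonZeros, mean, std, count)
  }
}
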
@@ -316,7 +314,7 @@
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")
.setName(s"training blocks (blockSize=${$(blockSize)})")

val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
@@ -517,17 +517,18 @@ class LogisticRegression @Since("1.2.0") (
probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
fitIntercept, blockSize)

- val instances = extractInstances(dataset).setName("training instances")
- if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+ val instances = extractInstances(dataset)
+   .setName("training instances")

- val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
-   Summarizer.getClassificationSummarizers(instances, $(aggregationDepth))
- } else {
-   // instances will be standardized and converted to blocks, so no need to cache instances.
-   Summarizer.getClassificationSummarizers(instances, $(aggregationDepth),
-     Seq("mean", "std", "count", "numNonZeros"))
- }
+ if (handlePersistence && $(blockSize) == 1) {
+   instances.persist(StorageLevel.MEMORY_AND_DISK)
+ }
+
+ var requestedMetrics = Seq("mean", "std", "count")
+ if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
+ val (summarizer, labelSummarizer) = Summarizer
+   .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)

val numFeatures = summarizer.mean.size
val histogram = labelSummarizer.histogram
val numInvalid = labelSummarizer.countInvalid
@@ -591,7 +592,7 @@
} else {
Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity)
}
- if (handlePersistence) instances.unpersist()
+ if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
return createModel(dataset, numClasses, coefMatrix, interceptVec, Array.empty)
}
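
Because persisting instances is now conditional on blockSize as well as handlePersistence, the handlePersistence flag alone no longer says whether the RDD was actually cached; cleanup therefore checks the RDD's real storage level before unpersisting (the same guard appears again after training below). A minimal sketch of that guard using only public RDD methods (the helper name is illustrative, not part of the PR):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Unpersist an RDD only if it was actually cached somewhere.
def unpersistIfCached[T](rdd: RDD[T]): Unit = {
  if (rdd.getStorageLevel != StorageLevel.NONE) rdd.unpersist()
}
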

@@ -650,7 +651,7 @@
trainOnBlocks(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix,
regularization, optimizer)
}
- if (handlePersistence) instances.unpersist()
+ if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()

if (allCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -1002,7 +1003,7 @@
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")
.setName(s"training blocks (blockSize=${$(blockSize)})")

val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept),
checkMultinomial(numClasses))(_)
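
Both here and in LinearSVC above, the blocked path stacks blockSize standardized instances into one InstanceBlock matrix, persists the resulting RDD (now named "training blocks"), and hands it to a block aggregator via RDDLossFunction; storing each block as a single row-major matrix lets the aggregator compute all margins of a block with one matrix-vector product instead of one dot product per instance. A standalone sketch of that idea using the public ml.linalg API (not Spark's private InstanceBlock class); the values are made up for illustration:

import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector}

object BlockMarginSketch {
  def main(args: Array[String]): Unit = {
    // A block of 3 standardized instances with 2 features, stored row-major (isTransposed = true).
    val block = new DenseMatrix(3, 2, Array(1.0, 2.0, 0.5, -1.0, 0.0, 3.0), isTransposed = true)
    val coef = new DenseVector(Array(0.1, -0.2))
    // One matrix-vector multiply yields the margins of the whole block.
    val margins = block.multiply(coef)
    println(margins)  // approximately [-0.3, 0.25, -0.6]
  }
}
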
@@ -155,8 +155,102 @@ private[ml] class AFTAggregator(
}
gradientSumArray(dim - 2) += { if (fitIntercept) multiplier else 0.0 }
gradientSumArray(dim - 1) += delta + multiplier * sigma * epsilon

weightSum += 1.0

this
}
}


/**
* BlockAFTAggregator computes the gradient and loss as used in AFT survival regression
* for blocks of instances stored as sparse or dense matrices, in an online fashion.
*
* Two BlockAFTAggregators can be merged together to have a summary of loss and gradient of
* the corresponding joint dataset.
*
* NOTE: The feature values are expected to be standardized before computation.
*
* @param bcCoefficients The coefficients corresponding to the features.
* @param fitIntercept Whether to fit an intercept term.
*/
private[ml] class BlockAFTAggregator(
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[(Matrix, Array[Double], Array[Double]),
BlockAFTAggregator] {

protected override val dim: Int = bcCoefficients.value.size
private val numFeatures = dim - 2

@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
s" but got type ${bcCoefficients.value.getClass}.")
}

@transient private lazy val linear = Vectors.dense(coefficientsArray.take(numFeatures))

/**
* Add a new training instance block to this BlockAFTAggregator, and update the loss and
* gradient of the objective function.
*
* @return This BlockAFTAggregator object.
*/
def add(block: (Matrix, Array[Double], Array[Double])): this.type = {
val (matrix, labels, censors) = block
require(matrix.isTransposed)
require(numFeatures == matrix.numCols, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${matrix.numCols}.")
require(labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.")

val size = matrix.numRows
require(labels.length == size && censors.length == size)

val intercept = coefficientsArray(dim - 2)
// sigma is the scale parameter of the AFT model
val sigma = math.exp(coefficientsArray(dim - 1))

// vec here represents margins
val vec = if (fitIntercept) {
Vectors.dense(Array.fill(size)(intercept)).toDense
} else {
Vectors.zeros(size).toDense
}
BLAS.gemv(1.0, matrix, linear, 1.0, vec)

// in-place convert margins to gradient scales
// then, vec represents gradient scales
var i = 0
var sigmaGradSum = 0.0
while (i < size) {
val ti = labels(i)
val delta = censors(i)
val margin = vec(i)
val epsilon = (math.log(ti) - margin) / sigma
val expEpsilon = math.exp(epsilon)
lossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon
val multiplier = (delta - expEpsilon) / sigma
vec.values(i) = multiplier
sigmaGradSum += delta + multiplier * sigma * epsilon
i += 1
}

matrix match {
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
vec.values, 1, 1.0, gradientSumArray, 1)

case sm: SparseMatrix =>
val linearGradSumVec = Vectors.zeros(numFeatures).toDense
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
gradientSumArray, 1)
}

if (fitIntercept) gradientSumArray(dim - 2) += vec.values.sum
gradientSumArray(dim - 1) += sigmaGradSum
weightSum += size

this
}
}
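
The add loop above accumulates, for each instance i with lifetime t_i, censor indicator delta_i, margin m_i (intercept plus linear term) and scale sigma = exp(coefficients(dim - 1)), the following quantities; this is just the code restated in LaTeX for readability:

\begin{aligned}
\varepsilon_i &= \frac{\log t_i - m_i}{\sigma} \\
\mathrm{loss}_i &= \delta_i \log\sigma - \delta_i \varepsilon_i + e^{\varepsilon_i} \\
\mathrm{multiplier}_i &= \frac{\delta_i - e^{\varepsilon_i}}{\sigma}
  \qquad \text{(gradient of } \mathrm{loss}_i \text{ w.r.t. the margin } m_i\text{)} \\
\frac{\partial\, \mathrm{loss}_i}{\partial \log\sigma} &= \delta_i + \mathrm{multiplier}_i \,\sigma\, \varepsilon_i
\end{aligned}

The feature part of the gradient is then X^T times the multiplier vector (the dense dgemv branch, or the sparse gemv-plus-daxpy branch), the intercept slot collects the sum of the multipliers, and the last slot of gradientSumArray collects the log-sigma terms, mirroring the per-row updates in AFTAggregator above.
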