[SPARK-10780][ML] Support initial model for KMeans. #17117
Conversation
Test build #73682 has finished for PR 17117 at commit
override protected def saveImpl(path: String): Unit = {
  DefaultParamsWriter.saveInitialModel(instance, path)
  DefaultParamsWriter.saveMetadata(instance, path, sc)
}
I was trying to move saveInitialModel into saveMetadata to make it more succinct. We can do this for MLWriter, but it's hard for MLReader[T]: since we would need to explicitly pass the type of initialModel as well, we would have to refactor MLReader[T] into MLReader[T, M]. However, I think most estimators/transformers will not use initialModel, so the extra type parameter [M] does not make sense.
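For context, a rough sketch of the refactor being ruled out, under the assumption that the reader would have to carry the initial model's type as a second parameter (this is hypothetical, not the actual Spark API):

// Hypothetical shape of the refactored reader: every implementation would
// have to supply M, even though most estimators have no initialModel at all.
abstract class MLReader[T, M] {
  def load(path: String): T
  def loadInitialModel(path: String): Option[M]
}

This is why keeping saveInitialModel/loadInitialModel as separate helpers on DefaultParamsWriter/DefaultParamsReader is the lighter option.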
Test build #73686 has finished for PR 17117 at commit
cc @dbtsai
sethah left a comment
Thanks for taking this over @yanboliang! Made a first pass.
override def transform(dataset: Dataset[_]): DataFrame = {
  transformSchema(dataset.schema, logging = true)
  val predictUDF = udf((vector: Vector) => predict(vector))
  val tmpParent: MLlibKMeansModel = parentModel
Can we change it to localParent? That's the convention we have taken elsewhere when we want to get a separate pointer to a class member.
 * @group param
 */
@Since("2.2.0")
final val initialModel: Param[KMeansModel] =
I prefer doing this in the same way that ALS does it: by having separate param traits, with KMeansParams extends KMeansModelParams with HasInitialModel. It's more explicit, since now our KMeans class would have extra params on top of the model's params.
Makes sense; I refactored them in the same way as ALSParams and ALSModelParams.
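For reference, a minimal sketch of the trait split being discussed, following the ALS pattern (the trait bodies are placeholders):

private[clustering] trait KMeansModelParams extends Params with HasMaxIter
  with HasFeaturesCol with HasSeed with HasPredictionCol with HasTol {
  // params shared by KMeans and KMeansModel, e.g. k and initMode
}

private[clustering] trait KMeansParams extends KMeansModelParams
  with HasInitialModel[KMeansModel] {
  // estimator-only params live here, so KMeansModel never carries initialModel
}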
| @Since("1.5.0") | ||
| override def transformSchema(schema: StructType): StructType = { | ||
| if ($(initMode) == MLlibKMeans.K_MEANS_INITIAL_MODEL) { |
It might be nice to factor this logic out into a method like assertInitialModelValid or something similar. Actually, we could add an abstract method to the HasInitialModel trait that each subclass can implement differently.
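A rough sketch of that idea (the abstract method name and the param doc string are assumptions):

private[ml] trait HasInitialModel[T <: Model[T]] extends Params {
  final val initialModel: Param[T] =
    new Param[T](this, "initialModel", "initial model for warm start")
  final def getInitialModel: T = $(initialModel)
  // Each estimator implements its own consistency checks here,
  // e.g. KMeans would verify that the initial model's k matches its own k.
  protected def validateInitialModel(): Unit
}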
val instance = new KMeans(metadata.uid)
DefaultParamsReader.getAndSetParams(instance, metadata)
DefaultParamsReader.loadInitialModel[KMeansModel](path, sc)
This can be done as:

DefaultParamsReader.loadInitialModel[KMeansModel](path, sc).foreach(instance.setInitialModel)

I think it's nicer, but I'm not sure if there is a universal preference for side effects with options in Spark, so I'll leave it to you to decide.
Yeah, your suggestion would work well, but I prefer my own way, since it's clearer for developers to understand what happens.
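For comparison, the explicit version being preferred here would look roughly like this (the case bodies are a sketch):

DefaultParamsReader.loadInitialModel[KMeansModel](path, sc) match {
  case Some(model) => instance.setInitialModel(model) // an initial model was saved with this estimator
  case None => // nothing was saved, so leave initialModel unset
}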
| @Since("0.8.0") | ||
| val K_MEANS_PARALLEL = "k-means||" | ||
| @Since("2.2.0") | ||
| val K_MEANS_INITIAL_MODEL = "initialModel" |
It can be private I think. That, or we should update the valid options for the setInitializationMode doc. But I think it's best to make it private.
super.beforeAll()
dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
rData = GaussianMixtureSuite.rData.map(GaussianMixtureSuite.FeatureData).toDF()
How about GaussianMixtureSuite.rData.map(Tuple1.apply).toDF()? Mapping the dummy case class from another test suite is less clear.
Updated.
val kmeans = new KMeans().setK(k).setSeed(1).setMaxIter(1)
// Sets initMode with 'initialModel', but does not specify initial model.
intercept[IllegalArgumentException] {
I'm not sure I agree with the behavior. We discussed it quite a bit in the other PR - maybe you can summarize the reasons you moved away from the previous decisions? At any rate, it seems we currently have the following behavior:
| k                        | initMode | initialModel | result              |
|--------------------------|----------|--------------|---------------------|
| ?                        | not set  | set          | ignore initialModel |
| ?                        | set      | not set      | error               |
| set (k != initialModelK) | set      | set          | error               |
| set (k == initialModelK) | set      | set          | use initialModel    |
If we keep this behavior, we should add a test for the first case.
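A sketch of what that missing test could look like (the final assertion reflects the intended "ignore" behavior and is an assumption):

test("initialModel is ignored when initMode is 'random'") {
  val initialModel = new KMeans().setK(k).setSeed(1).fit(dataset)
  val kmeans = new KMeans()
    .setK(k)
    .setSeed(1)
    .setMaxIter(1)
    .setInitMode(MLlibKMeans.RANDOM)
    .setInitialModel(initialModel)
  // Training should succeed (with a warning) and simply not use the initial model.
  val model = kmeans.fit(dataset)
  assert(model.clusterCenters.length === k)
}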
I disagree with the approach in the other PR, for this reason:
In that PR, if users call setInitialModel(model), it will call set(initMode, "initialModel"). Take the following scenario:

val kmeans = new KMeans().setInitialModel(initialModel) // Users want to start with an initial model.
val model1 = kmeans.fit(dataset) // The model was fitted with a warm start.
// Then they want to try another initialization, for example "k-means||".
val model2 = kmeans.setInitMode("k-means||").fit(dataset)
// But in #11119's code path, this still starts from the initial model, since
// "initialModel" is still set. We could correct this by modifying the code in
// mllib.clustering.KMeans, but I still think it's confusing.

Another scenario is that users set initialModel by mistake but still want to start with random mode; they will be confused about what happened: "Why did I choose random mode, but you gave me a warm-started model?"
So I prefer to let users set initMode to initialModel explicitly, and set initialModel to the corresponding model. Otherwise, we just throw an exception to let users correct their settings. I'm OK with adding a test for the first case.
My 2 cents is that the latter configuration should be able to overwrite the former and related settings, with warning messages.
In your example, when kmeans.setInitMode("k-means||") is performed, the earlier setInitialModel should be ignored with a warning message.
Even if we do setK(k = 3) and later do .setInitialModel(initialModel), we should ignore the earlier setK(k = 3) with a warning.
Yeah, I think the general idea laid out in the previous PR is preferable. As you and DB say, if you'd like to make the second .setInitMode overwrite the initial model, that is fine. With that change, the behavior I would prefer is:
test("params") {
val initialK = 3
val initialEstimator = new KMeans()
.setK(initialK)
val initialModel = initialEstimator.fit(dataset)
val km = new KMeans()
.setK(initialK + 1)
.setInitMode(MLlibKMeans.RANDOM)
assert(km.getK === initialK + 1)
assert(km.getInitMode === MLlibKMeans.RANDOM)
km.setInitialModel(initialModel)
// initialModel sets k and init mode
assert(km.getInitMode === MLlibKMeans.K_MEANS_INITIAL_MODEL)
assert(km.getK === initialK)
assert(km.getInitialModel.getK === initialK)
// setting k is ignored
km.setK(initialK + 1)
assert(km.getK === initialK)
// this should work since we already set initialModel
km.setInitMode(MLlibKMeans.K_MEANS_INITIAL_MODEL)
// changing initMode clears the initial model
km.setInitMode(MLlibKMeans.RANDOM)
assert(km.getInitMode === MLlibKMeans.RANDOM)
assert(!km.isSet(km.initialModel))
// k is retained from initial model
assert(km.getK === initialK)
// now k can be set
km.setK(initialK + 1)
assert(km.getK === initialK + 1)
// kmeans should throw an error since we shouldn't be allowed to set init mode to "initialModel"
intercept[IllegalArgumentException] {
km.setInitMode(MLlibKMeans.K_MEANS_INITIAL_MODEL)
}
}There was a problem hiding this comment.
I think we cannot override a param in any set*** function; see the reason here. This is why I didn't follow the idea of the previous PR. If we prefer the previous way, we must handle the override in the fit function; I'll update following this idea tomorrow.
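A sketch of what resolving the conflict at the start of fit might look like (the warning text and the trainWithResolvedParams helper are hypothetical):

override def fit(dataset: Dataset[_]): KMeansModel = {
  // Resolve conflicting params once at fit time, so setter-based and
  // ParamMap-based configuration produce the same model.
  if ($(initMode) == MLlibKMeans.K_MEANS_INITIAL_MODEL && isSet(initialModel) &&
      $(k) != $(initialModel).getK) {
    logWarning(s"Param k=${$(k)} conflicts with the initial model's " +
      s"k=${$(initialModel).getK}; using the initial model's cluster count.")
    set(k, $(initialModel).getK)
  }
  trainWithResolvedParams(dataset) // hypothetical helper for the rest of fit
}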
@sethah +1 on the behavior you propose. The only thing I would like to add is that setK should throw an IllegalArgumentException:
// initialModel sets k and init mode
assert(km.getInitMode === MLlibKMeans.K_MEANS_INITIAL_MODEL)
assert(km.getK === initialK)
assert(km.getInitialModel.getK === initialK)
// setting k will throw an exception
intercept[IllegalArgumentException] {
  km.setK(initialK + 1)
}

val param = estimator.getParam(p)
assert(estimator.get(param).get === estimator2.get(param).get)
if (param.name == "initialModel") {
  // Estimator's `initialModel` has the same type as the model produced by this estimator.
This is an assumption and is not enforced by the compiler. There is nothing in the trait HasInitialModel[T <: Model[T]] that prevents us from creating an estimator with an initialModel type that is not the same as the type of the model the estimator produces. We can discuss whether or not we'd like to enforce this assumption, but if we do not, then this method should probably be changed.
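One possible way to enforce the assumption, sketched on top of the existing Estimator[M <: Model[M]] hierarchy (the WarmStartEstimator name is hypothetical):

// Tie the initial-model type to the estimator's own model type.
abstract class WarmStartEstimator[M <: Model[M]]
  extends Estimator[M] with HasInitialModel[M]

// A concrete subclass such as `class KMeans extends WarmStartEstimator[KMeansModel]`
// could then only accept a KMeansModel as its initialModel, checked at compile time.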
Let's merge #17151 first; then I will update this accordingly.
| "maxIter" -> 2, | ||
| "tol" -> 0.01 | ||
| "tol" -> 0.01, | ||
| "initialModel" -> generateRandomKMeansModel(3, 3) |
It would be nicer to change testEstimatorAndModelReadWrite to accept estimatorTestParams and modelTestParams separately, so we don't have to hard-code certain params to be filtered out inside that method. Though we don't have to do that in this PR.
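A sketch of the suggested signature change (the parameter names are assumptions; see #17151 for the actual change):

def testEstimatorAndModelReadWrite[
    E <: Estimator[M] with MLWritable,
    M <: Model[M] with MLWritable](
    estimator: E,
    dataset: Dataset[_],
    testEstimatorParams: Map[String, Any], // checked on the round-tripped estimator
    testModelParams: Map[String, Any],     // checked on the round-tripped model
    checkModelData: (M, M) => Unit): Unit = {
  // save and load both the estimator and the fitted model, then compare
  // each against its own expected param map instead of filtering inside
}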
Agreed. I sent #17151; feel free to comment on it.
Test build #73850 has finished for PR 17117 at commit
Test build #73852 has finished for PR 17117 at commit
 */
private[clustering] trait KMeansModelParams extends Params with HasMaxIter with HasFeaturesCol
  with HasSeed with HasPredictionCol with HasTol {
Now that KMeansModel mixes in KMeansModelParams, does it mean that at the model level we cannot get the information of the initialModel? Also, in the model, why do we need to mix the seed in?
Yeah, we decided in the previous discussion to not store the initial model in the produced model, for several reasons, including model serialization.
Fair enough.
/** @group setParam */
@Since("2.2.0")
def setInitialModel(value: KMeansModel): this.type = set(initialModel, value)
How about:

def setInitialModel(value: KMeansModel): this.type = {
  if (getK != value.getK) {
    logWarning(s"Param k=${getK} will be overwritten by the initial model's k=${value.getK}.")
    set(k, value.getK)
  }
  set(initMode, MLlibKMeans.K_MEANS_INITIAL_MODEL) // We may log here too, but I don't really care for this one.
  set(initialModel, value)
}
I think we cannot override a param in any set*** function, since the ML pipeline API supports other ways of setting params, like:

def fit(dataset: Dataset[_], paramMap: ParamMap): M = {
  copy(paramMap).fit(dataset)
}

Users should get the same model regardless of how the params are set. I think the only place to override a param is at the start of the fit function.
Can you elaborate on this? I don't fully understand why we cannot overwrite the setting in the set method. Thanks.
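To illustrate the concern with a hypothetical sequence (assuming setInitialModel also overwrote k, as suggested above):

import org.apache.spark.ml.param.ParamMap

// Setter-based path: the overwrite logic inside setInitialModel runs.
val kmA = new KMeans().setK(5)
kmA.setInitialModel(initialModel) // would overwrite k here
val m1 = kmA.fit(dataset)

// ParamMap-based path: fit(dataset, paramMap) just copies the params into a
// cloned instance; no setter runs, so k would stay 5.
val kmB = new KMeans().setK(5)
val m2 = kmB.fit(dataset, ParamMap(kmB.initialModel -> initialModel))

// m1 and m2 could be trained with different k, even though the user supplied
// the same params both times.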
@Since("1.5.0")
override def transformSchema(schema: StructType): StructType = {
  assertInitialModelValid()
Why is this not checked in fit?
transformSchema will be called in the fit method.
transformSchema is also called in the transform method, and model.transform is called when computing the summary. I think we should fail earlier instead of checking at the end. Also, it's implicit that the check happens while computing the summary; we should check it explicitly.
If the checking logic is small, I'd put that checking code in the fit method.
if ($(initMode) == MLlibKMeans.K_MEANS_INITIAL_MODEL) {
  if (isSet(initialModel)) {
    val initialModelK = $(initialModel).parentModel.k
    if (initialModelK != $(k)) {
I don't think this check is needed if we overwrite k when initialModel is set.
| "'initialModel' as the initialization algorithm.") | ||
| } | ||
| } else { | ||
| if (isSet(initialModel)) { |
Also, this is not needed if we do the overwriting work in setInitialModel.
/**
 * Check validity for interactions between parameters.
 */
private def assertInitialModelValid(): Unit = {
I think with the overwriting above, the only thing we need to check will be:

if ($(initMode) == MLlibKMeans.K_MEANS_INITIAL_MODEL && !isSet(initialModel)) {
  throw new IllegalArgumentException("Users must set param initialModel if they choose " +
    "'initialModel' as the initialization mode.")
}

We can just have it in the body of the fit method.
What changes were proposed in this pull request?
Support initial model for KMeans.
- KMeans (a.k.a. the estimator) extends HasInitialModel; KMeansModel does not, so KMeansModel does not have the param initialModel. Spark ML allows estimators and models to not share all params (such as ALS and ALSModel). This is because we don't want to make the model too big; it should be easier to ship.
- Add initialModel as an option for the param initMode. If users would like to start with an initial model, they should set initMode to initialModel, and set initialModel to the corresponding instance.
- If users set initMode to random or k-means||, then even if they set initialModel to a model instance, we don't use it when training the model, since users explicitly told us they do not want a warm start; we do output a warning log for this case.
- initialModel's dimension should match the training dataset's number of features; otherwise, throw an IllegalArgumentException.
- initialModel's cluster count (a.k.a. k) should match the param k; otherwise, throw an IllegalArgumentException. The old MLlib KMeans does not allow a mismatched k, so we keep consistent with it.

A usage sketch follows. Note: This implementation is inspired by #11119; thanks @yinxusen for the initial work.
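A minimal usage sketch of the proposed API, following the rules above (dataset is assumed to be a DataFrame with a features column):

import org.apache.spark.ml.clustering.KMeans

// Train a quick model to serve as the starting point.
val initialModel = new KMeans().setK(3).setMaxIter(5).fit(dataset)

// Warm start: initMode must be set to "initialModel" explicitly,
// and k must match the initial model's cluster count.
val warmStarted = new KMeans()
  .setK(3)
  .setInitMode("initialModel")
  .setInitialModel(initialModel)
  .fit(dataset)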
How was this patch tested?
Add unit tests.