Skip to content

Commit 5440eff

Browse files
authored
Replace dask_ml.wrappers.Incremental with custom Incremental class (#855)
* Create metrics.py * add incremental functionality * lint and some comments * update more comments * add dask-ml fit function * style fix * DASK_2022_01_0 * add unit tests * style fix * remove scheduler * experiment_class comment * apply Vibhu's suggestions * style fix
1 parent 6844c74 commit 5440eff

File tree

6 files changed

+797
-53
lines changed

6 files changed

+797
-53
lines changed

dask_sql/physical/rel/custom/create_experiment.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,9 @@ class CreateExperimentPlugin(BaseRelPlugin):
3030
* model_class: Full path to the class of the model which has to be tuned.
3131
Any model class with sklearn interface is valid, but might or
3232
might not work well with Dask dataframes.
33-
Have a look into the
34-
[dask-ml documentation](https://ml.dask.org/index.html)
35-
for more information on which models work best.
3633
You might need to install necessary packages to use
3734
the models.
3835
* experiment_class : Full path of the Hyperparameter tuner
39-
from dask_ml, choose dask tuner class carefully based on what you
40-
exactly need (memory vs compute constrains), refer:
41-
[dask-ml documentation](https://ml.dask.org/hyper-parameter-search.html)
42-
(for tuning hyperparameter of the models both model_class and experiment class are
43-
required parameters.)
4436
* tune_parameters:
4537
Key-value pairs of hyperparameters to tune, i.e. the search space for the
4638
particular model to tune
@@ -64,7 +56,7 @@ class CreateExperimentPlugin(BaseRelPlugin):
6456
6557
CREATE EXPERIMENT my_exp WITH(
6658
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
67-
experiment_class = 'dask_ml.model_selection.GridSearchCV',
59+
experiment_class = 'sklearn.model_selection.GridSearchCV',
6860
tune_parameters = (n_estimators = ARRAY [16, 32, 2],
6961
learning_rate = ARRAY [0.1,0.01,0.001],
7062
max_depth = ARRAY [3,4,5,10]
@@ -174,7 +166,11 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
174166

175167
search = ExperimentClass(model, {**parameters}, **experiment_kwargs)
176168
logger.info(tune_fit_kwargs)
177-
search.fit(X, y, **tune_fit_kwargs)
169+
search.fit(
170+
X.to_dask_array(lengths=True),
171+
y.to_dask_array(lengths=True),
172+
**tune_fit_kwargs,
173+
)
178174
df = pd.DataFrame(search.cv_results_)
179175
df["model_class"] = model_class
180176

dask_sql/physical/rel/custom/create_model.py

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ class CreateModelPlugin(BaseRelPlugin):
3232
* model_class: Full path to the class of the model to train.
3333
Any model class with sklearn interface is valid, but might or
3434
might not work well with Dask dataframes.
35-
Have a look into the
36-
[dask-ml documentation](https://ml.dask.org/index.html)
37-
for more information on which models work best.
3835
You might need to install necessary packages to use
3936
the models.
4037
* target_column: Which column from the data to use as target.
@@ -45,16 +42,12 @@ class CreateModelPlugin(BaseRelPlugin):
4542
want to set this parameter.
4643
* wrap_predict: Boolean flag, whether to wrap the selected
4744
model with a :class:`dask_sql.physical.rel.custom.wrappers.ParallelPostFit`.
48-
Have a look into the
49-
[dask-ml docu](https://ml.dask.org/meta-estimators.html#parallel-prediction-and-transformation)
50-
to learn more about it. Defaults to false. Typically you set
51-
it to true for sklearn models if predicting on big data.
45+
Defaults to false. Typically you set it to true for
46+
sklearn models if predicting on big data.
5247
* wrap_fit: Boolean flag, whether to wrap the selected
53-
model with a :class:`dask_ml.wrappers.Incremental`.
54-
Have a look into the
55-
[dask-ml docu](https://ml.dask.org/incremental.html)
56-
to learn more about it. Defaults to false. Typically you set
57-
it to true for sklearn models if training on big data.
48+
model with a :class:`dask_sql.physical.rel.custom.wrappers.Incremental`.
49+
Defaults to false. Typically you set it to true for
50+
sklearn models if training on big data.
5851
* fit_kwargs: keyword arguments sent to the call to fit().
5952
6053
All other arguments are passed to the constructor of the
@@ -76,7 +69,7 @@ class CreateModelPlugin(BaseRelPlugin):
7669
Examples:
7770
7871
CREATE MODEL my_model WITH (
79-
model_class = 'dask_ml.xgboost.XGBClassifier',
72+
model_class = 'xgboost.XGBClassifier',
8073
target_column = 'target'
8174
) AS (
8275
SELECT x, y, target
@@ -95,11 +88,10 @@ class CreateModelPlugin(BaseRelPlugin):
9588
dask dataframes.
9689
9790
* if you are training on relatively small amounts
98-
of data but predicting on large data samples
99-
(and you are not using a model build for usage with dask
100-
from the dask-ml package), you might want to set
101-
`wrap_predict` to True. With this option,
102-
model interference will be parallelized/distributed.
91+
of data but predicting on large data samples,
92+
you might want to set `wrap_predict` to True.
93+
With this option, model inference will be
94+
parallelized/distributed.
10395
* If you are training on large amounts of data,
10496
you can try setting wrap_fit to True. This will
10597
do the same on the training step, but works only on
@@ -158,10 +150,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
158150

159151
model = ModelClass(**kwargs)
160152
if wrap_fit:
161-
try:
162-
from dask_ml.wrappers import Incremental
163-
except ImportError: # pragma: no cover
164-
raise ValueError("Wrapping requires dask-ml to be installed.")
153+
from dask_sql.physical.rel.custom.wrappers import Incremental
165154

166155
model = Incremental(estimator=model)
167156

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
# Copyright 2017, Dask developers
2+
# Dask-ML project - https://github.com/dask/dask-ml
3+
from typing import Optional, TypeVar
4+
5+
import dask
6+
import dask.array as da
7+
import numpy as np
8+
import sklearn.metrics
9+
import sklearn.utils.multiclass
10+
from dask.array import Array
11+
from dask.utils import derived_from
12+
13+
ArrayLike = TypeVar("ArrayLike", Array, np.ndarray)
14+
15+
16+
def accuracy_score(
    y_true: ArrayLike,
    y_pred: ArrayLike,
    normalize: bool = True,
    sample_weight: Optional[ArrayLike] = None,
    compute: bool = True,
) -> ArrayLike:
    """Accuracy classification score.

    In multilabel classification, this function computes subset accuracy:
    the set of labels predicted for a sample must *exactly* match the
    corresponding set of labels in y_true.

    Parameters
    ----------
    y_true : 1d array-like, or label indicator array
        Ground truth (correct) labels.
    y_pred : 1d array-like, or label indicator array
        Predicted labels, as returned by a classifier.
    normalize : bool, optional (default=True)
        If ``False``, return the number of correctly classified samples.
        Otherwise, return the fraction of correctly classified samples.
    sample_weight : 1d array-like, optional
        Sample weights.
    compute : bool, optional (default=True)
        If ``True`` and the score is still lazy (it exposes a ``compute``
        method), evaluate it before returning. If ``False``, the score is
        returned as built.

    Returns
    -------
    score : scalar
        If ``normalize == True``, the fraction of correctly classified
        samples, else the (optionally weighted) number of correctly
        classified samples.
        The best performance is 1 with ``normalize == True`` and the number
        of samples with ``normalize == False``.
    """
    if y_true.ndim > 1:
        # Subset accuracy: a row is correct only when every label matches.
        # Comparing directly (instead of subtracting and testing for zero)
        # also works for boolean indicator arrays, which do not support "-".
        score = (y_true == y_pred).all(axis=1)
    else:
        score = y_true == y_pred

    if normalize:
        score = da.average(score, weights=sample_weight)
    elif sample_weight is not None:
        score = da.dot(score, sample_weight)
    else:
        score = score.sum()

    # With NumPy inputs the unweighted count above is already a concrete
    # scalar without a ``compute`` method, so only evaluate lazy results.
    if compute and hasattr(score, "compute"):
        score = score.compute()
    return score
71+
72+
73+
def _log_loss_inner(
    x: ArrayLike, y: ArrayLike, sample_weight: Optional[ArrayLike], **kwargs
):
    """Compute scikit-learn's log loss for one block.

    ``x`` and ``y`` are passed to ``sklearn.metrics.log_loss`` as its first
    two positional arguments; ``sample_weight`` is flattened first when
    given, and ``kwargs`` are forwarded unchanged.

    Returns a one-element NumPy array holding the loss.
    """
    # da.map_blocks wasn't able to concatenate together the results
    # when we reduce down to a scalar per block, so the scalar is
    # wrapped in an array with 1 element.
    weights = None if sample_weight is None else sample_weight.ravel()
    block_loss = sklearn.metrics.log_loss(x, y, sample_weight=weights, **kwargs)
    return np.array([block_loss])
84+
85+
86+
def log_loss(
    y_true, y_pred, eps=1e-15, normalize=True, sample_weight=None, labels=None
):
    """Log loss that evaluates dask inputs block by block.

    Unless both ``y_true`` and ``y_pred`` are dask collections, the call is
    delegated unchanged to ``sklearn.metrics.log_loss``. Otherwise
    ``sklearn.metrics.log_loss`` is applied to every block (receiving
    ``eps``, ``normalize`` and ``labels``) and the per-block values are
    combined lazily.

    Returns
    -------
    The scikit-learn result for non-dask inputs; otherwise a lazy dask
    reduction of the per-block losses:

    * ``normalize=False``: the sum of the per-block losses;
    * ``normalize=True`` without weights: the plain mean of the per-block
      losses (NOTE: every block counts equally, whatever its row count);
    * ``normalize=True`` with weights: the average of the per-block losses
      weighted by each block's total sample weight.
    """
    both_lazy = dask.is_dask_collection(y_true) and dask.is_dask_collection(y_pred)
    if not both_lazy:
        return sklearn.metrics.log_loss(
            y_true,
            y_pred,
            eps=eps,
            normalize=normalize,
            sample_weight=sample_weight,
            labels=labels,
        )

    # When predictions are 2-d but labels are 1-d, lift the labels (and the
    # weights) to column vectors so they can be mapped alongside y_pred, and
    # tell map_blocks that the column axis is dropped in the output.
    drop_axis: Optional[int] = None
    if y_pred.ndim > 1 and y_true.ndim == 1:
        y_true = y_true.reshape(-1, 1)
        drop_axis = 1
        if sample_weight is not None:
            sample_weight = sample_weight.reshape(-1, 1)

    per_block = da.map_blocks(
        _log_loss_inner,
        y_true,
        y_pred,
        sample_weight,
        chunks=(1,),
        drop_axis=drop_axis,
        dtype="f8",
        eps=eps,
        normalize=normalize,
        labels=labels,
    )

    if not normalize:
        return per_block.sum()
    if sample_weight is None:
        return per_block.mean()

    # Weight each block's loss by the total sample weight it contains.
    flat_weights = sample_weight.ravel()
    block_weights = flat_weights.map_blocks(np.sum, chunks=(1,), keepdims=True)
    return da.average(per_block, 0, weights=block_weights)
127+
128+
129+
def _check_sample_weight(sample_weight: Optional[ArrayLike]):
130+
if sample_weight is not None:
131+
raise ValueError("'sample_weight' is not supported.")
132+
133+
134+
@derived_from(sklearn.metrics)
def mean_squared_error(
    y_true: ArrayLike,
    y_pred: ArrayLike,
    sample_weight: Optional[ArrayLike] = None,
    multioutput: Optional[str] = "uniform_average",
    squared: bool = True,
    compute: bool = True,
) -> ArrayLike:
    # Mean squared error (or its square root when ``squared`` is False).
    #
    # * ``sample_weight`` must be None (ValueError otherwise).
    # * ``multioutput`` must be a string or None (ValueError otherwise);
    #   "raw_values" returns one error per output column, any other accepted
    #   value returns the plain mean over the output columns.
    # * ``compute`` controls whether ``.compute()`` is called on the result
    #   before it is returned.
    _check_sample_weight(sample_weight)
    output_errors = ((y_pred - y_true) ** 2).mean(axis=0)

    if not (isinstance(multioutput, str) or multioutput is None):
        raise ValueError("Weighted 'multioutput' not supported.")

    if multioutput == "raw_values":
        # Honour ``squared=False`` for per-output errors as well; returning
        # early here used to skip the square root and yield MSE, not RMSE.
        result = output_errors if squared else da.sqrt(output_errors)
    else:
        result = output_errors.mean()
        if not squared:
            result = da.sqrt(result)

    if compute:
        result = result.compute()
    return result
160+
161+
162+
def _check_reg_targets(
163+
y_true: ArrayLike, y_pred: ArrayLike, multioutput: Optional[str]
164+
):
165+
if multioutput is not None and multioutput != "uniform_average":
166+
raise NotImplementedError("'multioutput' must be 'uniform_average'")
167+
168+
if y_true.ndim == 1:
169+
y_true = y_true.reshape((-1, 1))
170+
if y_pred.ndim == 1:
171+
y_pred = y_pred.reshape((-1, 1))
172+
173+
# TODO: y_type, multioutput
174+
return None, y_true, y_pred, multioutput
175+
176+
177+
@derived_from(sklearn.metrics)
def r2_score(
    y_true: ArrayLike,
    y_pred: ArrayLike,
    sample_weight: Optional[ArrayLike] = None,
    multioutput: Optional[str] = "uniform_average",
    compute: bool = True,
) -> ArrayLike:
    # R^2 averaged uniformly over the output columns.
    #
    # ``sample_weight`` must be None and ``multioutput`` must be None or
    # "uniform_average"; the two helper checks below raise otherwise.
    # ``compute`` controls whether ``.compute()`` is called on the result.
    _check_sample_weight(sample_weight)
    _, y_true, y_pred, _ = _check_reg_targets(y_true, y_pred, multioutput)
    weight = 1.0

    # Per-column residual and total sums of squares.
    residual_ss = (weight * (y_true - y_pred) ** 2).sum(axis=0, dtype="f8")
    centered = y_true - y_true.mean(axis=0)
    total_ss = (weight * centered ** 2).sum(axis=0, dtype="f8")

    has_variance = total_ss != 0
    has_error = residual_ss != 0
    regular = has_variance & has_error
    constant_target = has_error & ~has_variance

    # Start every column at 1.0 (the value kept when there is no residual
    # error), then overwrite the columns that need a different score.
    column_chunks = getattr(y_true, "chunks", [None, None])[1]
    per_output = da.ones([y_true.shape[1]], chunks=column_chunks)
    with np.errstate(all="ignore"):
        per_output[regular] = 1 - (residual_ss[regular] / total_ss[regular])
        # Residual error but zero variance in the target: score is 0.
        per_output[constant_target] = 0.0

    result = per_output.mean(axis=0)
    if compute:
        result = result.compute()
    return result

0 commit comments

Comments
 (0)