Skip to content

Commit fb32c38

Browse files
MechCoder
authored and mengxr committed
[SPARK-7633] [MLLIB] [PYSPARK] Python bindings for StreamingLogisticRegressionwithSGD
Add Python bindings to StreamingLogisticRegressionwithSGD. No Java wrappers are needed as models are updated directly using train. Author: MechCoder <[email protected]> Closes apache#6849 from MechCoder/spark-3258 and squashes the following commits: b4376a5 [MechCoder] minor d7e5fc1 [MechCoder] Refactor into StreamingLinearAlgorithm Better docs 9c09d4e [MechCoder] [SPARK-7633] Python bindings for StreamingLogisticRegressionwithSGD
1 parent f04b567 commit fb32c38

File tree

2 files changed

+229
-2
lines changed

2 files changed

+229
-2
lines changed

python/pyspark/mllib/classification.py

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121
from numpy import array
2222

2323
from pyspark import RDD
24+
from pyspark.streaming import DStream
2425
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
2526
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
2627
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
2728
from pyspark.mllib.util import Saveable, Loader, inherit_doc
2829

2930

3031
__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
31-
'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes']
32+
'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
33+
'StreamingLogisticRegressionWithSGD']
3234

3335

3436
class LinearClassificationModel(LinearModel):
@@ -583,6 +585,98 @@ def train(cls, data, lambda_=1.0):
583585
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
584586

585587

588+
class StreamingLinearAlgorithm(object):
    """
    Base class that has to be inherited by any StreamingLinearAlgorithm.

    Prevents reimplementation of methods predictOn and predictOnValues.

    :param model: Model used for predictions, or None when no model
                  has been set yet.
    """
    def __init__(self, model):
        self._model = model

    def latestModel(self):
        """
        Returns the latest model.
        """
        return self._model

    def _validate(self, dstream):
        """
        Check that dstream is a DStream and that a model has been set.

        :raises TypeError: If dstream is not a DStream instance.
        :raises ValueError: If no model has been set.
        """
        if not isinstance(dstream, DStream):
            raise TypeError(
                "dstream should be a DStream object, got %s" % type(dstream))
        # Compare against None explicitly so that only a missing model,
        # not a model that merely evaluates as falsy, is rejected.
        if self._model is None:
            raise ValueError(
                "Model must be initialized using setInitialWeights")

    def predictOn(self, dstream):
        """
        Make predictions on a dstream.

        :return: Transformed dstream object.
        """
        self._validate(dstream)
        # self._model is looked up inside the lambda instead of being bound
        # to a local first. NOTE(review): presumably so that a model that is
        # replaced later (e.g. by training) is the one used for prediction;
        # confirm before hoisting the lookup out of the lambda.
        return dstream.map(lambda x: self._model.predict(x))

    def predictOnValues(self, dstream):
        """
        Make predictions on a keyed dstream.

        :return: Transformed dstream object.
        """
        self._validate(dstream)
        # See predictOn: the model lookup deliberately stays in the lambda.
        return dstream.mapValues(lambda x: self._model.predict(x))
628+
629+
630+
@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
    """
    Run LogisticRegression with SGD on a stream of data.

    The weights obtained at the end of training a stream are used as initial
    weights for the next stream.

    :param stepSize: Step size for each iteration of gradient descent.
    :param numIterations: Number of iterations run for each batch of data.
    :param miniBatchFraction: Fraction of data on which SGD is run for each
                              iteration.
    :param regParam: L2 Regularization parameter.
    """
    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
        self.stepSize = stepSize
        self.numIterations = numIterations
        self.regParam = regParam
        self.miniBatchFraction = miniBatchFraction
        # No model exists until setInitialWeights is called.
        super(StreamingLogisticRegressionWithSGD, self).__init__(model=None)

    def setInitialWeights(self, initialWeights):
        """
        Set the initial value of weights.

        This must be set before running trainOn and predictOn.

        :param initialWeights: Initial weights; converted with
                               _convert_to_vector.
        :return: This instance, so that calls can be chained.
        """
        initialWeights = _convert_to_vector(initialWeights)

        # LogisticRegressionWithSGD does only binary classification.
        self._model = LogisticRegressionModel(
            initialWeights, 0, initialWeights.size, 2)
        return self

    def trainOn(self, dstream):
        """
        Train the model on the incoming dstream.

        Each non-empty RDD of the stream replaces the current model with one
        trained on that RDD, starting from the current model's weights.
        """
        self._validate(dstream)

        def update(rdd):
            # LogisticRegressionWithSGD.train raises an error for an empty RDD.
            if not rdd.isEmpty():
                # regParam is passed explicitly; otherwise the value given
                # to the constructor would be ignored during training.
                self._model = LogisticRegressionWithSGD.train(
                    rdd, self.numIterations, self.stepSize,
                    self.miniBatchFraction, self._model.weights,
                    regParam=self.regParam)

        dstream.foreachRDD(update)
677+
dstream.foreachRDD(update)
678+
679+
586680
def _test():
587681
import doctest
588682
from pyspark import SparkContext

python/pyspark/mllib/tests.py

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
from time import time, sleep
2727
from shutil import rmtree
2828

29-
from numpy import array, array_equal, zeros, inf, all, random
29+
from numpy import (
30+
array, array_equal, zeros, inf, random, exp, dot, all, mean)
3031
from numpy import sum as array_sum
3132
from py4j.protocol import Py4JJavaError
3233

@@ -45,6 +46,7 @@
4546
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
4647
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
4748
from pyspark.mllib.regression import LabeledPoint
49+
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
4850
from pyspark.mllib.random import RandomRDDs
4951
from pyspark.mllib.stat import Statistics
5052
from pyspark.mllib.feature import Word2Vec
@@ -1037,6 +1039,137 @@ def test_dim(self):
10371039
self.assertEqual(len(point.features), 2)
10381040

10391041

1042+
class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
    """Tests for StreamingLogisticRegressionWithSGD on generated toy data."""

    @staticmethod
    def generateLogisticInput(offset, scale, nPoints, seed):
        """
        Generate 1 / (1 + exp(-(x * scale + offset)))

        where,
        x is randomly distributed and the threshold
        and labels for each sample in x is obtained from a random uniform
        distribution.

        :return: List of nPoints LabeledPoint objects, each with a single
                 feature and a label of 0.0 or 1.0.
        """
        rng = random.RandomState(seed)
        x = rng.randn(nPoints)
        sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
        y_p = rng.rand(nPoints)
        # A sample is labelled 1.0 when its uniform draw falls at or below
        # the sigmoid value, 0.0 otherwise.
        cut_off = y_p <= sigmoid
        y_p[cut_off] = 1.0
        y_p[~cut_off] = 0.0
        return [
            LabeledPoint(label, Vectors.dense([feature]))
            for label, feature in zip(y_p, x)]

    def test_parameter_accuracy(self):
        """
        Test that the final value of weights is close to the desired value.
        """
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        input_stream = self.ssc.queueStream(input_batches)

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])
        slr.trainOn(input_stream)

        t = time()
        self.ssc.start()
        self._ssc_wait(t, 20.0, 0.01)
        # Relative deviation of the learned weight from the scale (1.5)
        # that was used to generate the data.
        rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
        self.assertAlmostEqual(rel, 0.1, 1)

    def test_convergence(self):
        """
        Test that weights converge to the required value on toy data.
        """
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        input_stream = self.ssc.queueStream(input_batches)
        models = []

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])
        slr.trainOn(input_stream)
        # Record the current weight once per batch of the input stream.
        input_stream.foreachRDD(
            lambda x: models.append(slr.latestModel().weights[0]))

        t = time()
        self.ssc.start()
        self._ssc_wait(t, 15.0, 0.01)
        t_models = array(models)
        diff = t_models[1:] - t_models[:-1]

        # Test that weights improve with a small tolerance,
        self.assertTrue(all(diff >= -0.1))
        self.assertTrue(array_sum(diff > 0) > 1)

    @staticmethod
    def calculate_accuracy_error(true, predicted):
        """Return the mean absolute difference between true and predicted."""
        return sum(abs(array(true) - array(predicted))) / len(true)

    def test_predictions(self):
        """Test predicted values on a toy model."""
        input_batches = []
        for i in range(20):
            batch = self.sc.parallelize(
                self.generateLogisticInput(0, 1.5, 100, 42 + i))
            input_batches.append(batch.map(lambda x: (x.label, x.features)))
        input_stream = self.ssc.queueStream(input_batches)

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([1.5])
        predict_stream = slr.predictOnValues(input_stream)
        true_predicted = []
        predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
        t = time()
        self.ssc.start()
        self._ssc_wait(t, 5.0, 0.01)

        # Test that the accuracy error is no more than 0.4 on each batch.
        for batch in true_predicted:
            if not batch:
                # zip(*[]) yields nothing to unpack; an empty batch has
                # no error to check.
                continue
            true, predicted = zip(*batch)
            self.assertTrue(
                self.calculate_accuracy_error(true, predicted) < 0.4)

    def test_training_and_prediction(self):
        """Test that the model improves on toy data with no. of batches"""
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        predict_batches = [
            b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.01, numIterations=25)
        slr.setInitialWeights([-0.1])
        errors = []

        def collect_errors(rdd):
            pairs = rdd.collect()
            if not pairs:
                # Nothing to unpack for an empty batch; skip it.
                return
            true, predicted = zip(*pairs)
            errors.append(self.calculate_accuracy_error(true, predicted))

        input_stream = self.ssc.queueStream(input_batches)
        predict_stream = self.ssc.queueStream(predict_batches)
        slr.trainOn(input_stream)
        ps = slr.predictOnValues(predict_stream)
        ps.foreachRDD(collect_errors)

        t = time()
        self.ssc.start()
        self._ssc_wait(t, 20.0, 0.01)

        # Test that the improvement in error is at least 0.3
        self.assertTrue(errors[1] - errors[-1] > 0.3)
1172+
10401173
if __name__ == "__main__":
10411174
if not _have_scipy:
10421175
print("NOTE: Skipping SciPy tests as it does not seem to be installed")

0 commit comments

Comments
 (0)