From 9ae9bd4fb59d213def204222a2f42469b71d8eb0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 16 Dec 2015 14:11:36 -0800 Subject: [PATCH 1/2] use SQLContext.getOrCreate in mllib --- python/pyspark/mllib/common.py | 6 +++--- python/pyspark/mllib/evaluation.py | 10 +++++----- python/pyspark/mllib/feature.py | 3 +-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index a439a488de5cc..9fda1b1682f57 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -102,7 +102,7 @@ def _java2py(sc, r, encoding="bytes"): return RDD(jrdd, sc) if clsName == 'DataFrame': - return DataFrame(r, SQLContext(sc)) + return DataFrame(r, SQLContext.getOrCreate(sc)) if clsName in _picklable_classes: r = sc._jvm.SerDe.dumps(r) @@ -125,7 +125,7 @@ def callJavaFunc(sc, func, *args): def callMLlibFunc(name, *args): """ Call API in PythonMLLibAPI """ - sc = SparkContext._active_spark_context + sc = SparkContext.getOrCreate() api = getattr(sc._jvm.PythonMLLibAPI(), name) return callJavaFunc(sc, api, *args) @@ -135,7 +135,7 @@ class JavaModelWrapper(object): Wrapper for the model in JVM """ def __init__(self, java_model): - self._sc = SparkContext._active_spark_context + self._sc = SparkContext.getOrCreate() self._java_model = java_model def __del__(self): diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index 8c87ee9df2132..22e68ea5b4511 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -44,7 +44,7 @@ class BinaryClassificationMetrics(JavaModelWrapper): def __init__(self, scoreAndLabels): sc = scoreAndLabels.ctx - sql_ctx = SQLContext(sc) + sql_ctx = SQLContext.getOrCreate(sc) df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([ StructField("score", DoubleType(), nullable=False), StructField("label", DoubleType(), nullable=False)])) @@ -103,7 +103,7 @@ class RegressionMetrics(JavaModelWrapper): def __init__(self, predictionAndObservations): sc = predictionAndObservations.ctx - sql_ctx = SQLContext(sc) + sql_ctx = SQLContext.getOrCreate(sc) df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([ StructField("prediction", DoubleType(), nullable=False), StructField("observation", DoubleType(), nullable=False)])) @@ -197,7 +197,7 @@ class MulticlassMetrics(JavaModelWrapper): def __init__(self, predictionAndLabels): sc = predictionAndLabels.ctx - sql_ctx = SQLContext(sc) + sql_ctx = SQLContext.getOrCreate(sc) df = sql_ctx.createDataFrame(predictionAndLabels, schema=StructType([ StructField("prediction", DoubleType(), nullable=False), StructField("label", DoubleType(), nullable=False)])) @@ -338,7 +338,7 @@ class RankingMetrics(JavaModelWrapper): def __init__(self, predictionAndLabels): sc = predictionAndLabels.ctx - sql_ctx = SQLContext(sc) + sql_ctx = SQLContext.getOrCreate(sc) df = sql_ctx.createDataFrame(predictionAndLabels, schema=sql_ctx._inferSchema(predictionAndLabels)) java_model = callMLlibFunc("newRankingMetrics", df._jdf) @@ -424,7 +424,7 @@ class MultilabelMetrics(JavaModelWrapper): def __init__(self, predictionAndLabels): sc = predictionAndLabels.ctx - sql_ctx = SQLContext(sc) + sql_ctx = SQLContext.getOrCreate(sc) df = sql_ctx.createDataFrame(predictionAndLabels, schema=sql_ctx._inferSchema(predictionAndLabels)) java_class = sc._jvm.org.apache.spark.mllib.evaluation.MultilabelMetrics diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 7254679ebb533..9f1ba3a9b442f 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -100,8 +100,7 @@ def transform(self, vector): :return: normalized vector. If the norm of the input is zero, it will return the input vector. """ - sc = SparkContext._active_spark_context - assert sc is not None, "SparkContext should be initialized first" + sc = SparkContext.getOrCreate() if isinstance(vector, RDD): vector = vector.map(_convert_to_vector) else: From 518e37c409334cbc0bd398aba83c77184e53f75a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 16 Dec 2015 15:04:46 -0800 Subject: [PATCH 2/2] address comments --- python/pyspark/mllib/feature.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 9f1ba3a9b442f..acd7ec57d69da 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -30,7 +30,7 @@ from py4j.protocol import Py4JJavaError -from pyspark import SparkContext, since +from pyspark import since from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.mllib.linalg import ( @@ -100,7 +100,6 @@ def transform(self, vector): :return: normalized vector. If the norm of the input is zero, it will return the input vector. """ - sc = SparkContext.getOrCreate() if isinstance(vector, RDD): vector = vector.map(_convert_to_vector) else: