From a6bf4cd0e7ed86a4ca19d4d240ae87c95d54642e Mon Sep 17 00:00:00 2001
From: jyotiska
Date: Thu, 27 Feb 2014 19:43:19 +0530
Subject: [PATCH 1/2] added callsite info for context.py

---
 python/pyspark/context.py | 14 +++++++++++++-
 python/pyspark/rdd.py     | 19 ++++++++++++-------
 2 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 93faa2e3857ed..23c8d4be2ffe4 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,6 +29,7 @@
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
 from pyspark.rdd import RDD
 
 from py4j.java_collections import ListConverter
@@ -83,6 +84,10 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
+        if rdd._extract_concise_traceback() is not None:
+            self._callsite = rdd._extract_concise_traceback()
+        else:
+            self._callsite = {"function": None, "file": None, "line": None}
         SparkContext._ensure_initialized(self, gateway=gateway)
 
         self.environment = environment or {}
@@ -169,7 +174,14 @@ def _ensure_initialized(cls, instance=None, gateway=None):
 
         if instance:
             if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
-                raise ValueError("Cannot run multiple SparkContexts at once")
+                currentMaster = SparkContext._active_spark_context.master
+                currentAppName = SparkContext._active_spark_context.appName
+                callsite = SparkContext._active_spark_context._callsite
+
+                # Raise error if there is already a running Spark context
+                raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
+                    " created by %s at %s:%s " \
+                    % (currentAppName, currentMaster, callsite['function'], callsite['file'], callsite['line']))
             else:
                 SparkContext._active_spark_context = instance
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1330e6146800c..315aadceecb77 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -42,12 +42,13 @@
 __all__ = ["RDD"]
 
 def _extract_concise_traceback():
+    """
+    This function returns the traceback info for a callsite, returns a dict
+    with function name, file name and line number
+    """
     tb = traceback.extract_stack()
     if len(tb) == 0:
-        return "I'm lost!"
-    # HACK: This function is in a file called 'rdd.py' in the top level of
-    # everything PySpark. Just trim off the directory name and assume
-    # everything in that tree is PySpark guts.
+        return None
     file, line, module, what = tb[len(tb) - 1]
     sparkpath = os.path.dirname(file)
     first_spark_frame = len(tb) - 1
@@ -58,16 +59,20 @@ def _extract_concise_traceback():
             break
     if first_spark_frame == 0:
         file, line, fun, what = tb[0]
-        return "%s at %s:%d" % (fun, file, line)
+        return {"function": fun, "file": file, "line": line}
     sfile, sline, sfun, swhat = tb[first_spark_frame]
     ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
-    return "%s at %s:%d" % (sfun, ufile, uline)
+    return {"function": sfun, "file": ufile, "line": uline}
 
 _spark_stack_depth = 0
 
 class _JavaStackTrace(object):
     def __init__(self, sc):
-        self._traceback = _extract_concise_traceback()
+        tb = _extract_concise_traceback()
+        if tb is not None:
+            self._traceback = "%s at %s:%s" % (tb["function"], tb["file"], tb["line"])
+        else:
+            self._traceback = "Error! Could not extract traceback info"
         self._context = sc
 
     def __enter__(self):

From c9439bef66009a90a9a1433e3b7801b76e5f0f0e Mon Sep 17 00:00:00 2001
From: jyotiska
Date: Wed, 5 Mar 2014 20:01:55 +0530
Subject: [PATCH 2/2] replaced dict with namedtuple

---
 python/pyspark/context.py | 6 ++++--
 python/pyspark/rdd.py     | 8 +++++---
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 23c8d4be2ffe4..c3334f2e9ec84 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,6 +20,7 @@
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
+from collections import namedtuple
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -87,7 +88,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         if rdd._extract_concise_traceback() is not None:
             self._callsite = rdd._extract_concise_traceback()
         else:
-            self._callsite = {"function": None, "file": None, "line": None}
+            tempNamedTuple = namedtuple("Callsite", "function file linenum")
+            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
         SparkContext._ensure_initialized(self, gateway=gateway)
 
         self.environment = environment or {}
@@ -181,7 +183,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
                 # Raise error if there is already a running Spark context
                 raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
                     " created by %s at %s:%s " \
-                    % (currentAppName, currentMaster, callsite['function'], callsite['file'], callsite['line']))
+                    % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
             else:
                 SparkContext._active_spark_context = instance
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 315aadceecb77..5138e6f62b8aa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -18,6 +18,7 @@
 from base64 import standard_b64encode as b64enc
 import copy
 from collections import defaultdict
+from collections import namedtuple
 from itertools import chain, ifilter, imap
 import operator
 import os
@@ -47,6 +48,7 @@ def _extract_concise_traceback():
     with function name, file name and line number
     """
     tb = traceback.extract_stack()
+    callsite = namedtuple("Callsite", "function file linenum")
     if len(tb) == 0:
         return None
     file, line, module, what = tb[len(tb) - 1]
@@ -59,10 +61,10 @@ def _extract_concise_traceback():
             break
     if first_spark_frame == 0:
         file, line, fun, what = tb[0]
-        return {"function": fun, "file": file, "line": line}
+        return callsite(function=fun, file=file, linenum=line)
     sfile, sline, sfun, swhat = tb[first_spark_frame]
     ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
-    return {"function": sfun, "file": ufile, "line": uline}
+    return callsite(function=sfun, file=ufile, linenum=uline)
 
 _spark_stack_depth = 0
 
@@ -70,7 +72,7 @@ class _JavaStackTrace(object):
     def __init__(self, sc):
         tb = _extract_concise_traceback()
         if tb is not None:
-            self._traceback = "%s at %s:%s" % (tb["function"], tb["file"], tb["line"])
+            self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
         else:
             self._traceback = "Error! Could not extract traceback info"
         self._context = sc
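
Note: the sketch below is a minimal, standalone illustration of the Callsite pattern this patch series converges on. It is not part of either patch; the names extract_callsite and demo are invented for the example, and no Spark installation is assumed. It builds the same kind of namedtuple from traceback.extract_stack() and formats it the way _JavaStackTrace does.

# Standalone sketch of the Callsite pattern (assumption: plain CPython, no Spark).
import traceback
from collections import namedtuple

# Same field layout as the namedtuple introduced in PATCH 2/2
Callsite = namedtuple("Callsite", "function file linenum")

def extract_callsite():
    # Hypothetical helper modelled on _extract_concise_traceback: report the
    # caller's frame rather than walking past PySpark-internal frames.
    tb = traceback.extract_stack()
    if len(tb) < 2:
        return None
    # tb[-1] is this frame; tb[-2] is the caller we want to report
    file, line, fun, text = tb[-2]
    return Callsite(function=fun, file=file, linenum=line)

def demo():
    cs = extract_callsite()
    if cs is not None:
        # Same format string _JavaStackTrace builds for the JVM-side label
        print("%s at %s:%s" % (cs.function, cs.file, cs.linenum))
    else:
        print("Error! Could not extract traceback info")

if __name__ == "__main__":
    demo()  # prints something like: demo at sketch.py:23

Attribute access (cs.function) is what motivates PATCH 2/2: the namedtuple keeps the named fields of the dict from PATCH 1/2 while reading more cleanly at every use site, and it stays compatible with tuple unpacking.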