1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/dataframe.rst
@@ -46,6 +46,7 @@ DataFrame
DataFrame.crossJoin
DataFrame.crosstab
DataFrame.cube
DataFrame.debug
DataFrame.describe
DataFrame.distinct
DataFrame.drop
60 changes: 60 additions & 0 deletions python/docs/source/user_guide/connect_execution_info_and_debug.rst
@@ -0,0 +1,60 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
=========================================
Spark Connect - Execution Info and Debug
=========================================
Member
Should have =========== to match with its size - otherwise Sphinx warns and complains about it

Contributor Author
Done



Execution Info
--------------

The ``executionInfo`` property of the DataFrame lets users access execution
metrics about a previously executed operation. In Spark Connect mode, the
plan metrics of the execution are always submitted as the last elements of the
response, giving users an easy way to inspect this information.

.. code-block:: python

    df = spark.range(100)
    df.collect()
    ei = df.executionInfo

    # Access the execution metrics:
    metrics = ei.metrics
    print(metrics.toText())

Debugging DataFrame Data Flows
-------------------------------
Sometimes it is useful to understand the data flow of a DataFrame operation. While
the plan metrics make it possible to track row counts between different operators,
the execution plan does not always resemble the semantics of the original query.

The ``debug`` method allows users to inject predefined observation points into the
query execution. After execution, the user can access the observations and their
associated metrics.

By default, calling ``debug()`` will inject a single observation that counts the number
of rows flowing out of the DataFrame.


.. code-block:: python

    df = spark.range(100).debug()
    filtered = df.where(df.id < 10).debug()
    filtered.collect()

    ei = filtered.executionInfo
    for op in ei.observations:
        print(op.debugString())
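
Passing custom operations to ``debug()`` is also possible. The following sketch is based on
the review discussion further below in this PR; the factory names ``DataDebugOp.max_value``
and ``DataDebugOp.count_null_values`` are taken from that discussion and may differ in the
final API.

.. code-block:: python

    from pyspark.sql.metrics import DataDebugOp

    df = spark.range(100).debug(
        DataDebugOp.max_value("id"),
        DataDebugOp.count_null_values("id"),
    )
    df.collect()
    for op in df.executionInfo.observations:
        print(op.debugString())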
1 change: 1 addition & 0 deletions python/docs/source/user_guide/index.rst
@@ -28,6 +28,7 @@ PySpark specific user guides are available here:
python_packaging
sql/index
pandas_on_spark/index
connect_execution_info_and_debug

There are also basic programming guides covering multiple languages available in
`the Spark documentation <https://spark.apache.org/docs/latest/index.html#where-to-go-from-here>`_, including these:
5 changes: 5 additions & 0 deletions python/pyspark/errors/error-conditions.json
@@ -1028,6 +1028,11 @@
"Unknown value for `<var>`."
]
},
"UNSUPPORTED_DATADEBUGOP": {
"message": [
"Argument `<arg_name>` should be a DataDebugOp, got <arg_type>."
]
},
"UNSUPPORTED_DATA_TYPE": {
"message": [
"Unsupported DataType `<data_type>`."
55 changes: 42 additions & 13 deletions python/pyspark/errors/utils.py
@@ -20,7 +20,7 @@
import inspect
import os
import threading
from typing import Any, Callable, Dict, Match, TypeVar, Type, Optional, TYPE_CHECKING
from typing import Any, Callable, Dict, Match, TypeVar, Type, Optional, TYPE_CHECKING, List
import pyspark
from pyspark.errors.error_classes import ERROR_CLASSES_MAP

@@ -165,19 +165,11 @@ def _capture_call_site(spark_session: "SparkSession", depth: int) -> str:
The call site information is used to enhance error messages with the exact location
in the user code that led to the error.
"""
# Filtering out PySpark code and keeping user code only
pyspark_root = os.path.dirname(pyspark.__file__)
stack = [
frame_info for frame_info in inspect.stack() if pyspark_root not in frame_info.filename
]

selected_frames = stack[:depth]

# We try import here since IPython is not a required dependency
selected_frames = call_site_stack(depth)
try:
from IPython import get_ipython
import IPython

ipython = get_ipython()
ipython = IPython.get_ipython()
except ImportError:
ipython = None

@@ -189,7 +181,6 @@ def _capture_call_site(spark_session: "SparkSession", depth: int) -> str:
else:
call_sites = [f"{frame.filename}:{frame.lineno}" for frame in selected_frames]
call_sites_str = "\n".join(call_sites)

return call_sites_str


@@ -257,3 +248,41 @@ def with_origin_to_class(cls: Type[T]) -> Type[T]:
):
setattr(cls, name, _with_origin(method))
return cls


def call_site_stack(depth: int = 10) -> List[inspect.FrameInfo]:
"""
Capture the call site stack and filter out all stack frames that are not user code.

This function will return the call stack above all PySpark code and IPython code. Usually
the first frame will be the place where the user code reached the PySpark API.

If SPARK_TESTING is set in the environment, all frames will be returned.

Parameters
----------
depth : int
How many stack frames to select

"""

# Filtering out PySpark code and keeping user code only
pyspark_root = os.path.dirname(pyspark.__file__)
stack = [
frame_info
for frame_info in inspect.stack()
if pyspark_root not in frame_info.filename or "SPARK_TESTING" in os.environ
]

selected_frames = stack[:depth]

# We try import here since IPython is not a required dependency
try:
import IPython

ipy_root = IPython.__file__
selected_frames = [f for f in selected_frames if ipy_root not in f.filename]
except ImportError:
pass

return selected_frames
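
A minimal usage sketch of ``call_site_stack`` (assuming it is invoked from user code outside
the pyspark package, so that at least one frame survives the filtering):

from pyspark.errors.utils import call_site_stack

frames = call_site_stack(depth=5)
for frame_info in frames:
    # Each entry is an inspect.FrameInfo pointing at a user-code frame.
    print(f"{frame_info.filename}:{frame_info.lineno} in {frame_info.function}")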
6 changes: 6 additions & 0 deletions python/pyspark/sql/classic/dataframe.py
@@ -1843,6 +1843,12 @@ def executionInfo(self) -> Optional["ExecutionInfo"]:
message_parameters={"member": "queryExecution"},
)

def debug(self) -> "DataFrame":
Member
ditto.

Suggested change
def debug(self) -> "DataFrame":
def debug(self, *other: "DataDebugOp") -> "DataFrame":

raise PySparkValueError(
error_class="CLASSIC_OPERATION_NOT_SUPPORTED_ON_DF",
message_parameters={"member": "debug"},
)


def _to_scala_map(sc: "SparkContext", jm: Dict) -> "JavaObject":
"""
35 changes: 34 additions & 1 deletion python/pyspark/sql/connect/dataframe.py
@@ -50,8 +50,10 @@
import warnings
from collections.abc import Iterable
import functools
from uuid import uuid4

from pyspark import _NoValue
from pyspark.errors.utils import call_site_stack
from pyspark._globals import _NoValueType
from pyspark.util import is_remote_only
from pyspark.sql.types import Row, StructType, _create_row
@@ -84,6 +86,7 @@
from pyspark.sql.connect.functions import builtin as F
from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema
from pyspark.sql.pandas.functions import _validate_pandas_udf # type: ignore[attr-defined]
from pyspark.sql.metrics import DataDebugOp


if TYPE_CHECKING:
@@ -101,7 +104,7 @@
from pyspark.sql.connect.observation import Observation
from pyspark.sql.connect.session import SparkSession
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
from pyspark.sql.metrics import ExecutionInfo
from pyspark.sql.metrics import ExecutionInfo, DataDebugOp


class DataFrame(ParentDataFrame):
@@ -2227,8 +2230,38 @@ def rdd(self) -> "RDD[Row]":

@property
def executionInfo(self) -> Optional["ExecutionInfo"]:
# Update the observations if needed.
if self._plan.observations:
if self._execution_info and not self._execution_info.observations:
self._execution_info.setObservations(self._plan.observations)
return self._execution_info

def debug(self, *other: List["DataDebugOp"]) -> "DataFrame":
Member
If the usage is:

spark.range(100).debug(DataDebugOp.max_value("id"), DataDebugOp.count_null_values("id"))

instead of:

spark.range(100).debug([DataDebugOp.max_value("id"), DataDebugOp.count_null_values("id")])

Suggested change
def debug(self, *other: List["DataDebugOp"]) -> "DataFrame":
def debug(self, *other: "DataDebugOp") -> "DataFrame":

# Needs to be imported here to avoid the recursive import.
from pyspark.sql.connect.observation import Observation

# Extract the stack
stack = call_site_stack(depth=10)
frames = [f"{s.filename}:{s.lineno}@{s.function}" for s in stack]

# Check that all elements are of type 'DataDebugOp'
for op in other:
if not isinstance(op, DataDebugOp):
raise PySparkTypeError(
error_class="UNSUPPORTED_DATADEBUGOP",
message_parameters={"arg_name": "other", "arg_type": type(op).__name__},
)

# Capture the expressions for the debug op.
ops: List[DataDebugOp] = [
DataDebugOp.count_values(),
] + list(other)
exprs = list(map(lambda x: x(), ops))

# Create the Observation that captures all the expressions for this "debug" op.
obs = Observation(name=f"debug:{uuid4()}", call_site=frames, plan_id=self._plan.plan_id)
return self.observe(obs, *exprs)


class DataFrameNaFunctions(ParentDataFrameNaFunctions):
def __init__(self, df: ParentDataFrame):
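
Note: the ``exprs = list(map(lambda x: x(), ops))`` call above implies that each ``DataDebugOp``
is a zero-argument callable returning the Column expression to observe. A minimal sketch of that
contract, assuming only what is visible here (not a drop-in argument for ``debug()``, which
type-checks for DataDebugOp instances; it could, however, be passed to ``observe()`` directly):

from pyspark.sql.connect.functions import builtin as F

def null_count_op(col_name: str):
    # Returns a zero-argument callable producing the aggregation to observe,
    # mirroring how debug() invokes each DataDebugOp with no arguments.
    def _expr():
        return F.count(F.when(F.col(col_name).isNull(), 1)).alias(f"null_count({col_name})")
    return _expr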
24 changes: 22 additions & 2 deletions python/pyspark/sql/connect/observation.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
import uuid

from pyspark.errors import (
@@ -33,7 +33,9 @@


class Observation:
def __init__(self, name: Optional[str] = None) -> None:
def __init__(
self, name: Optional[str] = None, call_site: Optional[List[str]] = None, plan_id: int = -1
) -> None:
if name is not None:
if not isinstance(name, str):
raise PySparkTypeError(
@@ -47,9 +49,19 @@ def __init__(self, name: Optional[str] = None) -> None:
)
self._name = name
self._result: Optional[Dict[str, Any]] = None
self._call_site = call_site
self._plan_id = plan_id

__init__.__doc__ = PySparkObservation.__init__.__doc__

@property
def callSite(self) -> Optional[List[str]]:
return self._call_site

@property
def planId(self) -> int:
return self._plan_id

def _on(self, df: DataFrame, *exprs: Column) -> DataFrame:
if self._result is not None:
raise PySparkAssertionError(error_class="REUSE_OBSERVATION", message_parameters={})
@@ -75,6 +87,14 @@ def get(self) -> Dict[str, Any]:

return self._result

def debugString(self) -> str:
call_site = self._call_site[0] if self._call_site is not None else ""
metrics = ", ".join([f"{k}={v}" for k, v in self.get.items()])
return (
f"Observation(name={self._name}, planId={self._plan_id},"
f" metrics={metrics}, callSite={call_site})"
)

get.__doc__ = PySparkObservation.get.__doc__


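
For illustration only, ``debugString()`` on an observation injected by ``debug()`` produces a
line shaped like the following (all values here are made up):

Observation(name=debug:6f1c2a90-..., planId=7, metrics=count_values=100, callSite=/home/user/my_job.py:12@<module>)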
4 changes: 4 additions & 0 deletions python/pyspark/sql/connect/plan.py
@@ -87,6 +87,10 @@ def _fresh_plan_id() -> int:
assert plan_id is not None
return plan_id

@property
def plan_id(self) -> int:
return self._plan_id

def _create_proto_relation(self) -> proto.Relation:
plan = proto.Relation()
plan.common.plan_id = self._plan_id
15 changes: 15 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -6307,6 +6307,21 @@ def executionInfo(self) -> Optional["ExecutionInfo"]:
"""
...

def debug(self) -> "DataFrame":
Member
The signatures should all be the same:

Suggested change
def debug(self) -> "DataFrame":
def debug(self, *other: "DataDebugOp") -> "DataFrame":

"""
Helper function that allows debugging the query execution with custom observations.

Essentially, this method is a wrapper around the `observe()` method, but simplifies
the usage. In addition, it makes sure that the captured metrics are properly collected
as part of the execution info.

.. versionadded:: 4.0.0
Member
Suggested change
.. versionadded:: 4.0.0
.. versionadded:: 4.0.0

Otherwise the HTML output is malformed

Returns
-------
DataFrame instance with the observations added
"""
...


class DataFrameNaFunctions:
"""Functionality for working with missing data in :class:`DataFrame`.
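
Since the docstring above describes ``debug()`` as a wrapper around ``observe()``, a rough
hand-written equivalent of the default behaviour in Spark Connect could look like the sketch
below. The metric name ``count_values`` is assumed from ``DataDebugOp.count_values()`` earlier,
and ``spark`` is assumed to be an active Spark Connect session.

from pyspark.sql.connect.observation import Observation
from pyspark.sql.connect.functions import builtin as F

df = spark.range(100)
obs = Observation(name="debug:manual")
observed = df.observe(obs, F.count(F.lit(1)).alias("count_values"))
observed.collect()
print(obs.get)  # e.g. {'count_values': 100}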