Conversation

@grundprinzip (Contributor) commented Jun 30, 2024

What changes were proposed in this pull request?

At times, users want to evaluate the properties of their data flow graph and understand how certain transformations behave. Today this is more complex than necessary: even though the df.observe() API has been available since Spark 3.3, its usage is not widespread.

To give users a more visible API for understanding the data flow execution in Spark, this patch adds a new method to the DataFrame API called df.debug(). Debug will by default do the following:

  1. Create a new observation with the name debug:<uuid>
  2. Add a count(1) observation to it
  3. Capture the call stack and attach it to the observation.

After the execution, users can access the observations via the executionInfo property of the DataFrame.

df = spark.range(100).debug()
df = df.filter(df.id < 10).debug()
df.collect()
ei = df.executionInfo
for o in ei.observations:
    print(o.debugString())

The debug string contains a reference to the observation, the call site, and the metric values.

Observation(name=debug:945125d7-2c0b-495c-a94b-3fdd56560b2f, planId=0, metrics=count_values=100, callSite=<ipython-input-1-df562d17bba7>:1@<module>)
Observation(name=debug:d8a11a55-58ed-4b62-80ef-14a862c1f446, planId=2, metrics=count_values=10, callSite=<ipython-input-2-4a176498c5c4>:1@<module>)
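For readers who want to experiment before this lands, the three steps above can be approximated on top of the existing df.observe() API. This is only a minimal sketch, not the actual patch: the helper names make_debug_name, capture_call_site, and debug_observe are hypothetical.

```python
import traceback
import uuid


def make_debug_name() -> str:
    # Step 1: a unique observation name of the form debug:<uuid>.
    return f"debug:{uuid.uuid4()}"


def capture_call_site() -> str:
    # Step 3: record the caller's file, line, and function, mirroring the
    # callSite=<file>:<line>@<function> format shown in the output above.
    frame = traceback.extract_stack()[-2]  # [-1] is this function itself
    return f"{frame.filename}:{frame.lineno}@{frame.name}"


def debug_observe(df):
    # Step 2: attach a count(1) metric under the generated name.
    # Calling this requires a running PySpark session (Spark >= 3.3).
    from pyspark.sql import Observation
    from pyspark.sql import functions as F

    obs = Observation(make_debug_name())
    observed = df.observe(obs, F.count(F.lit(1)).alias("count_values"))
    return observed, obs, capture_call_site()
```

Unlike the proposed df.debug(), this sketch hands the Observation back to the caller instead of wiring it into executionInfo.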

In addition to the count, we have defined several useful additional debug observations that can be easily injected.

from pyspark.sql.metrics import DataDebugOp
df = spark.range(100).debug(DataDebugOp.max_value("id"))
df = df.filter(df.id < 10).debug(DataDebugOp.max_value("id"))
df.collect()
ei = df.executionInfo
for o in ei.observations:
    print(o.debugString())

Produces the following output:

Observation(name=debug:a682f617-ddde-45bc-8263-8c3220cf9887, planId=4, metrics=count_values=100, max_value_id=99, callSite=<ipython-input-7-a374a4f978f1>:2@<module>)
Observation(name=debug:a82808b6-039e-406d-acc5-c9922be84893, planId=6, metrics=count_values=10, max_value_id=9, callSite=<ipython-input-7-a374a4f978f1>:3@<module>)
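A rough sketch of what such an op wrapper could look like, mirroring the count_values snippet quoted later in this review. The dataclass shape and the max_value body below are assumptions; the real class lives in pyspark.sql.metrics and its fields may differ.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class DataDebugOp:
    # Hypothetical shape of a debug op: a metric name plus the Column
    # expression that computes it.
    name: str
    column: Any  # a pyspark.sql.Column expression

    @staticmethod
    def count_values() -> "DataDebugOp":
        from pyspark.sql import functions as F

        return DataDebugOp("count_values", F.count(F.lit(1)).alias("count_values"))

    @staticmethod
    def max_value(col: str) -> "DataDebugOp":
        # Assumed naming convention, based on the max_value_id metric
        # shown in the output above.
        from pyspark.sql import functions as F

        return DataDebugOp(f"max_value_{col}", F.max(col).alias(f"max_value_{col}"))
```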

Why are the changes needed?

User support.

Does this PR introduce any user-facing change?

Yes, it adds a new DataFrame method, df.debug().

How was this patch tested?

New unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

@grundprinzip grundprinzip changed the title [SPARK-48756] Support for df.debug() in Connect Mode [SPARK-48756][CONNECT][PYTHON] Support for df.debug() in Connect Mode Jun 30, 2024
@grundprinzip grundprinzip changed the title [SPARK-48756][CONNECT][PYTHON] Support for df.debug() in Connect Mode [SPARK-48756][CONNECT][PYTHON]Support for df.debug() in Connect Mode Jun 30, 2024
===========
Spark Connect - Execution Info and Debug
===========

The =========== underline should match the title's length; otherwise Sphinx warns and complains about it.

@grundprinzip (Contributor, Author) replied:

Done

@HyukjinKwon (Member) commented Jun 30, 2024

cc @itholic and @ueshin

the usage. In addition, it makes sure that the captured metrics are properly collected
as part of the execution info.
.. versionadded:: 4.0.0

Suggested change
.. versionadded:: 4.0.0
.. versionadded:: 4.0.0

Otherwise the HTML output is malformed

from pyspark.errors import PySparkValueError
from pyspark.errors import PySparkValueError, PySparkTypeError
from pyspark.sql import Observation, Column


Suggested change

per PEP8


@classmethod
def count_values(cls) -> "DataDebugOp":
return DataDebugOp("count_values", F.count(F.lit(1)).alias("count_values"))

So this is a wrapper of the observe API. I think it does not simplify a lot vs the existing use case:

observation = Observation("my metrics")
observed_df = df.observe(observation, count(lit(1)).alias("count"), max(col("age")))
observed_df.collect()
observation.get

and this won't work for streaming.

@itholic (Contributor) left a comment:

Good catch. Let me address _capture_call_site as well

self._execution_info.setObservations(self._plan.observations)
return self._execution_info

def debug(self, *other: List["DataDebugOp"]) -> "DataFrame":

If the usage is:

spark.range(100).debug(DataDebugOp.max_value("id"), DataDebugOp.count_null_values("id"))

instead of:

spark.range(100).debug([DataDebugOp.max_value("id"), DataDebugOp.count_null_values("id")])
Suggested change
def debug(self, *other: List["DataDebugOp"]) -> "DataFrame":
def debug(self, *other: "DataDebugOp") -> "DataFrame":

"""
...

def debug(self) -> "DataFrame":

The signatures should all be the same:

Suggested change
def debug(self) -> "DataFrame":
def debug(self, *other: "DataDebugOp") -> "DataFrame":

message_parameters={"member": "queryExecution"},
)

def debug(self) -> "DataFrame":

ditto.

Suggested change
def debug(self) -> "DataFrame":
def debug(self, *other: "DataDebugOp") -> "DataFrame":

data debug operations.
"""

@classmethod

nit: @staticmethod if cls is not used?

@github-actions (bot) commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 21, 2024
@github-actions github-actions bot closed this Oct 22, 2024