Skip to content

Conversation

@anshbansal
Copy link
Collaborator

@anshbansal anshbansal commented Nov 21, 2025

This makes the log output much easier to read.
image

@github-actions github-actions bot added the smoke_test Contains changes related to smoke tests label Nov 21, 2025
@anshbansal anshbansal marked this pull request as ready for review November 21, 2025 06:03
@codecov
Copy link

codecov bot commented Nov 21, 2025

❌ 4 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 457 | 4 | 453 | 6 |
View the top 3 failed test(s) by shortest run time
tests.cli.graphql_cmd.test_graphql_cli_smoke.TestGraphQLCLIStandalone::test_graphql_schema_discovery
Stack Traces | 0.015s run time
self = <tests.cli.graphql_cmd.test_graphql_cli_smoke.TestGraphQLCLIStandalone object at 0x7f0432384c50>

    def test_graphql_schema_discovery(self):
        """Test GraphQL schema discovery functionality."""
        # This should work even without authentication for schema discovery
>       exit_code, stdout, stderr = self._run_datahub_cli(
            ["graphql", "--list-operations"]
        )

.../cli/graphql_cmd/test_graphql_cli_smoke.py:66: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/graphql_cmd/test_graphql_cli_smoke.py:51: in _run_datahub_cli
    result = run_datahub_cmd(args, input=input_data)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tests/utils.py:239: in run_datahub_cmd
    return runner.invoke(datahub, command, input=input, env=env)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <click.testing.CliRunner object at 0x7f041ef3fb90>, cli = <Group datahub>
args = ['graphql', '--list-operations'], input = None, env = None
catch_exceptions = True, color = False, extra = {}
exc_info = (<class 'SystemExit'>, SystemExit(1), <traceback object at 0x7f04265424c0>)
outstreams = (<click.testing.BytesIOCopy object at 0x7f041db787c0>, <click.testing.BytesIOCopy object at 0x7f041db78810>, <_io.BytesIO object at 0x7f041db78770>)
return_value = None, exception = SystemExit(1), exit_code = 1
prog_name = 'datahub'

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.
    
        This returns a :class:`Result` object.
    
        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
    
        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.
    
        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.
    
        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.
    
        .. versionchanged:: 4.0
            Added the ``color`` parameter.
    
        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.
    
        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions
    
        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: BaseException | None = None
            exit_code = 0
    
            if isinstance(args, str):
                args = shlex.split(args)
    
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)
    
            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast("int | t.Any | None", e.code)
    
                if e_code is None:
                    e_code = 0
    
                if e_code != 0:
                    exception = e
    
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1
    
                exit_code = e_code
    
            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                sys.stdout.flush()
                sys.stderr.flush()
>               stdout = outstreams[0].getvalue()
                         ^^^^^^^^^^^^^^^^^^^^^^^^
E               ValueError: I/O operation on closed file.

venv/lib/python3.11.../site-packages/click/testing.py:531: ValueError
tests.cli.dataset_cmd.test_dataset_command::test_dataset_sync_bidirectional
Stack Traces | 0.249s run time
setup_teardown_dataset = None
graph_client = DataHubGraph: configured to talk to http://localhost:8080 with token: eyJh**********wmv4
auth_session = <tests.utils.TestSessionWrapper object at 0x7f1b4a8ce0d0>

    def test_dataset_sync_bidirectional(
        setup_teardown_dataset, graph_client: DataHubGraph, auth_session
    ):
        """Test bidirectional sync with modifications on both sides"""
        with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
            temp_file_path = Path(tmp.name)
            try:
                # 1. Create initial dataset in YAML
                create_dataset_yaml(temp_file_path)
    
                # 2. Sync to DataHub
>               run_cli_command(
                    f"dataset sync -f {temp_file_path} --to-datahub", auth_session
                )

.../cli/dataset_cmd/test_dataset_command.py:191: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/dataset_cmd/test_dataset_command.py:74: in run_cli_command
    result = run_datahub_cmd(
tests/utils.py:239: in run_datahub_cmd
    return runner.invoke(datahub, command, input=input, env=env)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <click.testing.CliRunner object at 0x7f1b3eda2150>, cli = <Group datahub>
args = ['dataset', 'sync', '-f', '/tmp/tmpsjc2fluy.yml', '--to-datahub']
input = None
env = {'DATAHUB_GMS_TOKEN': 'eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQU...odWItbWV0YWRhdGEtc2VydmljZSJ9.uGdIgTnM3fuRBBi3TqMjkkvvo9f9UWjFau6krUVwmv4', 'DATAHUB_GMS_URL': 'http://localhost:8080'}
catch_exceptions = True, color = False, extra = {}
exc_info = (<class 'SystemExit'>, SystemExit(0), <traceback object at 0x7f1b4019b500>)
outstreams = (<click.testing.BytesIOCopy object at 0x7f1b3c8e3fb0>, <click.testing.BytesIOCopy object at 0x7f1b3c8e3e20>, <_io.BytesIO object at 0x7f1b3c8e3c40>)
return_value = None, exception = None, exit_code = 0, prog_name = 'datahub'

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.
    
        This returns a :class:`Result` object.
    
        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
    
        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.
    
        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.
    
        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.
    
        .. versionchanged:: 4.0
            Added the ``color`` parameter.
    
        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.
    
        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions
    
        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: BaseException | None = None
            exit_code = 0
    
            if isinstance(args, str):
                args = shlex.split(args)
    
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)
    
            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast("int | t.Any | None", e.code)
    
                if e_code is None:
                    e_code = 0
    
                if e_code != 0:
                    exception = e
    
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1
    
                exit_code = e_code
    
            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                sys.stdout.flush()
                sys.stderr.flush()
>               stdout = outstreams[0].getvalue()
                         ^^^^^^^^^^^^^^^^^^^^^^^^
E               ValueError: I/O operation on closed file.

venv/lib/python3.11.../site-packages/click/testing.py:531: ValueError
tests.cli.dataset_cmd.test_dataset_command::test_dataset_sync_to_datahub
Stack Traces | 0.382s run time
setup_teardown_dataset = None
graph_client = DataHubGraph: configured to talk to http://localhost:8080 with token: eyJh**********wmv4
auth_session = <tests.utils.TestSessionWrapper object at 0x7f1b4a8ce0d0>

    def test_dataset_sync_to_datahub(
        setup_teardown_dataset, graph_client: DataHubGraph, auth_session
    ):
        """Test syncing dataset from YAML to DataHub"""
        with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
            temp_file_path = Path(tmp.name)
            try:
                # Create a dataset YAML file
                create_dataset_yaml(temp_file_path)
    
                # Run the CLI command to sync to DataHub
                cmd = f"dataset sync -f {temp_file_path} --to-datahub"
>               result = run_cli_command(cmd, auth_session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.../cli/dataset_cmd/test_dataset_command.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/dataset_cmd/test_dataset_command.py:74: in run_cli_command
    result = run_datahub_cmd(
tests/utils.py:239: in run_datahub_cmd
    return runner.invoke(datahub, command, input=input, env=env)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <click.testing.CliRunner object at 0x7f1b3c73d910>, cli = <Group datahub>
args = ['dataset', 'sync', '-f', '/tmp/tmpoo5hju5a.yml', '--to-datahub']
input = None
env = {'DATAHUB_GMS_TOKEN': 'eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQU...odWItbWV0YWRhdGEtc2VydmljZSJ9.uGdIgTnM3fuRBBi3TqMjkkvvo9f9UWjFau6krUVwmv4', 'DATAHUB_GMS_URL': 'http://localhost:8080'}
catch_exceptions = True, color = False, extra = {}
exc_info = (<class 'SystemExit'>, SystemExit(0), <traceback object at 0x7f1b40933cc0>)
outstreams = (<click.testing.BytesIOCopy object at 0x7f1b3f108680>, <click.testing.BytesIOCopy object at 0x7f1b3f10bba0>, <_io.BytesIO object at 0x7f1b3f108540>)
return_value = None, exception = None, exit_code = 0, prog_name = 'datahub'

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.
    
        This returns a :class:`Result` object.
    
        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
    
        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.
    
        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.
    
        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.
    
        .. versionchanged:: 4.0
            Added the ``color`` parameter.
    
        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.
    
        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions
    
        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: BaseException | None = None
            exit_code = 0
    
            if isinstance(args, str):
                args = shlex.split(args)
    
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)
    
            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast("int | t.Any | None", e.code)
    
                if e_code is None:
                    e_code = 0
    
                if e_code != 0:
                    exception = e
    
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1
    
                exit_code = e_code
    
            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                sys.stdout.flush()
                sys.stderr.flush()
>               stdout = outstreams[0].getvalue()
                         ^^^^^^^^^^^^^^^^^^^^^^^^
E               ValueError: I/O operation on closed file.

venv/lib/python3.11.../site-packages/click/testing.py:531: ValueError
tests.cli.delete_cmd.test_timeseries_delete::test_timeseries_delete
Stack Traces | 36.6s run time
auth_session = <tests.utils.TestSessionWrapper object at 0x7f40d239ccd0>

    def test_timeseries_delete(auth_session: Any) -> None:
        num_test_profiles: int = 10
        verification_batch_size: int = int(num_test_profiles / 2)
        num_latest_profiles_to_delete = 2
        expected_profile_after_latest_deletion: DatasetProfileClass
        delete_ts_start: str
        delete_ts_end: str
        # 1. Ingest `num_test_profiles` datasetProfile aspects against the test_dataset_urn via put
        # and validate using get.
        for i, dataset_profile in enumerate(gen_dataset_profiles(num_test_profiles)):
            # Use put command to ingest the aspect value.
            datahub_put_profile(auth_session, dataset_profile)
            # Validate against all ingested values once every verification_batch_size to reduce overall test time. Since we
            # are ingesting  the aspects in the ascending order of timestampMillis, get should return the one just put.
            if (i % verification_batch_size) == 0:
>               datahub_get_and_verify_profile(auth_session, dataset_profile)

.../cli/delete_cmd/test_timeseries_delete.py:123: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

auth_session = <tests.utils.TestSessionWrapper object at 0x7f40d239ccd0>
expected_profile = DatasetProfileClass({'timestampMillis': 1762905600000, 'eventGranularity': TimeWindowSizeClass({'unit': 'DAY', 'multip... 'quantiles': None, 'distinctValueFrequencies': None, 'histogram': None, 'sampleValues': None})], 'sizeInBytes': None})

    def datahub_get_and_verify_profile(
        auth_session,
        expected_profile: Optional[DatasetProfileClass],
    ) -> None:
        # Wait for writes to stabilize in elastic
        sync_elastic()
        get_args: List[str] = ["get", "--urn", test_dataset_urn, "-a", test_aspect_name]
        get_result = run_datahub_cmd(
            get_args,
            env={
                "DATAHUB_GMS_URL": auth_session.gms_url(),
                "DATAHUB_GMS_TOKEN": auth_session.gms_token(),
            },
        )
    
        if expected_profile is None:
            assert get_result.exit_code != 0
            assert (
                test_dataset_urn in get_result.stderr and "not found" in get_result.stderr
            ), f"Got stderr of {get_result.stderr} in get_and_verify_profile"
        else:
            assert get_result.exit_code == 0
            try:
                get_result_output_obj: Dict = json.loads(get_result.stdout)
            except JSONDecodeError as e:
                print("Failed to decode: " + get_result.stdout, file=sys.stderr)
                raise e
    
            profile_from_get = DatasetProfileClass.from_obj(
                get_result_output_obj["datasetProfile"]
            )
>           assert profile_from_get == expected_profile
E           AssertionError: assert DatasetProfileClass({'timestampMillis': 1763510400000, 'eventGranularity': TimeWindowSizeClass({'unit': 'DAY', 'multiple': 1}), 'partitionSpec': PartitionSpecClass({'partition': 'FULL_TABLE_SNAPSHOT', 'timePartition': None, 'type': 'FULL_TABLE'}), 'messageId': None, 'rowCount': 800, 'columnCount': 1, 'fieldProfiles': [DatasetFieldProfileClass({'fieldPath': 'test_column', 'uniqueCount': 450, 'uniqueProportion': 0.5, 'nullCount': 90, 'nullProportion': 0.1, 'min': '10', 'max': '20', 'mean': '15', 'median': '12', 'stdev': '3', 'quantiles': None, 'distinctValueFrequencies': None, 'histogram': None, 'sampleValues': None})], 'sizeInBytes': None}) == DatasetProfileClass({'timestampMillis': 1762905600000, 'eventGranularity': TimeWindowSizeClass({'unit': 'DAY', 'multiple': 1}), 'partitionSpec': PartitionSpecClass({'partition': 'FULL_TABLE_SNAPSHOT', 'timePartition': None, 'type': 'FULL_TABLE'}), 'messageId': None, 'rowCount': 100, 'columnCount': 1, 'fieldProfiles': [DatasetFieldProfileClass({'fieldPath': 'test_column', 'uniqueCount': 100, 'uniqueProportion': 0.5, 'nullCount': 20, 'nullProportion': 0.1, 'min': '10', 'max': '20', 'mean': '15', 'median': '12', 'stdev': '3', 'quantiles': None, 'distinctValueFrequencies': None, 'histogram': None, 'sampleValues': None})], 'sizeInBytes': None})

.../cli/delete_cmd/test_timeseries_delete.py:84: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Nov 21, 2025
@anshbansal anshbansal force-pushed the ab-2025-nov-21-fix-logging branch from 6664e94 to 96729c0 Compare November 21, 2025 06:19
@datahub-cyborg datahub-cyborg bot added pending-submitter-merge and removed needs-review Label for PRs that need review from a maintainer. labels Nov 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pending-submitter-merge smoke_test Contains changes related to smoke tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants