diff --git a/src/neo4j/_work/summary.py b/src/neo4j/_work/summary.py index 3029c53f5..5c81478df 100644 --- a/src/neo4j/_work/summary.py +++ b/src/neo4j/_work/summary.py @@ -139,6 +139,33 @@ def __getattr__(self, key): f"'{self.__class__.__name__}' object has no attribute '{key}'" ) + @staticmethod + def _notification_from_status(status: dict) -> dict: + notification = {} + for notification_key, status_key in ( + ("title", "title"), + ("code", "neo4j_code"), + ("description", "description"), + ): + if status_key in status: + notification[notification_key] = status[status_key] + + if "diagnostic_record" in status: + diagnostic_record = status["diagnostic_record"] + if not isinstance(diagnostic_record, dict): + diagnostic_record = {} + + for notification_key, diag_record_key in ( + ("severity", "_severity"), + ("category", "_classification"), + ("position", "_position"), + ): + if diag_record_key in diagnostic_record: + notification[notification_key] = \ + diagnostic_record[diag_record_key] + + return notification + def _set_notifications(self): if "notifications" in self.metadata: notifications = self.metadata["notifications"] @@ -156,53 +183,10 @@ def _set_notifications(self): return notifications = [] for status in statuses: - if not isinstance(status, dict): - continue - notification = {} - failed = False - for notification_key, status_key in ( - ("title", "title"), - ("code", "neo4j_code"), - ("description", "description"), - ): - value = status.get(status_key) - if not isinstance(value, str) or not value: - failed = True - break - notification[notification_key] = value - if failed: - continue - diagnostic_record = status.get("diagnostic_record") - if not isinstance(diagnostic_record, dict): - continue - for notification_key, diag_record_key in ( - ("severity", "_severity"), - ("category", "_classification"), - ): - value = diagnostic_record.get(diag_record_key) - if not isinstance(value, str) or not value: - failed = True - break - notification[notification_key] = value - if failed: - continue - raw_position = diagnostic_record.get("_position") - if not isinstance(raw_position, dict): - continue - position = {} - for pos_key, raw_pos_key in ( - ("line", "line"), - ("column", "column"), - ("offset", "offset"), - ): - value = raw_position.get(raw_pos_key) - if not isinstance(value, int) or isinstance(value, bool): - failed = True - break - position[pos_key] = value - if failed: + if not (isinstance(status , dict) and "neo4j_code" in status): + # not a notification status continue - notification["position"] = position + notification = self._notification_from_status(status) notifications.append(notification) self.notifications = notifications return diff --git a/tests/unit/common/work/test_summary.py b/tests/unit/common/work/test_summary.py index bb3610716..88ea56cff 100644 --- a/tests/unit/common/work/test_summary.py +++ b/tests/unit/common/work/test_summary.py @@ -23,6 +23,9 @@ if t.TYPE_CHECKING: import typing_extensions as te + _T = t.TypeVar("_T") + _TDict = t.TypeVar("_TDict", bound=dict) + import pytest from neo4j import ( @@ -1499,7 +1502,7 @@ def test_no_notification_from_status(raw_status, summary_args_kwargs) -> None: ), # _position maps to position - # * extra fields are ignored + # * extra fields are copied to raw data ( {}, {"_position": {"line": 1234, "column": 0, "offset": -9999}}, @@ -1512,11 +1515,17 @@ def test_no_notification_from_status(raw_status, summary_args_kwargs) -> None: ), ( {}, - {"_position": - {"line": 1234, "column": 0, "offset": -9999, "extra": "hi :)"} + { + "_position": { + "line": 1234, "column": 0, "offset": -9999, + "extra": "hi :)" + } }, { - "raw_position": {"line": 1234, "column": 0, "offset": -9999}, + "raw_position": { + "line": 1234, "column": 0, "offset": -9999, + "extra": "hi :)" + }, "position": SummaryInputPosition( line=1234, column=0, offset=-9999 ), @@ -1632,6 +1641,338 @@ def test_notification_from_status( == expected_overwrite.get("position", default_position)) +@pytest.mark.parametrize( + "status_in", + ( + None, + 1, + False, + ["neo4j_code"], + "neo4j_code" + ) +) +def test_no_notification_from_wrong_type_status( + status_in, summary_args_kwargs +) -> None: + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [status_in] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [] + assert summary_notifications == [] + + +def _get_from_dict( + dict_: _TDict, + key: t.Sequence, + default: t.Any = None, +) -> t.Any: + if not key: + raise ValueError("key must not be empty") + target = dict_ + for key_ in key[:-1]: + if key_ not in target: + return default + target_any = target[key_] + if not isinstance(target, dict): + return default + target = target_any + return target.get(key[-1], default) + + +def _del_from_dict( + dict_: _TDict, + del_key: t.Sequence, +) -> _TDict: + if not del_key: + raise ValueError("del_key must not be empty") + target = dict_ + for i, key in enumerate(del_key[:-1]): + if not isinstance(target.get(key), dict): + raise TypeError( + f"Expected dict at {del_key!r}[{i}], got {target!r}" + ) + target = target[key] + del target[del_key[-1]] + + return dict_ + + +def _set_in_dict( + dict_: _TDict, + set_key: t.Sequence, + set_value: t.Any, + *, + create_if_missing: bool = True, +) -> _TDict: + if not set_key: + raise ValueError("set_key must not be empty") + target = dict_ + for i, key in enumerate(set_key[:-1]): + if create_if_missing: + target = target.setdefault(key, {}) + else: + if key not in target: + return dict_ + target = target[key] + if not isinstance(target, dict): + raise TypeError( + f"Expected dict at {set_key!r}[{i}], got {target!r}" + ) + + if not create_if_missing and set_key[-1] not in target: + return dict_ + target[set_key[-1]] = set_value + + return dict_ + + +def _make_raw_status_obj( + *, + del_keys: t.Iterable[t.Tuple[str, ...]] = (), + replace: t.Optional[t.Dict[t.Tuple[str, ...], t.Any]] = None +) -> t.Dict[str, t.Any]: + raw_status: t.Dict[str, t.Any] = { + "gql_status": "03BAZ", + "status_description": "note: successful completion - custom stuff", + "description": "nice message", + "neo4j_code": "Neo.Foo.Bar.Baz", + "title": "Some cool title which defo is dope!", + "diagnostic_record": { + "OPERATION": "", + "OPERATION_CODE": "0", + "CURRENT_SCHEMA": "/", + "_status_parameters": {}, + "_severity": "INFORMATION", + "_classification": "HINT", + "_position": { + "line": 1337, + "column": 42, + "offset": 420, + }, + } + } + + if replace is None: + replace = {} + for key, value in replace.items(): + _set_in_dict(raw_status, key, value) + + for del_key in del_keys: + _del_from_dict(raw_status, del_key) + + return raw_status + + +def _make_raw_notification_obj( + *, + del_keys: t.Iterable[t.Tuple[str, ...]] = (), + replace: t.Optional[t.Dict[t.Tuple[str, ...], t.Any]] = None +) -> t.Dict[str, t.Any]: + raw_notification_obj: t.Dict[str, t.Any] = { + "code": "Neo.Foo.Bar.Baz", + "description": "nice message", + "title": "Some cool title which defo is dope!", + "severity": "INFORMATION", + "category": "HINT", + "position": { + "line": 1337, + "column": 42, + "offset": 420, + }, + } + + key_translation: t.Dict[ + t.Tuple[str, ...], + t.Tuple[t.Tuple[str, ...], ...] + ] = { + ("neo4j_code",): ( + ("code",), + ), + ("description",): ( + ("description",), + ), + ("title",): ( + ("title",), + ), + ("diagnostic_record", "_severity"): ( + ("severity",), + ), + ("diagnostic_record",): ( + ("severity",), + ("category",), + ("position",), + ), + ("diagnostic_record", "_classification"): ( + ("category",), + ), + ("diagnostic_record", "_position"): ( + ("position",), + ), + } + + # dicts the driver will copy 1-to-1 including unknown keys + copied_dicts = ( + ("diagnostic_record", "_position"), + ) + + del_keys_list = list(del_keys) + + if replace is None: + replace = {} + for key, value in replace.items(): + create_if_missing = False + translated_keys = key_translation.get(key, ()) + if not translated_keys: + for copied_dict in copied_dicts: + if key[:len(copied_dict)] == copied_dict: + create_if_missing = True + translated_keys = tuple( + (*k, *key[len(copied_dict):]) + for k in key_translation[copied_dict] + ) + break + else: + continue + if len(translated_keys) > 1: + # original key points to dict/list + # => cannot be translated by the driver's polyfill + # => expect it to not be present in the polyfilled value + del_keys_list.append(key) + continue + translated_key = translated_keys[0] + _set_in_dict( + raw_notification_obj, + translated_key, + value, + # position is copied as is + create_if_missing=create_if_missing + ) + + for key in del_keys_list: + translated_keys = key_translation.get(key, ()) + if not translated_keys: + for copied_dict in copied_dicts: + if key[:len(copied_dict)] == copied_dict: + translated_keys = tuple( + (*k, *key[len(copied_dict):]) + for k in key_translation[copied_dict] + ) + break + else: + continue + for translated_key in translated_keys: + _del_from_dict(raw_notification_obj, translated_key) + + return raw_notification_obj + + +def test_no_notification_from_status_without_neo4j_code( + summary_args_kwargs +) -> None: + raw_status = _make_raw_status_obj(del_keys=(("neo4j_code",),)) + + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [raw_status] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [] + assert summary_notifications == [] + + +@pytest.mark.parametrize( + "del_key", + ( + ("gql_status",), + ("status_description",), + ("description",), + ("title",), + ("diagnostic_record",), + ("diagnostic_record", "OPERATION"), + ("diagnostic_record", "OPERATION_CODE"), + ("diagnostic_record", "CURRENT_SCHEMA"), + ("diagnostic_record", "_status_parameters"), + ("diagnostic_record", "_severity"), + ("diagnostic_record", "_classification"), + ("diagnostic_record", "_position"), + ("diagnostic_record", "_position", "line"), + ("diagnostic_record", "_position", "column"), + ("diagnostic_record", "_position", "offset"), + ) +) +def test_notification_from_incomplete_status( + del_key: t.Tuple[str, ...], summary_args_kwargs +) -> None: + raw_status = _make_raw_status_obj(del_keys=(del_key,)) + raw_notification = _make_raw_notification_obj(del_keys=(del_key,)) + + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [raw_status] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [raw_notification] + + assert summary_notifications == [ + SummaryNotification._from_metadata(raw_notification) + ] + + +@pytest.mark.parametrize( + "set_key", + ( + ("neo4j_code",), + ("gql_status",), + ("status_description",), + ("description",), + ("title",), + ("",), + ("diagnostic_record",), + ("diagnostic_record", "OPERATION"), + ("diagnostic_record", "OPERATION_CODE"), + ("diagnostic_record", "CURRENT_SCHEMA"), + ("diagnostic_record", "_status_parameters"), + ("diagnostic_record", "_severity"), + ("diagnostic_record", "_classification"), + ("diagnostic_record", ""), + ("diagnostic_record", "_position"), + ("diagnostic_record", "_position", "line"), + ("diagnostic_record", "_position", "column"), + ("diagnostic_record", "_position", "offset"), + ("diagnostic_record", "_position", ""), + ) +) +@pytest.mark.parametrize("set_value", + (None, 1, 3.14159, False, [], {}, "", "hi :)")) +def test_notification_from_unexpected_status( + set_key: t.Tuple[str, ...], set_value: t.Any, summary_args_kwargs, +) -> None: + replace = {set_key: set_value} + raw_status = _make_raw_status_obj(replace=replace) + raw_notification = _make_raw_notification_obj(replace=replace) + + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [raw_status] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [raw_notification] + + assert summary_notifications == [ + SummaryNotification._from_metadata(raw_notification) + ] + + def _test_status(): return { "gql_status": "12345", @@ -1673,60 +2014,10 @@ def _test_status_broken_key(key_path, value): ( None, 1, + False, {}, + [], True, - _test_status_missing_key(("status_description",)), - _test_status_missing_key(("neo4j_code",)), - _test_status_missing_key(("title",)), - _test_status_missing_key(("diagnostic_record",)), - _test_status_missing_key(("diagnostic_record", "_severity")), - _test_status_missing_key(("diagnostic_record", "_classification")), - _test_status_missing_key(("diagnostic_record", "_position")), - _test_status_missing_key(("diagnostic_record", "_position", "line")), - _test_status_missing_key(("diagnostic_record", "_position", "column")), - _test_status_missing_key(("diagnostic_record", "_position", "offset")), - - _test_status_broken_key(("status_description",), - None), - _test_status_broken_key(("neo4j_code",), - None), - _test_status_broken_key(("title",), - None), - _test_status_broken_key(("diagnostic_record",), - None), - _test_status_broken_key(("diagnostic_record", "_severity"), - None), - _test_status_broken_key(("diagnostic_record", "_classification"), - None), - _test_status_broken_key(("diagnostic_record", "_position"), - None), - _test_status_broken_key(("diagnostic_record", "_position", "line"), - None), - _test_status_broken_key(("diagnostic_record", "_position", "column"), - None), - _test_status_broken_key(("diagnostic_record", "_position", "offset"), - None), - - _test_status_broken_key(("status_description",), - False), - _test_status_broken_key(("neo4j_code",), - False), - _test_status_broken_key(("title",), - False), - _test_status_broken_key(("diagnostic_record",), - False), - _test_status_broken_key(("diagnostic_record", "_severity"), - False), - _test_status_broken_key(("diagnostic_record", "_classification"), - False), - _test_status_broken_key(("diagnostic_record", "_position"), - False), - _test_status_broken_key(("diagnostic_record", "_position", "line"), - False), - _test_status_broken_key(("diagnostic_record", "_position", "column"), - False), - _test_status_broken_key(("diagnostic_record", "_position", "offset"), - False), ) ) def test_notification_from_broken_status( @@ -1742,140 +2033,6 @@ def test_notification_from_broken_status( assert notifications == [] -@pytest.mark.parametrize( - "in_status", - ( - _test_status_missing_key(("diagnostic_record",)), - - _test_status_broken_key(("diagnostic_record",), None), - _test_status_broken_key(("diagnostic_record",), 1), - _test_status_broken_key(("diagnostic_record",), True), - _test_status_broken_key(("diagnostic_record",), [None]), - ) -) -def test_broken_diagnostic_record(in_status, summary_args_kwargs) -> None: - args, kwargs = summary_args_kwargs - kwargs["metadata"]["statuses"] = [in_status] - - summary = ResultSummary(*args, **kwargs) - - with pytest.warns(PreviewWarning, match="GQLSTATUS"): - statuses = summary.gql_status_objects - assert len(statuses) == 1 - status = statuses[0] - - assert status.diagnostic_record == {} - assert status.position is None - assert status.raw_classification is None - assert status.classification is NotificationClassification.UNKNOWN - assert status.raw_severity is None - assert status.severity is NotificationSeverity.UNKNOWN - - -@pytest.mark.parametrize( - ("status_overwrite", "diagnostic_record_overwrite"), - ( - *( - ({"description": value}, {}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({"title": value}, {}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({}, {"_severity": value}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({}, {"_classification": value}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({}, {"_position": value}) - for value in t.cast( - t.Iterable[t.Any], ( - None, ..., 1, False, [], {}, - {"column": 1, "offset": 1}, - {"line": 1, "offset": 1}, - {"line": 1, "column": 1}, - {"line": None, "column": 1, "offset": 1}, - {"line": 1, "column": None, "offset": 1}, - {"line": 1, "column": 1, "offset": None}, - {"line": False, "column": 1, "offset": 1}, - {"line": 1, "column": False, "offset": 1}, - {"line": 1, "column": 1, "offset": False}, - {"line": [], "column": 1, "offset": 1}, - {"line": 1, "column": [], "offset": 1}, - {"line": 1, "column": 1, "offset": []}, - {"line": 1.0, "column": 1, "offset": 1}, - {"line": 1, "column": 1.0, "offset": 1}, - {"line": 1, "column": 1, "offset": 1.0}, - {"line": "1", "column": 1, "offset": 1}, - {"line": 1, "column": "1", "offset": 1}, - {"line": 1, "column": 1, "offset": "1"}, - ) - ) - ), - ) -) -def test_no_notification_from_broken_status( - status_overwrite, diagnostic_record_overwrite, summary_args_kwargs -) -> None: - default_status = "03BAZ" - default_status_description = "note: successful completion - custom stuff" - default_description = "some description" - default_code = "Neo.Foo.Bar.Baz" - default_title = "Some cool title which defo is dope!" - default_severity = "INFORMATION" - default_classification = "HINT" - default_position = SummaryInputPosition(line=1337, column=42, offset=420) - raw_status_obj: t.Dict[str, t.Any] = { - "gql_status": default_status, - "description": default_description, - "status_description": default_status_description, - "neo4j_code": default_code, - "title": default_title, - "diagnostic_record": { - "OPERATION": "", - "OPERATION_CODE": "0", - "CURRENT_SCHEMA": "/", - "_status_parameters": {}, - "_severity": default_severity, - "_classification": default_classification, - "_position": { - "line": default_position.line, - "column": default_position.column, - "offset": default_position.offset, - }, - } - } - for key, value in diagnostic_record_overwrite.items(): - if value is ...: - del raw_status_obj["diagnostic_record"][key] - else: - raw_status_obj["diagnostic_record"][key] = value - for key, value in status_overwrite.items(): - if value is ...: - del raw_status_obj[key] - else: - raw_status_obj[key] = value - - args, kwargs = summary_args_kwargs - kwargs["metadata"]["statuses"] = [raw_status_obj] - - summary = ResultSummary(*args, **kwargs) - notifications = summary.notifications - summary_notifications = summary.summary_notifications - - assert notifications == [] - assert summary_notifications == [] - - def test_notifications_from_statuses_keep_order( summary_args_kwargs, ) -> None: @@ -1912,3 +2069,167 @@ def test_notifications_from_statuses_keep_order( 3, "INFORMATION") helper.assert_parsed_notification_is_status(summary_notifications[3], 1, "WARNING") + + +def _make_summary_notification( + overwrite: t.Optional[t.Dict[str, t.Any]] = None +) -> SummaryNotification: + if overwrite is None: + overwrite = {} + return SummaryNotification( + title=overwrite.get("title", ""), + code=overwrite.get("code", ""), + description=overwrite.get("description", ""), + severity_level=overwrite.get( + "severity_level", NotificationSeverity.UNKNOWN + ), + category=overwrite.get( + "category", NotificationCategory.UNKNOWN + ), + raw_severity_level=overwrite.get("raw_severity_level", ""), + raw_category=overwrite.get("raw_category", ""), + position=overwrite.get("position") + ) + + +@pytest.mark.parametrize( + ("meta", "expected"), + ( + # empty meta data + ({}, _make_summary_notification()), + + # all string fields + *( + ( + {key: str_value}, + _make_summary_notification({key: str_value}) + ) + for key in ("title", "code", "description") + for str_value in ("", " ", "Foo Bar Baz") + ), + *( + ( + {key: title}, + _make_summary_notification() + ) + for key in ("title", "code", "description") + for title in t.cast(t.Iterable, + (True, False, 1, [], {}, 1.2345, None)) + ), + + # severity + *( + ( + {"severity": raw_value}, + _make_summary_notification({ + "raw_severity_level": raw_value, + "severity_level": parsed_value, + }) + ) + for raw_value, parsed_value in ( + ("", NotificationSeverity.UNKNOWN), + (" ", NotificationSeverity.UNKNOWN), + ("Foo Bar", NotificationSeverity.UNKNOWN), + ("WARNING", NotificationSeverity.WARNING), + ("INFORMATION", NotificationSeverity.INFORMATION), + ) + ), + *( + ( + {"severity": value}, + _make_summary_notification() + ) + for value in t.cast(t.Iterable, + (True, False, 1, [], {}, 1.2345, None)) + ), + + # category + *( + ( + {"category": raw_value}, + _make_summary_notification({ + "raw_category": raw_value, + "category": parsed_value, + }) + ) + for raw_value, parsed_value in ( + ("", NotificationCategory.UNKNOWN), + (" ", NotificationCategory.UNKNOWN), + ("Foo Bar", NotificationCategory.UNKNOWN), + ("HINT", NotificationCategory.HINT), + ("UNRECOGNIZED", NotificationCategory.UNRECOGNIZED), + ("UNSUPPORTED", NotificationCategory.UNSUPPORTED), + ("PERFORMANCE", NotificationCategory.PERFORMANCE), + ("DEPRECATION", NotificationCategory.DEPRECATION), + ("GENERIC", NotificationCategory.GENERIC), + ("SECURITY", NotificationCategory.SECURITY), + ("TOPOLOGY", NotificationCategory.TOPOLOGY), + ) + ), + *( + ( + {"category": value}, + _make_summary_notification() + ) + for value in t.cast(t.Iterable, + (True, False, 1, [], {}, 1.2345, None)) + ), + + # position + *( + ( + {"position": raw_value}, + _make_summary_notification({"position": parsed_value}) + ) + for raw_value, parsed_value in ( + ( + {"line": 1, "column": 2, "offset": 3}, + SummaryInputPosition(line=1, column=2, offset=3) + ), + ( + {"line": -100, "column": -200, "offset": -300}, + SummaryInputPosition(line=-100, column=-200, offset=-300) + ), + ( + {"line": 0, "column": 0, "offset": 0}, + SummaryInputPosition(line=0, column=0, offset=0) + ), + # extraneous values are ignored + *( + ( + {"line": 1, "column": 2, "offset": 3, "extra": extra}, + SummaryInputPosition(line=1, column=2, offset=3) + ) + for extra in t.cast( + t.Iterable, + (None, 1, False, [], {}, "hi :)", "5", 1.2345) + ) + ), + ) + ), + # missing field/field of wrong type => position is default value + *( + ( + {"position": value}, + _make_summary_notification() + ) + for value in ( + *( + {"line": 1, "column": 2, "offset": 3, key: value} + for key in ("line", "column", "offset") + for value in t.cast(t.Iterable, + (1.2, None, False, True, [], {}, "1")) + ), + {"column": 2, "offset": 3}, + {"line": 1, "offset": 3}, + {"line": 1, "column": 2}, + ) + ), + ) +) +def test_notification_from_meta_data( + meta: t.Dict, + expected: SummaryNotification, +) -> None: + notification = SummaryNotification._from_metadata(meta) + assert notification == expected