diff --git a/nutkit/frontend/driver.py b/nutkit/frontend/driver.py index 6a39eb378..cca43bd69 100644 --- a/nutkit/frontend/driver.py +++ b/nutkit/frontend/driver.py @@ -67,6 +67,26 @@ def send_and_receive(self, req, timeout=None, hooks=None, *, return self.receive(timeout=timeout, hooks=hooks, allow_resolution=allow_resolution) + def execute_query(self, cypher, params=None, + routing=None, database=None, + impersonated_user=None, bookmark_manager=...): + config = { + "routing": routing, + "database": database, + "impersonatedUser": impersonated_user + } + + if bookmark_manager is None: + config["bookmarkManagerId"] = -1 + elif bookmark_manager is not Ellipsis: + config["bookmarkManagerId"] = bookmark_manager.id + + req = protocol.ExecuteQuery(self._driver.id, cypher, params, config) + res = self.send_and_receive(req, allow_resolution=True) + if not isinstance(res, protocol.EagerResult): + raise Exception("Should be EagerResult but was: %s" % res) + return res + def verify_connectivity(self): req = protocol.VerifyConnectivity(self._driver.id) res = self.send_and_receive(req, allow_resolution=True) diff --git a/nutkit/protocol/feature.py b/nutkit/protocol/feature.py index b602dad14..91cad48e3 100644 --- a/nutkit/protocol/feature.py +++ b/nutkit/protocol/feature.py @@ -13,6 +13,9 @@ class Feature(Enum): # is picked up instead or we need to wait until the full pool depletes. API_CONNECTION_ACQUISITION_TIMEOUT = \ "Feature:API:ConnectionAcquisitionTimeout" + # The driver offers a method to run a query in a retryable context at the + # driver object level. + API_DRIVER_EXECUTE_QUERY = "Feature:API:Driver.ExecuteQuery" # The driver offers a method for checking if a connection to the remote # server of cluster can be established and retrieve the server info of the # reached remote. diff --git a/nutkit/protocol/requests.py b/nutkit/protocol/requests.py index 8519eecc7..f620bac97 100644 --- a/nutkit/protocol/requests.py +++ b/nutkit/protocol/requests.py @@ -592,3 +592,30 @@ def __init__(self, result_id, record_key, type_name, field_id): self.recordKey = record_key self.type = type_name self.field = field_id + + +class ExecuteQuery: + """ + Request to execute a query in a retriable context. + + Backend should return EagerResult or a Error response. + + :param driver_id: The id of the driver where the cypher query has to run. + :param cypher: The cypher query which to run. + :param params: The cypher query params. + :param config: The configuration + :param config.database: The database where the query will run. + :param config.routing: The type of routing ("w" for Writers, + "r" for "Readers") + :param config.impersonatedUser: The user which will be impersonated + :param config.bookmarkManagerId: The id of the bookmark manager + used in the query. None or not define for using the default, + -1 for disabling the BookmarkManager + """ + + def __init__(self, driver_id, cypher, params, config): + self.driverId = driver_id + self.cypher = cypher + self.params = params + if config: + self.config = config diff --git a/nutkit/protocol/responses.py b/nutkit/protocol/responses.py index f982a2723..09b748150 100644 --- a/nutkit/protocol/responses.py +++ b/nutkit/protocol/responses.py @@ -495,6 +495,13 @@ def __init__(self, inUse, idle): self.idle = idle +class EagerResult: + def __init__(self, keys, records, summary): + self.keys = keys + self.records = [Record(**record) for record in records or []] + self.summary = Summary(**summary) + + class BaseError(Exception): """ Base class for all types of errors, should not be sent from backend. diff --git a/tests/stub/driver_execute_query/__init__.py b/tests/stub/driver_execute_query/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/stub/driver_execute_query/scripts/router.script b/tests/stub/driver_execute_query/scripts/router.script new file mode 100644 index 000000000..ffb8fc5a4 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/router.script @@ -0,0 +1,10 @@ +!: BOLT 5.0 +!: ALLOW RESTART + +A: HELLO {"{}": "*"} +{+ + C: ROUTE "*" "*" "*" + S: SUCCESS {"rt": {"ttl": 1000, "db": "adb", "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9020"], "role":"WRITE"}]}} + ?: RESET ++} +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/router_invert_reader_and_writer_second_call.script b/tests/stub/driver_execute_query/scripts/router_invert_reader_and_writer_second_call.script new file mode 100644 index 000000000..eb8843d18 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/router_invert_reader_and_writer_second_call.script @@ -0,0 +1,12 @@ +!: BOLT 5.0 +!: ALLOW RESTART + +A: HELLO {"{}": "*"} +C: ROUTE "*" "*" "*" +S: SUCCESS {"rt": {"ttl": 1000, "db": "adb", "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9020"], "role":"WRITE"}]}} +?: RESET +C: ROUTE "*" "*" "*" +S: SUCCESS {"rt": {"ttl": 1000, "db": "adb", "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9020"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}} +?: RESET + +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/router_with_db_name.script b/tests/stub/driver_execute_query/scripts/router_with_db_name.script new file mode 100644 index 000000000..b9e5d2de1 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/router_with_db_name.script @@ -0,0 +1,8 @@ +!: BOLT 5.0 +!: ALLOW RESTART + +A: HELLO {"{}": "*"} +C: ROUTE "*" "*" {"db":"neo4j"} +S: SUCCESS {"rt": {"ttl": 1000, "db": "neo4j", "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9020"], "role":"WRITE"}]}} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/router_with_impersonation.script b/tests/stub/driver_execute_query/scripts/router_with_impersonation.script new file mode 100644 index 000000000..fb2599403 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/router_with_impersonation.script @@ -0,0 +1,8 @@ +!: BOLT 5.0 +!: ALLOW RESTART + +A: HELLO {"{}": "*"} +C: ROUTE "*" "*" {"imp_user": "that-other-dude"} +S: SUCCESS {"rt": {"ttl": 1000, "db": "adb", "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9020"], "role":"WRITE"}]}} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/transaction_chaining.script b/tests/stub/driver_execute_query/scripts/transaction_chaining.script new file mode 100644 index 000000000..5e3be6248 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/transaction_chaining.script @@ -0,0 +1,39 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET + +C: BEGIN {"[db]": "*"} +S: SUCCESS {} +C: RUN "CREATE (p:Person {name:$name}) RETURN p.name AS name" {"name": "the person"} {} +S: SUCCESS {"fields": ["name"]} +C: PULL {"n": 1000} +S: RECORD ["the person"] + SUCCESS {"type": "w"} +C: COMMIT +S: SUCCESS {"bookmark": "bm1"} +*: RESET + +{{ + C: BEGIN {"[db]": "*"} + S: SUCCESS {} + C: RUN "MATCH (p:Person {name:$name}) RETURN p.name AS name" {"name": "the person"} {} + S: SUCCESS {"fields": ["name"]} + C: PULL {"n": 1000} + S: SUCCESS {"type": "r"} + C: COMMIT + S: SUCCESS {"bookmark": "bm1"} +---- + C: BEGIN {"bookmarks": ["bm1"], "[db]": "*"} + S: SUCCESS {} + C: RUN "MATCH (p:Person {name:$name}) RETURN p.name AS name" {"name": "the person"} {} + S: SUCCESS {"fields": ["name"]} + C: PULL {"n": 1000} + S: RECORD ["the person"] + SUCCESS {"type": "r"} + C: COMMIT + S: SUCCESS {"bookmark": "bm2"} +}} +*: RESET + +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/transaction_chaining_custom_bmm.script b/tests/stub/driver_execute_query/scripts/transaction_chaining_custom_bmm.script new file mode 100644 index 000000000..3ee125b35 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/transaction_chaining_custom_bmm.script @@ -0,0 +1,39 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET + +C: BEGIN {"bookmarks": ["other_db:bm1"], "[db]": "*"} +S: SUCCESS {} +C: RUN "CREATE (p:Person {name:$name}) RETURN p.name AS name" {"name": "a person"} {} +S: SUCCESS {"fields": ["name"]} +C: PULL {"n": 1000} +S: RECORD ["a person"] + SUCCESS {"type": "w"} +C: COMMIT +S: SUCCESS {"bookmark": "bm1"} +*: RESET + +{{ + C: BEGIN {"bookmarks": ["other_db:bm1"], "[db]": "*"} + S: SUCCESS {} + C: RUN "MATCH (p:Person {name:$name}) RETURN p.name AS name" {"name": "a person"} {} + S: SUCCESS {"fields": ["name"]} + C: PULL {"n": 1000} + S: SUCCESS {"type": "r"} + C: COMMIT + S: SUCCESS {"bookmark": "bm1"} +---- + C: BEGIN {"bookmarks{}": ["bm1", "other_db:bm1"], "[db]": "*"} + S: SUCCESS {} + C: RUN "MATCH (p:Person {name:$name}) RETURN p.name AS name" {"name": "a person"} {} + S: SUCCESS {"fields": ["name"]} + C: PULL {"n": 1000} + S: RECORD ["a person"] + SUCCESS {"type": "r"} + C: COMMIT + S: SUCCESS {"bookmark": "bm2"} +}} +*: RESET + +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/tx_return_1.script b/tests/stub/driver_execute_query/scripts/tx_return_1.script new file mode 100644 index 000000000..b47f8ab0c --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/tx_return_1.script @@ -0,0 +1,15 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "RETURN 1 AS n" {} {} +S: SUCCESS {"fields": ["n"], "qid": 1} +C: PULL {"n": 1000} +S: RECORD [1] + SUCCESS {"type": "r"} +C: COMMIT +S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx424242"} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/tx_return_1_disconnect_on_pull.script b/tests/stub/driver_execute_query/scripts/tx_return_1_disconnect_on_pull.script new file mode 100644 index 000000000..a92aad920 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/tx_return_1_disconnect_on_pull.script @@ -0,0 +1,10 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "RETURN 1 AS n" {} {} +S: SUCCESS {"fields": ["n"], "qid": 1} +C: PULL {"n": 1000} +S: diff --git a/tests/stub/driver_execute_query/scripts/tx_return_1_transaction_terminated_on_pull.script b/tests/stub/driver_execute_query/scripts/tx_return_1_transaction_terminated_on_pull.script new file mode 100644 index 000000000..149ac6d5c --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/tx_return_1_transaction_terminated_on_pull.script @@ -0,0 +1,12 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "RETURN 1 AS n" {} {} +S: SUCCESS {"fields": ["n"], "qid": 1} +C: PULL {"n": 1000} +S: FAILURE {"code": "Neo.ClientError.Transaction.Terminated", "message": "message"} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/tx_return_1_with_impersonation.script b/tests/stub/driver_execute_query/scripts/tx_return_1_with_impersonation.script new file mode 100644 index 000000000..1d71ef007 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/tx_return_1_with_impersonation.script @@ -0,0 +1,15 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"imp_user": "that-other-dude", "db": "adb"} +S: SUCCESS {} +C: RUN "RETURN 1 AS n" {} {} +S: SUCCESS {"fields": ["n"], "qid": 1} +C: PULL {"n": 1000} +S: RECORD [1] + SUCCESS {"type": "r"} +C: COMMIT +S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx424242"} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_execute_query/scripts/tx_return_1_with_params.script b/tests/stub/driver_execute_query/scripts/tx_return_1_with_params.script new file mode 100644 index 000000000..dccdcd065 --- /dev/null +++ b/tests/stub/driver_execute_query/scripts/tx_return_1_with_params.script @@ -0,0 +1,15 @@ +!: BOLT 5.0 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "RETURN $a AS n" { "a": 1 } {} +S: SUCCESS {"fields": ["n"], "qid": 1} +C: PULL {"n": 1000} +S: RECORD [1] + SUCCESS {"type": "r", "db": "#DB#"} +C: COMMIT +S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx424242"} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_execute_query/test_driver_execute_query.py b/tests/stub/driver_execute_query/test_driver_execute_query.py new file mode 100644 index 000000000..13835f243 --- /dev/null +++ b/tests/stub/driver_execute_query/test_driver_execute_query.py @@ -0,0 +1,294 @@ +from nutkit.frontend import ( + BookmarkManager, + Driver, + Neo4jBookmarkManagerConfig, +) +import nutkit.protocol as types +from tests.shared import TestkitTestCase +from tests.stub.shared import StubServer + + +class TestDriverExecuteQuery(TestkitTestCase): + required_features = ( + types.Feature.BOLT_5_0, + types.Feature.API_BOOKMARK_MANAGER, + types.Feature.API_DRIVER_EXECUTE_QUERY + ) + + def setUp(self): + super().setUp() + self._router = StubServer(9000) + self._reader = StubServer(9010) + self._writer = StubServer(9020) + self._driver = None + + def tearDown(self): + if self._driver: + self._driver.close() + + self._router.reset() + self._reader.reset() + self._writer.reset() + return super().tearDown() + + def test_execute_query_without_params_and_config(self): + self._start_server(self._router, "router.script") + self._start_server(self._writer, "tx_return_1.script") + self._driver = self._new_driver() + + eager_result = self._driver.execute_query("RETURN 1 AS n") + + self._assert_eager_result( + eager_result, + keys=["n"], + records=[types.Record(values=[types.CypherInt(1)])], + query_type="r" + ) + + def test_execute_query_without_config(self): + self._start_server(self._router, "router.script") + self._start_server(self._writer, "tx_return_1_with_params.script") + self._driver = self._new_driver() + + eager_result = self._driver.execute_query("RETURN $a AS n", + {"a": types.CypherInt(1)}) + + self._assert_eager_result( + eager_result, + keys=["n"], + records=[types.Record(values=[types.CypherInt(1)])], + query_type="r" + ) + + def test_configure_routing_to_writers(self): + self._start_server(self._router, "router.script") + self._start_server(self._writer, "tx_return_1_with_params.script") + self._driver = self._new_driver() + + eager_result = self._driver.execute_query("RETURN $a AS n", + {"a": types.CypherInt(1)}, + routing="w") + + self._assert_eager_result( + eager_result, + keys=["n"], + records=[types.Record(values=[types.CypherInt(1)])], + query_type="r" + ) + + def test_configure_routing_to_readers(self): + self._start_server(self._router, "router.script") + self._start_server(self._reader, "tx_return_1_with_params.script") + self._driver = self._new_driver() + + eager_result = self._driver.execute_query("RETURN $a AS n", + {"a": types.CypherInt(1)}, + routing="r") + + self._assert_eager_result( + eager_result, + keys=["n"], + records=[types.Record(values=[types.CypherInt(1)])], + query_type="r" + ) + + def test_configure_database(self): + self._start_server(self._router, "router_with_db_name.script") + self._start_server( + self._writer, "tx_return_1_with_params.script", database="neo4j" + ) + self._driver = self._new_driver() + + eager_result = self._driver.execute_query("RETURN $a AS n", + {"a": types.CypherInt(1)}, + database="neo4j") + + self._assert_eager_result( + eager_result, + keys=["n"], + records=[types.Record(values=[types.CypherInt(1)])], + query_type="r" + ) + + def test_configure_impersonated_user(self): + self._start_server(self._router, "router_with_impersonation.script") + self._start_server( + self._writer, "tx_return_1_with_impersonation.script" + ) + self._driver = self._new_driver() + + eager_result = self._driver.execute_query( + "RETURN 1 AS n", + impersonated_user="that-other-dude" + ) + + self._assert_eager_result( + eager_result, + keys=["n"], + records=[types.Record(values=[types.CypherInt(1)])], + query_type="r" + ) + + def test_causal_consistency_between_query_executions(self): + self._start_server(self._router, "router.script") + self._start_server( + self._writer, "transaction_chaining.script" + ) + self._driver = self._new_driver() + + # CREATING NODE + eager_result = self._driver.execute_query( + "CREATE (p:Person {name:$name}) RETURN p.name AS name", + {"name": types.CypherString("the person")} + ) + + self._assert_eager_result( + eager_result, + keys=["name"], + records=[types.Record(values=[types.CypherString("the person")])], + query_type="w" + ) + + # READING SAME NODE + eager_result2 = self._driver.execute_query( + "MATCH (p:Person {name:$name}) RETURN p.name AS name", + {"name": types.CypherString("the person")} + ) + + self._assert_eager_result( + eager_result2, + keys=["name"], + records=[types.Record(values=[types.CypherString("the person")])], + query_type="r" + ) + + def test_disable_bookmark_manager(self): + self._start_server(self._router, "router.script") + self._start_server( + self._writer, "transaction_chaining.script" + ) + self._driver = self._new_driver() + + # CREATING NODE + eager_result = self._driver.execute_query( + "CREATE (p:Person {name:$name}) RETURN p.name AS name", + {"name": types.CypherString("the person")}, + bookmark_manager=None + ) + + self._assert_eager_result( + eager_result, + keys=["name"], + records=[types.Record(values=[types.CypherString("the person")])], + query_type="w" + ) + + # READING SAME NODE + eager_result2 = self._driver.execute_query( + "MATCH (p:Person {name:$name}) RETURN p.name AS name", + {"name": types.CypherString("the person")}, + bookmark_manager=None + ) + + self._assert_eager_result( + eager_result2, + keys=["name"], + records=[], + query_type="r" + ) + + def test_configure_custom_bookmark_manager(self): + self._start_server(self._router, "router.script") + self._start_server( + self._writer, "transaction_chaining_custom_bmm.script" + ) + bookmark_manager = BookmarkManager( + self._backend, + config=Neo4jBookmarkManagerConfig( + initial_bookmarks={"other_db": ["other_db:bm1"]} + ) + ) + self._driver = self._new_driver() + + # CREATING NODE + eager_result = self._driver.execute_query( + "CREATE (p:Person {name:$name}) RETURN p.name AS name", + {"name": types.CypherString("a person")}, + bookmark_manager=bookmark_manager + ) + + self._assert_eager_result( + eager_result, + keys=["name"], + records=[types.Record(values=[types.CypherString("a person")])], + query_type="w" + ) + + # READING SAME NODE + eager_result2 = self._driver.execute_query( + "MATCH (p:Person {name:$name}) RETURN p.name AS name", + {"name": types.CypherString("a person")}, + bookmark_manager=bookmark_manager + ) + + self._assert_eager_result( + eager_result2, + keys=["name"], + records=[types.Record(values=[types.CypherString("a person")])], + query_type="r" + ) + + bookmark_manager.close() + + def test_retry_on_retryable_error(self): + self._start_server( + self._router, "router_invert_reader_and_writer_second_call.script" + ) + self._start_server( + self._writer, "tx_return_1_disconnect_on_pull.script" + ) + self._start_server(self._reader, "tx_return_1.script") + self._driver = self._new_driver() + + eager_result = self._driver.execute_query("RETURN 1 AS n") + + self.assertEqual(["n"], eager_result.keys) + self.assertEqual([types.Record(values=[types.CypherInt(1)])], + eager_result.records) + self.assertIsNotNone(eager_result.summary) + + def test_thrown_non_retryable_error(self): + self._start_server(self._router, "router.script") + self._start_server( + self._writer, "tx_return_1_transaction_terminated_on_pull.script" + ) + + self._driver = self._new_driver() + + with self.assertRaises(types.DriverError) as exc: + self._driver.execute_query("RETURN 1 AS n") + + self.assertEqual(exc.exception.code, + "Neo.ClientError.Transaction.Terminated") + + def _assert_eager_result(self, eager_result, *, keys, records, query_type): + self.assertEqual(keys, eager_result.keys) + self.assertEqual(records, eager_result.records) + self.assertIsNotNone(eager_result.summary) + summary = eager_result.summary + self.assertEqual("Neo4j/5.0.0", summary.server_info.agent) + self.assertEqual("5.0", summary.server_info.protocol_version) + self.assertEqual(query_type, summary.query_type) + + def _start_server(self, server, script, database=""): + server.start(self.script_path(script), vars_={ + "#HOST#": self._router.host, + "#DB#": database + }) + + def _new_driver(self): + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + driver = Driver(self._backend, uri, auth) + return driver