diff --git a/neo4j/v1/connection.py b/neo4j/v1/connection.py index cccdd26f9..42f9f0985 100644 --- a/neo4j/v1/connection.py +++ b/neo4j/v1/connection.py @@ -42,7 +42,8 @@ # Signature bytes for each message type -INIT = b"\x01" # 0000 0001 // INIT +INIT = b"\x01" # 0000 0001 // INIT +ACK_FAILURE = b"\x0E" # 0000 1110 // ACK_FAILURE RESET = b"\x0F" # 0000 1111 // RESET RUN = b"\x10" # 0001 0000 // RUN DISCARD_ALL = b"\x2F" # 0010 1111 // DISCARD * @@ -57,6 +58,7 @@ message_names = { INIT: "INIT", + ACK_FAILURE: "ACK_FAILURE", RESET: "RESET", RUN: "RUN", DISCARD_ALL: "DISCARD_ALL", @@ -224,7 +226,7 @@ def __init__(self, sock, **config): self.der_encoded_server_certificate = config.get("der_encoded_server_certificate") def on_failure(metadata): - raise ProtocolError(metadata.get("message", "Inititalisation failed")) + raise ProtocolError(metadata.get("message", "INIT failed")) response = Response(self) response.on_failure = on_failure @@ -253,6 +255,23 @@ def append(self, signature, fields=(), response=None): self.channel.flush(end_of_message=True) self.responses.append(response) + def acknowledge_failure(self): + """ Add an ACK_FAILURE message to the outgoing queue, send + it and consume all remaining messages. + """ + response = Response(self) + + def on_failure(metadata): + raise ProtocolError("ACK_FAILURE failed") + + response.on_failure = on_failure + + self.append(ACK_FAILURE, response=response) + self.send() + fetch = self.fetch + while not response.complete: + fetch() + def reset(self): """ Add a RESET message to the outgoing queue, send it and consume all remaining messages. @@ -304,7 +323,7 @@ def fetch(self): response.complete = True self.responses.popleft() if signature == FAILURE: - self.reset() + self.acknowledge_failure() handler_name = "on_%s" % message_names[signature].lower() try: handler = getattr(response, handler_name)