Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions neo4j/v1/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@


# Signature bytes for each message type
INIT = b"\x01" # 0000 0001 // INIT <user_agent>
INIT = b"\x01" # 0000 0001 // INIT <user_agent> <auth>
ACK_FAILURE = b"\x0E" # 0000 1110 // ACK_FAILURE
RESET = b"\x0F" # 0000 1111 // RESET
RUN = b"\x10" # 0001 0000 // RUN <statement> <parameters>
DISCARD_ALL = b"\x2F" # 0010 1111 // DISCARD *
Expand All @@ -57,6 +58,7 @@

message_names = {
INIT: "INIT",
ACK_FAILURE: "ACK_FAILURE",
RESET: "RESET",
RUN: "RUN",
DISCARD_ALL: "DISCARD_ALL",
Expand Down Expand Up @@ -224,7 +226,7 @@ def __init__(self, sock, **config):
self.der_encoded_server_certificate = config.get("der_encoded_server_certificate")

def on_failure(metadata):
raise ProtocolError(metadata.get("message", "Inititalisation failed"))
raise ProtocolError(metadata.get("message", "INIT failed"))

response = Response(self)
response.on_failure = on_failure
Expand Down Expand Up @@ -253,6 +255,23 @@ def append(self, signature, fields=(), response=None):
self.channel.flush(end_of_message=True)
self.responses.append(response)

def acknowledge_failure(self):
""" Add an ACK_FAILURE message to the outgoing queue, send
it and consume all remaining messages.
"""
response = Response(self)

def on_failure(metadata):
raise ProtocolError("ACK_FAILURE failed")

response.on_failure = on_failure

self.append(ACK_FAILURE, response=response)
self.send()
fetch = self.fetch
while not response.complete:
fetch()

def reset(self):
""" Add a RESET message to the outgoing queue, send
it and consume all remaining messages.
Expand Down Expand Up @@ -304,7 +323,7 @@ def fetch(self):
response.complete = True
self.responses.popleft()
if signature == FAILURE:
self.reset()
self.acknowledge_failure()
handler_name = "on_%s" % message_names[signature].lower()
try:
handler = getattr(response, handler_name)
Expand Down