|
42 | 42 |
|
43 | 43 |
|
44 | 44 | # Signature bytes for each message type |
45 | | -INIT = b"\x01" # 0000 0001 // INIT <user_agent> |
| 45 | +INIT = b"\x01" # 0000 0001 // INIT <user_agent> <auth> |
| 46 | +ACK_FAILURE = b"\x0E" # 0000 1110 // ACK_FAILURE |
46 | 47 | RESET = b"\x0F" # 0000 1111 // RESET |
47 | 48 | RUN = b"\x10" # 0001 0000 // RUN <statement> <parameters> |
48 | 49 | DISCARD_ALL = b"\x2F" # 0010 1111 // DISCARD * |
|
57 | 58 |
|
58 | 59 | message_names = { |
59 | 60 | INIT: "INIT", |
| 61 | + ACK_FAILURE: "ACK_FAILURE", |
60 | 62 | RESET: "RESET", |
61 | 63 | RUN: "RUN", |
62 | 64 | DISCARD_ALL: "DISCARD_ALL", |
@@ -224,7 +226,7 @@ def __init__(self, sock, **config): |
224 | 226 | self.der_encoded_server_certificate = config.get("der_encoded_server_certificate") |
225 | 227 |
|
226 | 228 | def on_failure(metadata): |
227 | | - raise ProtocolError(metadata.get("message", "Inititalisation failed")) |
| 229 | + raise ProtocolError(metadata.get("message", "INIT failed")) |
228 | 230 |
|
229 | 231 | response = Response(self) |
230 | 232 | response.on_failure = on_failure |
@@ -253,6 +255,23 @@ def append(self, signature, fields=(), response=None): |
253 | 255 | self.channel.flush(end_of_message=True) |
254 | 256 | self.responses.append(response) |
255 | 257 |
|
| 258 | + def acknowledge_failure(self): |
| 259 | + """ Add an ACK_FAILURE message to the outgoing queue, send |
| 260 | + it and consume all remaining messages. |
| 261 | + """ |
| 262 | + response = Response(self) |
| 263 | + |
| 264 | + def on_failure(metadata): |
| 265 | + raise ProtocolError("ACK_FAILURE failed") |
| 266 | + |
| 267 | + response.on_failure = on_failure |
| 268 | + |
| 269 | + self.append(ACK_FAILURE, response=response) |
| 270 | + self.send() |
| 271 | + fetch = self.fetch |
| 272 | + while not response.complete: |
| 273 | + fetch() |
| 274 | + |
256 | 275 | def reset(self): |
257 | 276 | """ Add a RESET message to the outgoing queue, send |
258 | 277 | it and consume all remaining messages. |
@@ -304,7 +323,7 @@ def fetch(self): |
304 | 323 | response.complete = True |
305 | 324 | self.responses.popleft() |
306 | 325 | if signature == FAILURE: |
307 | | - self.reset() |
| 326 | + self.acknowledge_failure() |
308 | 327 | handler_name = "on_%s" % message_names[signature].lower() |
309 | 328 | try: |
310 | 329 | handler = getattr(response, handler_name) |
|
0 commit comments