@@ -44,21 +44,64 @@ cdef class CoreProtocol:
4444 if mtype == b' S' :
4545 # ParameterStatus
4646 self ._parse_msg_parameter_status()
47- continue
47+
4848 elif mtype == b' A' :
4949 # NotificationResponse
5050 self ._parse_msg_notification()
51- continue
51+
5252 elif mtype == b' N' :
5353 # 'N' - NoticeResponse
5454 self ._on_notice(self ._parse_msg_error_response(False ))
55- continue
5655
57- if state == PROTOCOL_AUTH:
56+ elif mtype == b' E' :
57+ # ErrorResponse
58+ self ._parse_msg_error_response(True )
59+ self ._push_result()
60+
61+ elif mtype == b' Z' :
62+ # ReadyForQuery
63+ # Auth and SimpleQuery subprotocols use
64+ # ReadyForQuery as the final result indicator,
65+ # _but_ SimpleQuery sends *both* ErrorResponse and
66+ # ReadyForQuery, so there's a check for error state.
67+ # In client-side exceptional states, ReadyForQuery
68+ # serves as a synchronization point, indicating
69+ # when it is safe to push the error into the
70+ # result waiter.
71+ self ._parse_msg_ready_for_query()
72+
73+ if (state == PROTOCOL_AUTH or
74+ state == PROTOCOL_CANCELLED or
75+ state == PROTOCOL_ERROR_CONSUME or
76+ (state == PROTOCOL_SIMPLE_QUERY and
77+ self .result_type == RESULT_OK)):
78+ self ._push_result()
79+
80+ elif state == PROTOCOL_BIND_EXECUTE_MANY:
81+ if self .result_type == RESULT_FAILED:
82+ self ._push_result()
83+ else :
84+ try :
85+ buf = < WriteBuffer> next(self ._execute_iter)
86+ except StopIteration :
87+ self ._push_result()
88+ except Exception as e:
89+ self .result_type = RESULT_FAILED
90+ self .result = e
91+ self ._push_result()
92+ else :
93+ # Next iteration over the executemany()
94+ # arg sequence.
95+ self ._send_bind_message(
96+ self ._execute_portal_name,
97+ self ._execute_stmt_name,
98+ buf, 0 )
99+
100+ elif state == PROTOCOL_AUTH:
58101 self ._process__auth(mtype)
59102
60- elif state == PROTOCOL_PREPARE :
61- self ._process__prepare (mtype)
103+ elif state == PROTOCOL_PARSE_DESCRIBE :
104+ self ._process__parse_describe (mtype)
62105
63106 elif state == PROTOCOL_BIND_EXECUTE:
64107 self ._process__bind_execute(mtype)
@@ -93,42 +136,26 @@ cdef class CoreProtocol:
93136
94137 elif state == PROTOCOL_CANCELLED:
95138 # discard all messages until the sync message
96- if mtype == b' E' :
97- self ._parse_msg_error_response(True )
98- elif mtype == b' Z' :
99- self ._parse_msg_ready_for_query()
100- self ._push_result()
101- else :
102- self .buffer.consume_message()
139+ self .buffer.consume_message()
103140
104141 elif state == PROTOCOL_ERROR_CONSUME:
105142 # Error in protocol (on asyncpg side);
106143 # discard all messages until sync message
107-
108- if mtype == b' Z' :
109- # Sync point, self to push the result
110- if self .result_type != RESULT_FAILED:
111- self .result_type = RESULT_FAILED
112- self .result = apg_exc.InternalClientError(
113- ' unknown error in protocol implementation' )
114-
115- self ._push_result()
116-
117- else :
118- self .buffer.consume_message()
144+ self .buffer.consume_message()
119145
120146 else :
121147 raise apg_exc.InternalClientError(
122148 ' protocol is in an unknown state {}' .format(state))
123149
124150 except Exception as ex:
151+ self .state = PROTOCOL_ERROR_CONSUME
125152 self .result_type = RESULT_FAILED
126153 self .result = ex
127154
128155 if mtype == b' Z' :
156+ # This should only happen if _parse_msg_ready_for_query()
157+ # has failed.
129158 self ._push_result()
130- else :
131- self .state = PROTOCOL_ERROR_CONSUME
132159
133160 finally :
134161 if self ._skip_discard:
@@ -153,43 +180,27 @@ cdef class CoreProtocol:
153180 # BackendKeyData
154181 self ._parse_msg_backend_key_data()
155182
156- elif mtype == b' E' :
157- # ErrorResponse
158- self .con_status = CONNECTION_BAD
159- self ._parse_msg_error_response(True )
160- self ._push_result()
161-
162- elif mtype == b' Z' :
163- # ReadyForQuery
164- self ._parse_msg_ready_for_query()
165- self .con_status = CONNECTION_OK
166- self ._push_result()
167-
168- cdef _process__prepare(self , char mtype):
169- if mtype == b' t' :
170- # Parameters description
171- self .result_param_desc = self .buffer.consume_message().as_bytes()
183+ # push_result() will be initiated by handling
184+ # ReadyForQuery or ErrorResponse in the main loop.
172185
173- elif mtype == b' 1' :
186+ cdef _process__parse_describe(self , char mtype):
187+ if mtype == b' 1' :
174188 # ParseComplete
175189 self .buffer.consume_message()
176190
191+ elif mtype == b' t' :
192+ # ParameterDescription
193+ self .result_param_desc = self .buffer.consume_message().as_bytes()
194+
177195 elif mtype == b' T' :
178- # Row description
196+ # RowDescription
179197 self .result_row_desc = self .buffer.consume_message().as_bytes()
180-
181- elif mtype == b' E' :
182- # ErrorResponse
183- self ._parse_msg_error_response(True )
184-
185- elif mtype == b' Z' :
186- # ReadyForQuery
187- self ._parse_msg_ready_for_query()
188198 self ._push_result()
189199
190200 elif mtype == b' n' :
191201 # NoData
192202 self .buffer.consume_message()
203+ self ._push_result()
193204
194205 cdef _process__bind_execute(self , char mtype):
195206 if mtype == b' D' :
@@ -199,28 +210,22 @@ cdef class CoreProtocol:
199210 elif mtype == b' s' :
200211 # PortalSuspended
201212 self .buffer.consume_message()
213+ self ._push_result()
202214
203215 elif mtype == b' C' :
204216 # CommandComplete
205217 self .result_execute_completed = True
206218 self ._parse_msg_command_complete()
207-
208- elif mtype == b' E' :
209- # ErrorResponse
210- self ._parse_msg_error_response(True )
219+ self ._push_result()
211220
212221 elif mtype == b' 2' :
213222 # BindComplete
214223 self .buffer.consume_message()
215224
216- elif mtype == b' Z' :
217- # ReadyForQuery
218- self ._parse_msg_ready_for_query()
219- self ._push_result()
220-
221225 elif mtype == b' I' :
222226 # EmptyQueryResponse
223227 self .buffer.consume_message()
228+ self ._push_result()
224229
225230 cdef _process__bind_execute_many(self , char mtype):
226231 cdef WriteBuffer buf
@@ -237,64 +242,24 @@ cdef class CoreProtocol:
237242 # CommandComplete
238243 self ._parse_msg_command_complete()
239244
240- elif mtype == b' E' :
241- # ErrorResponse
242- self ._parse_msg_error_response(True )
243-
244245 elif mtype == b' 2' :
245246 # BindComplete
246247 self .buffer.consume_message()
247248
248- elif mtype == b' Z' :
249- # ReadyForQuery
250- self ._parse_msg_ready_for_query()
251- if self .result_type == RESULT_FAILED:
252- self ._push_result()
253- else :
254- try :
255- buf = < WriteBuffer> next(self ._execute_iter)
256- except StopIteration :
257- self ._push_result()
258- except Exception as e:
259- self .result_type = RESULT_FAILED
260- self .result = e
261- self ._push_result()
262- else :
263- # Next iteration over the executemany() arg sequence
264- self ._send_bind_message(
265- self ._execute_portal_name, self ._execute_stmt_name,
266- buf, 0 )
267-
268249 elif mtype == b' I' :
269250 # EmptyQueryResponse
270251 self .buffer.consume_message()
271252
272253 cdef _process__bind(self , char mtype):
273- if mtype == b' E' :
274- # ErrorResponse
275- self ._parse_msg_error_response(True )
276-
277- elif mtype == b' 2' :
254+ if mtype == b' 2' :
278255 # BindComplete
279256 self .buffer.consume_message()
280-
281- elif mtype == b' Z' :
282- # ReadyForQuery
283- self ._parse_msg_ready_for_query()
284257 self ._push_result()
285258
286259 cdef _process__close_stmt_portal(self , char mtype):
287- if mtype == b' E' :
288- # ErrorResponse
289- self ._parse_msg_error_response(True )
290-
291- elif mtype == b' 3' :
260+ if mtype == b' 3' :
292261 # CloseComplete
293262 self .buffer.consume_message()
294-
295- elif mtype == b' Z' :
296- # ReadyForQuery
297- self ._parse_msg_ready_for_query()
298263 self ._push_result()
299264
300265 cdef _process__simple_query(self , char mtype):
@@ -304,42 +269,21 @@ cdef class CoreProtocol:
304269 # 'T' - RowDescription
305270 self .buffer.consume_message()
306271
307- elif mtype == b' E' :
308- # ErrorResponse
309- self ._parse_msg_error_response(True )
310-
311- elif mtype == b' Z' :
312- # ReadyForQuery
313- self ._parse_msg_ready_for_query()
314- self ._push_result()
315-
316272 elif mtype == b' C' :
317273 # CommandComplete
318274 self ._parse_msg_command_complete()
319-
320275 else :
321276 # We don't really care about COPY IN etc
322277 self .buffer.consume_message()
323278
324279 cdef _process__copy_out(self , char mtype):
325- if mtype == b' E' :
326- self ._parse_msg_error_response(True )
327-
328- elif mtype == b' H' :
280+ if mtype == b' H' :
329281 # CopyOutResponse
330282 self ._set_state(PROTOCOL_COPY_OUT_DATA)
331283 self .buffer.consume_message()
332284
333- elif mtype == b' Z' :
334- # ReadyForQuery
335- self ._parse_msg_ready_for_query()
336- self ._push_result()
337-
338285 cdef _process__copy_out_data(self , char mtype):
339- if mtype == b' E' :
340- self ._parse_msg_error_response(True )
341-
342- elif mtype == b' d' :
286+ if mtype == b' d' :
343287 # CopyData
344288 self ._parse_copy_data_msgs()
345289
@@ -351,37 +295,18 @@ cdef class CoreProtocol:
351295 elif mtype == b' C' :
352296 # CommandComplete
353297 self ._parse_msg_command_complete()
354-
355- elif mtype == b' Z' :
356- # ReadyForQuery
357- self ._parse_msg_ready_for_query()
358298 self ._push_result()
359299
360300 cdef _process__copy_in(self , char mtype):
361- if mtype == b' E' :
362- self ._parse_msg_error_response(True )
363-
364- elif mtype == b' G' :
301+ if mtype == b' G' :
365302 # CopyInResponse
366303 self ._set_state(PROTOCOL_COPY_IN_DATA)
367304 self .buffer.consume_message()
368305
369- elif mtype == b' Z' :
370- # ReadyForQuery
371- self ._parse_msg_ready_for_query()
372- self ._push_result()
373-
374306 cdef _process__copy_in_data(self , char mtype):
375- if mtype == b' E' :
376- self ._parse_msg_error_response(True )
377-
378- elif mtype == b' C' :
307+ if mtype == b' C' :
379308 # CommandComplete
380309 self ._parse_msg_command_complete()
381-
382- elif mtype == b' Z' :
383- # ReadyForQuery
384- self ._parse_msg_ready_for_query()
385310 self ._push_result()
386311
387312 cdef _parse_msg_command_complete(self ):
@@ -739,7 +664,7 @@ cdef class CoreProtocol:
739664 WriteBuffer buf
740665
741666 self ._ensure_connected()
742- self ._set_state(PROTOCOL_PREPARE )
667+ self ._set_state(PROTOCOL_PARSE_DESCRIBE )
743668
744669 buf = WriteBuffer.new_message(b' P' )
745670 buf.write_str(stmt_name, self .encoding)
0 commit comments