@@ -44,21 +44,56 @@ cdef class CoreProtocol:
4444 if mtype == b' S' :
4545 # ParameterStatus
4646 self ._parse_msg_parameter_status()
47- continue
47+
4848 elif mtype == b' A' :
4949 # NotificationResponse
5050 self ._parse_msg_notification()
51- continue
51+
5252 elif mtype == b' N' :
5353 # 'N' - NoticeResponse
5454 self ._on_notice(self ._parse_msg_error_response(False ))
55- continue
5655
57- if state == PROTOCOL_AUTH:
56+ elif mtype == b' E' :
57+ # ErrorResponse
58+ self ._parse_msg_error_response(True )
59+ # In all cases, except Auth, ErrorResponse will
60+ # be followed by a ReadyForQuery, which is when
61+ # _push_result() will be called.
62+ if state == PROTOCOL_AUTH:
63+ self ._push_result()
64+
65+ elif mtype == b' Z' :
66+ # ReadyForQuery
67+ self ._parse_msg_ready_for_query()
68+
69+ if state != PROTOCOL_BIND_EXECUTE_MANY:
70+ self ._push_result()
71+
72+ else :
73+ if self .result_type == RESULT_FAILED:
74+ self ._push_result()
75+ else :
76+ try :
77+ buf = < WriteBuffer> next(self ._execute_iter)
78+ except StopIteration :
79+ self ._push_result()
80+ except Exception as e:
81+ self .result_type = RESULT_FAILED
82+ self .result = e
83+ self ._push_result()
84+ else :
85+ # Next iteration over the executemany()
86+ # arg sequence.
87+ self ._send_bind_message(
88+ self ._execute_portal_name,
89+ self ._execute_stmt_name,
90+ buf, 0 )
91+
92+ elif state == PROTOCOL_AUTH:
5893 self ._process__auth(mtype)
5994
60- elif state == PROTOCOL_PREPARE :
61- self ._process__prepare (mtype)
95+ elif state == PROTOCOL_PARSE_DESCRIBE :
96+ self ._process__parse_describe (mtype)
6297
6398 elif state == PROTOCOL_BIND_EXECUTE:
6499 self ._process__bind_execute(mtype)
@@ -93,42 +128,26 @@ cdef class CoreProtocol:
93128
94129 elif state == PROTOCOL_CANCELLED:
95130 # discard all messages until the sync message
96- if mtype == b' E' :
97- self ._parse_msg_error_response(True )
98- elif mtype == b' Z' :
99- self ._parse_msg_ready_for_query()
100- self ._push_result()
101- else :
102- self .buffer.consume_message()
131+ self .buffer.consume_message()
103132
104133 elif state == PROTOCOL_ERROR_CONSUME:
105134 # Error in protocol (on asyncpg side);
106135 # discard all messages until sync message
107-
108- if mtype == b' Z' :
109- # Sync point, self to push the result
110- if self .result_type != RESULT_FAILED:
111- self .result_type = RESULT_FAILED
112- self .result = apg_exc.InternalClientError(
113- ' unknown error in protocol implementation' )
114-
115- self ._push_result()
116-
117- else :
118- self .buffer.consume_message()
136+ self .buffer.consume_message()
119137
120138 else :
121139 raise apg_exc.InternalClientError(
122140 ' protocol is in an unknown state {}' .format(state))
123141
124142 except Exception as ex:
143+ self .state = PROTOCOL_ERROR_CONSUME
125144 self .result_type = RESULT_FAILED
126145 self .result = ex
127146
128147 if mtype == b' Z' :
148+ # This should only happen if _parse_msg_ready_for_query()
149+ # has failed.
129150 self ._push_result()
130- else :
131- self .state = PROTOCOL_ERROR_CONSUME
132151
133152 finally :
134153 if self ._skip_discard:
@@ -153,43 +172,27 @@ cdef class CoreProtocol:
153172 # BackendKeyData
154173 self ._parse_msg_backend_key_data()
155174
156- elif mtype == b' E' :
157- # ErrorResponse
158- self .con_status = CONNECTION_BAD
159- self ._parse_msg_error_response(True )
160- self ._push_result()
161-
162- elif mtype == b' Z' :
163- # ReadyForQuery
164- self ._parse_msg_ready_for_query()
165- self .con_status = CONNECTION_OK
166- self ._push_result()
167-
168- cdef _process__prepare(self , char mtype):
169- if mtype == b' t' :
170- # Parameters description
171- self .result_param_desc = self .buffer.consume_message().as_bytes()
175+ # push_result() will be initiated by handling
176+ # ReadyForQuery or ErrorResponse in the main loop.
172177
173- elif mtype == b' 1' :
178+ cdef _process__parse_describe(self , char mtype):
179+ if mtype == b' 1' :
174180 # ParseComplete
175181 self .buffer.consume_message()
176182
183+ elif mtype == b' t' :
184+ # ParameterDescription
185+ self .result_param_desc = self .buffer.consume_message().as_bytes()
186+
177187 elif mtype == b' T' :
178- # Row description
188+ # RowDescription
179189 self .result_row_desc = self .buffer.consume_message().as_bytes()
180-
181- elif mtype == b' E' :
182- # ErrorResponse
183- self ._parse_msg_error_response(True )
184-
185- elif mtype == b' Z' :
186- # ReadyForQuery
187- self ._parse_msg_ready_for_query()
188190 self ._push_result()
189191
190192 elif mtype == b' n' :
191193 # NoData
192194 self .buffer.consume_message()
195+ self ._push_result()
193196
194197 cdef _process__bind_execute(self , char mtype):
195198 if mtype == b' D' :
@@ -199,28 +202,22 @@ cdef class CoreProtocol:
199202 elif mtype == b' s' :
200203 # PortalSuspended
201204 self .buffer.consume_message()
205+ self ._push_result()
202206
203207 elif mtype == b' C' :
204208 # CommandComplete
205209 self .result_execute_completed = True
206210 self ._parse_msg_command_complete()
207-
208- elif mtype == b' E' :
209- # ErrorResponse
210- self ._parse_msg_error_response(True )
211+ self ._push_result()
211212
212213 elif mtype == b' 2' :
213214 # BindComplete
214215 self .buffer.consume_message()
215216
216- elif mtype == b' Z' :
217- # ReadyForQuery
218- self ._parse_msg_ready_for_query()
219- self ._push_result()
220-
221217 elif mtype == b' I' :
222218 # EmptyQueryResponse
223219 self .buffer.consume_message()
220+ self ._push_result()
224221
225222 cdef _process__bind_execute_many(self , char mtype):
226223 cdef WriteBuffer buf
@@ -237,64 +234,24 @@ cdef class CoreProtocol:
237234 # CommandComplete
238235 self ._parse_msg_command_complete()
239236
240- elif mtype == b' E' :
241- # ErrorResponse
242- self ._parse_msg_error_response(True )
243-
244237 elif mtype == b' 2' :
245238 # BindComplete
246239 self .buffer.consume_message()
247240
248- elif mtype == b' Z' :
249- # ReadyForQuery
250- self ._parse_msg_ready_for_query()
251- if self .result_type == RESULT_FAILED:
252- self ._push_result()
253- else :
254- try :
255- buf = < WriteBuffer> next(self ._execute_iter)
256- except StopIteration :
257- self ._push_result()
258- except Exception as e:
259- self .result_type = RESULT_FAILED
260- self .result = e
261- self ._push_result()
262- else :
263- # Next iteration over the executemany() arg sequence
264- self ._send_bind_message(
265- self ._execute_portal_name, self ._execute_stmt_name,
266- buf, 0 )
267-
268241 elif mtype == b' I' :
269242 # EmptyQueryResponse
270243 self .buffer.consume_message()
271244
272245 cdef _process__bind(self , char mtype):
273- if mtype == b' E' :
274- # ErrorResponse
275- self ._parse_msg_error_response(True )
276-
277- elif mtype == b' 2' :
246+ if mtype == b' 2' :
278247 # BindComplete
279248 self .buffer.consume_message()
280-
281- elif mtype == b' Z' :
282- # ReadyForQuery
283- self ._parse_msg_ready_for_query()
284249 self ._push_result()
285250
286251 cdef _process__close_stmt_portal(self , char mtype):
287- if mtype == b' E' :
288- # ErrorResponse
289- self ._parse_msg_error_response(True )
290-
291- elif mtype == b' 3' :
252+ if mtype == b' 3' :
292253 # CloseComplete
293254 self .buffer.consume_message()
294-
295- elif mtype == b' Z' :
296- # ReadyForQuery
297- self ._parse_msg_ready_for_query()
298255 self ._push_result()
299256
300257 cdef _process__simple_query(self , char mtype):
@@ -304,42 +261,21 @@ cdef class CoreProtocol:
304261 # 'T' - RowDescription
305262 self .buffer.consume_message()
306263
307- elif mtype == b' E' :
308- # ErrorResponse
309- self ._parse_msg_error_response(True )
310-
311- elif mtype == b' Z' :
312- # ReadyForQuery
313- self ._parse_msg_ready_for_query()
314- self ._push_result()
315-
316264 elif mtype == b' C' :
317265 # CommandComplete
318266 self ._parse_msg_command_complete()
319-
320267 else :
321268 # We don't really care about COPY IN etc
322269 self .buffer.consume_message()
323270
324271 cdef _process__copy_out(self , char mtype):
325- if mtype == b' E' :
326- self ._parse_msg_error_response(True )
327-
328- elif mtype == b' H' :
272+ if mtype == b' H' :
329273 # CopyOutResponse
330274 self ._set_state(PROTOCOL_COPY_OUT_DATA)
331275 self .buffer.consume_message()
332276
333- elif mtype == b' Z' :
334- # ReadyForQuery
335- self ._parse_msg_ready_for_query()
336- self ._push_result()
337-
338277 cdef _process__copy_out_data(self , char mtype):
339- if mtype == b' E' :
340- self ._parse_msg_error_response(True )
341-
342- elif mtype == b' d' :
278+ if mtype == b' d' :
343279 # CopyData
344280 self ._parse_copy_data_msgs()
345281
@@ -351,37 +287,18 @@ cdef class CoreProtocol:
351287 elif mtype == b' C' :
352288 # CommandComplete
353289 self ._parse_msg_command_complete()
354-
355- elif mtype == b' Z' :
356- # ReadyForQuery
357- self ._parse_msg_ready_for_query()
358290 self ._push_result()
359291
360292 cdef _process__copy_in(self , char mtype):
361- if mtype == b' E' :
362- self ._parse_msg_error_response(True )
363-
364- elif mtype == b' G' :
293+ if mtype == b' G' :
365294 # CopyInResponse
366295 self ._set_state(PROTOCOL_COPY_IN_DATA)
367296 self .buffer.consume_message()
368297
369- elif mtype == b' Z' :
370- # ReadyForQuery
371- self ._parse_msg_ready_for_query()
372- self ._push_result()
373-
374298 cdef _process__copy_in_data(self , char mtype):
375- if mtype == b' E' :
376- self ._parse_msg_error_response(True )
377-
378- elif mtype == b' C' :
299+ if mtype == b' C' :
379300 # CommandComplete
380301 self ._parse_msg_command_complete()
381-
382- elif mtype == b' Z' :
383- # ReadyForQuery
384- self ._parse_msg_ready_for_query()
385302 self ._push_result()
386303
387304 cdef _parse_msg_command_complete(self ):
@@ -739,7 +656,7 @@ cdef class CoreProtocol:
739656 WriteBuffer buf
740657
741658 self ._ensure_connected()
742- self ._set_state(PROTOCOL_PREPARE )
659+ self ._set_state(PROTOCOL_PARSE_DESCRIBE )
743660
744661 buf = WriteBuffer.new_message(b' P' )
745662 buf.write_str(stmt_name, self .encoding)
0 commit comments