@@ -489,7 +489,7 @@ cdef class ReadBuffer:
489489
490490 self ._ensure_first_buf()
491491
492- cdef int32_t has_message (self ) except - 1 :
492+ cdef int32_t take_message (self ) except - 1 :
493493 cdef:
494494 const char * cbuf
495495
@@ -525,8 +525,24 @@ cdef class ReadBuffer:
525525 self ._current_message_ready = 1
526526 return 1
527527
528- cdef inline int32_t has_message_type(self , char mtype) except - 1 :
529- return self .has_message() and self .get_message_type() == mtype
528+ cdef inline int32_t take_message_type(self , char mtype) except - 1 :
529+ cdef const char * buf0
530+
531+ if self ._current_message_ready:
532+ return self ._current_message_type == mtype
533+ elif self ._length >= 1 :
534+ self ._ensure_first_buf()
535+ buf0 = cpython.PyBytes_AS_STRING(self ._buf0)
536+
537+ return buf0[self ._pos0] == mtype and self .take_message()
538+ else :
539+ return 0
540+
541+ cdef int32_t put_message(self ) except - 1 :
542+ if not self ._current_message_ready:
543+ raise BufferError(' cannot put message: no message taken' )
544+ self ._current_message_ready = False
545+ return 0
530546
531547 cdef inline const char * try_consume_message(self , ssize_t* len ):
532548 cdef:
@@ -541,7 +557,7 @@ cdef class ReadBuffer:
541557 buf = self ._try_read_bytes(buf_len)
542558 if buf != NULL :
543559 len [0 ] = buf_len
544- self ._discard_message ()
560+ self ._finish_message ()
545561 return buf
546562
547563 cdef Memory consume_message(self ):
@@ -551,7 +567,7 @@ cdef class ReadBuffer:
551567 mem = self .read(self ._current_message_len_unread)
552568 else :
553569 mem = None
554- self ._discard_message ()
570+ self ._finish_message ()
555571 return mem
556572
557573 cdef bytearray consume_messages(self , char mtype):
@@ -562,7 +578,7 @@ cdef class ReadBuffer:
562578 ssize_t total_bytes = 0
563579 bytearray result
564580
565- if not self .has_message_type (mtype):
581+ if not self .take_message_type (mtype):
566582 return None
567583
568584 # consume_messages is a volume-oriented method, so
@@ -571,26 +587,24 @@ cdef class ReadBuffer:
571587 result = PyByteArray_FromStringAndSize(NULL , self ._length)
572588 buf = PyByteArray_AsString(result)
573589
574- while self .has_message_type (mtype):
590+ while self .take_message_type (mtype):
575591 nbytes = self ._current_message_len_unread
576592 self ._read(buf, nbytes)
577593 buf += nbytes
578594 total_bytes += nbytes
579- self ._discard_message ()
595+ self ._finish_message ()
580596
581597 # Clamp the result to an actual size read.
582598 PyByteArray_Resize(result, total_bytes)
583599
584600 return result
585601
586- cdef discard_message(self ):
587- if self ._current_message_type == 0 :
588- # Already discarded
602+ cdef finish_message(self ):
603+ if self ._current_message_type == 0 or not self ._current_message_ready:
604+ # The message has already been finished (e.g by consume_message()),
605+ # or has been put back by put_message().
589606 return
590607
591- if not self ._current_message_ready:
592- raise BufferError(' no message to discard' )
593-
594608 if self ._current_message_len_unread:
595609 if ASYNCPG_DEBUG:
596610 mtype = chr (self ._current_message_type)
@@ -602,9 +616,9 @@ cdef class ReadBuffer:
602616 mtype,
603617 (< Memory> discarded).as_bytes()))
604618
605- self ._discard_message ()
619+ self ._finish_message ()
606620
607- cdef inline _discard_message (self ):
621+ cdef inline _finish_message (self ):
608622 self ._current_message_type = 0
609623 self ._current_message_len = 0
610624 self ._current_message_ready = 0
0 commit comments