diff --git a/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/payload_receiver.py b/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/payload_receiver.py index df054ced8..092c0f1cb 100644 --- a/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/payload_receiver.py +++ b/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/payload_receiver.py @@ -88,6 +88,7 @@ async def disconnect(self, event_args: DisconnectedEventArgs = None): async def _receive_packets(self): is_closed = False + disconnect_args = None while self._receiver and self._receiver.is_connected and not is_closed: # receive a single packet diff --git a/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/send_queue.py b/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/send_queue.py index 07d197496..4163e7558 100644 --- a/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/send_queue.py +++ b/libraries/botbuilder-streaming/botbuilder/streaming/payload_transport/send_queue.py @@ -27,7 +27,7 @@ async def _process(self): while True: try: while True: - await sleep(1) + await sleep(0.2) item = await self._queue.get() try: await self._action(item) diff --git a/libraries/botbuilder-streaming/tests/test_payload_receiver.py b/libraries/botbuilder-streaming/tests/test_payload_receiver.py new file mode 100644 index 000000000..315bd23a7 --- /dev/null +++ b/libraries/botbuilder-streaming/tests/test_payload_receiver.py @@ -0,0 +1,70 @@ +from typing import List + +import aiounittest + +from botbuilder.streaming import PayloadStream +from botbuilder.streaming.payload_transport import PayloadReceiver +from botbuilder.streaming.transport import TransportReceiverBase + + +class MockTransportReceiver(TransportReceiverBase): + def __init__(self, mock_header: bytes, mock_payload: bytes): + self._is_connected = True + self._mock_gen = self._mock_receive(mock_header, mock_payload) + + def _mock_receive(self, mock_header: bytes, mock_payload: bytes): + yield mock_header + yield mock_payload + + @property + def is_connected(self): + if self._is_connected: + self._is_connected = False + return True + return False + + async def close(self): + return + + async def receive(self, buffer: object, offset: int, count: int) -> int: + resp_buffer = list(next(self._mock_gen)) + for index, val in enumerate(resp_buffer): + buffer[index] = val + return len(resp_buffer) + + +class MockStream(PayloadStream): + # pylint: disable=super-init-not-called + def __init__(self): + self.buffer = None + self._producer_length = 0 # total length + + def give_buffer(self, buffer: List[int]): + self.buffer = buffer + + +class TestBotFrameworkHttpClient(aiounittest.AsyncTestCase): + async def test_connect(self): + mock_header = b"S.000004.e35ed534-0808-4acf-af1e-24aa81d2b31d.1\n" + mock_payload = b"test" + + mock_receiver = MockTransportReceiver(mock_header, mock_payload) + mock_stream = MockStream() + + receive_action_called = False + + def mock_get_stream(header): # pylint: disable=unused-argument + return mock_stream + + def mock_receive_action(header, stream, offset): + nonlocal receive_action_called + assert header.type == "S" + assert len(stream.buffer) == offset + receive_action_called = True + + sut = PayloadReceiver() + sut.subscribe(mock_get_stream, mock_receive_action) + await sut.connect(mock_receiver) + + assert bytes(mock_stream.buffer) == mock_payload + assert receive_action_called