From 8e2bccf8a3050ad2e469e03298e8b190b0fd43ae Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 18:38:32 -0700 Subject: [PATCH 1/8] streaming logic fix --- .../src/microsoft/teams/apps/http_stream.py | 22 +++-- .../src/microsoft/teams/apps/utils/timer.py | 2 +- packages/apps/tests/test_http_stream.py | 98 ++++++++++++++----- 3 files changed, 87 insertions(+), 35 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index 55c00af1..699b133a 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -104,9 +104,6 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str]) Args: activity: The activity to emit. """ - if self._timeout is not None: - self._timeout.cancel() - self._timeout = None if isinstance(activity, str): activity = MessageActivityInput(text=activity, type="message") @@ -115,7 +112,9 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str]) # Clear the queue empty event since we just added an item self._queue_empty_event.clear() - self._timeout = Timeout(0.5, self._flush) + if not self._timeout: + loop = asyncio.get_running_loop() + loop.create_task(self._flush()) def update(self, text: str) -> None: """ @@ -171,10 +170,15 @@ async def _flush(self) -> None: Flush the current activity queue. """ # If there are no items in the queue, nothing to flush - async with self._lock: - if not self._queue: - return + if self._lock.locked(): + return + + await self._lock.acquire() + if not self._queue: + return + + try: if self._timeout is not None: self._timeout.cancel() self._timeout = None @@ -224,6 +228,10 @@ async def _flush(self) -> None: if self._queue and not self._timeout: self._timeout = Timeout(0.5, self._flush) + finally: + # Reset flushing flag so future emits can trigger another flush + self._lock.release() + async def _send_activity(self, to_send: TypingActivityInput): """ Send an activity to the Teams conversation with the ID. diff --git a/packages/apps/src/microsoft/teams/apps/utils/timer.py b/packages/apps/src/microsoft/teams/apps/utils/timer.py index 2aa27487..c8423478 100644 --- a/packages/apps/src/microsoft/teams/apps/utils/timer.py +++ b/packages/apps/src/microsoft/teams/apps/utils/timer.py @@ -23,7 +23,7 @@ def __init__(self, delay: float, callback: TimerCallback) -> None: self._handle: Optional[asyncio.TimerHandle] = None self._cancelled: bool = False - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() self._handle = loop.call_later(delay, self._run) def _run(self) -> None: diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index 1aa050fc..d3b18a36 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -5,7 +5,6 @@ # pyright: basic import asyncio -from datetime import datetime, timedelta from unittest.mock import MagicMock import pytest @@ -35,13 +34,12 @@ def mock_api_client(self): client.conversations = mock_conversations client.send_call_count = 0 - client.send_times = [] client.sent_activities = [] async def mock_send(activity): client.send_call_count += 1 - client.send_times.append(datetime.now()) client.sent_activities.append(activity) + await asyncio.sleep(0.05) # Simulate network delay return SentActivity(id=f"test-id-{client.send_call_count}", activity_params=activity) client.conversations.activities().create = mock_send @@ -63,37 +61,45 @@ def http_stream(self, mock_api_client, conversation_reference, mock_logger): return HttpStream(mock_api_client, conversation_reference, mock_logger) @pytest.mark.asyncio - async def test_stream_emit_message_flushes_after_500ms(self, mock_api_client, conversation_reference, mock_logger): - """Test that messages are flushed after 500ms timeout.""" + async def test_stream_emit_message_flushes_immediately(self, mock_api_client, conversation_reference, mock_logger): + """Test that messages are flushed immediately.""" stream = HttpStream(mock_api_client, conversation_reference, mock_logger) - start_time = datetime.now() - stream.emit("Test message") - await asyncio.sleep(0.6) # Wait longer than 500ms timeout - - assert mock_api_client.send_call_count > 0, "Should have sent at least one message" - assert any(t >= start_time + timedelta(milliseconds=450) for t in mock_api_client.send_times), ( - "Should have waited approximately 500ms before sending" - ) + await asyncio.sleep(0.07) # Wait for the flush task to complete + assert mock_api_client.send_call_count == 1 @pytest.mark.asyncio - async def test_stream_multiple_emits_restarts_timer(self, mock_api_client, conversation_reference, mock_logger): + async def test_stream_multiple_emits_timer_check(self, mock_api_client, conversation_reference, mock_logger): """Test that multiple emits reset the timer.""" stream = HttpStream(mock_api_client, conversation_reference, mock_logger) stream.emit("First message") - await asyncio.sleep(0.3) # Wait less than 500ms - - stream.emit("Second message") # This should reset the timer - await asyncio.sleep(0.3) # Still less than 500ms from second emit - assert mock_api_client.send_call_count == 0, "Should not have sent yet" - await asyncio.sleep(0.3) # Now over 500ms from second emit - assert mock_api_client.send_call_count > 0, "Should have sent messages after timer expired" + stream.emit("Second message") + stream.emit("Third message") + stream.emit("Fourth message") + stream.emit("Fifth message") + stream.emit("Sixth message") + stream.emit("Seventh message") + stream.emit("Eighth message") + stream.emit("Ninth message") + stream.emit("Tenth message") + stream.emit("Eleventh message") + stream.emit("Twelfth message") + + await asyncio.sleep(0.07) # Wait for the flush task to complete + assert mock_api_client.send_call_count == 1 # First message should trigger flush immediately + + stream.emit("Thirteenth message") + await asyncio.sleep(0.3) # Less than 500ms from first flush + assert mock_api_client.send_call_count == 1, "No new flush should have occurred yet" + + await asyncio.sleep(0.3) # Now exceed 500ms from last emit + assert mock_api_client.send_call_count == 2, "Second flush should have occurred" @pytest.mark.asyncio - async def test_stream_send_timeout_handled_gracefully(self, mock_api_client, conversation_reference, mock_logger): + async def test_stream_error_handled_gracefully(self, mock_api_client, conversation_reference, mock_logger): """Test that send timeouts are handled gracefully with retries.""" call_count = 0 @@ -104,6 +110,7 @@ async def mock_send_with_timeout(activity): raise TimeoutError("Operation timed out") # Succeed on second attempt + await asyncio.sleep(0.05) # Simulate delay return SentActivity(id=f"success-after-timeout-{call_count}", activity_params=activity) mock_api_client.conversations.activities().create = mock_send_with_timeout @@ -111,7 +118,7 @@ async def mock_send_with_timeout(activity): stream = HttpStream(mock_api_client, conversation_reference, mock_logger) stream.emit("Test message with timeout") - await asyncio.sleep(0.8) # Wait for flush and retries + await asyncio.sleep(0.6) # Wait for flush and 1 retry to complete result = await stream.close() @@ -148,7 +155,7 @@ async def test_stream_update_status_sends_typing_activity( stream = HttpStream(mock_api_client, conversation_reference, mock_logger) stream.update("Thinking...") - await asyncio.sleep(0.6) # Wait for the flush task to complete + await asyncio.sleep(0.07) # Wait for the flush task to complete assert stream.count > 0 or len(mock_api_client.sent_activities) > 0, "Should have processed the update" assert stream.sequence >= 2, "Should increment sequence after sending" @@ -167,10 +174,9 @@ async def test_stream_sequence_of_update_and_emit(self, mock_api_client, convers stream = HttpStream(mock_api_client, conversation_reference, mock_logger) stream.update("Preparing response...") - await asyncio.sleep(0.6) - stream.emit("Final response message") - await asyncio.sleep(0.6) + + await asyncio.sleep(0.5) # Wait for the flush task to complete assert len(mock_api_client.sent_activities) >= 2, "Should have sent typing activity and message" @@ -186,3 +192,41 @@ async def test_stream_sequence_of_update_and_emit(self, mock_api_client, convers # Sequence numbers should have increased assert stream.sequence >= 3, "Sequence should increment for both update and emit" + + @pytest.mark.asyncio + async def test_stream_concurrent_emits_do_not_flush_simultaneously( + self, mock_api_client, conversation_reference, mock_logger + ): + """ + Test that multiple concurrent emits do not allow simultaneous flush execution. + """ + concurrent_entries = 0 + max_concurrent_entries = 0 + lock = asyncio.Lock() + + async def mock_send(activity): + nonlocal concurrent_entries, max_concurrent_entries + async with lock: + concurrent_entries += 1 + max_concurrent_entries = max(max_concurrent_entries, concurrent_entries) + await asyncio.sleep(0.05) # simulate delay in sending + async with lock: + concurrent_entries -= 1 + return activity + + mock_api_client.conversations.activities().create = mock_send + + stream = HttpStream(mock_api_client, conversation_reference, mock_logger) + + # Schedule multiple emits concurrently + async def emit_task(): + stream.emit("Concurrent message") + + tasks = [asyncio.create_task(emit_task()) for _ in range(10)] + await asyncio.gather(*tasks) + + # Wait for flushes to complete + await asyncio.sleep(0.07) + + # Only one flush should have entered the critical section at a time + assert max_concurrent_entries == 1, f"Flush entered concurrently {max_concurrent_entries} times, expected 1" From 4361f138335887a9143728a92f38c57bcc4ad4f0 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 18:44:22 -0700 Subject: [PATCH 2/8] streaming logic fix --- packages/apps/src/microsoft/teams/apps/http_stream.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index 699b133a..cc89022d 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -175,10 +175,9 @@ async def _flush(self) -> None: await self._lock.acquire() - if not self._queue: - return - try: + if not self._queue: + return if self._timeout is not None: self._timeout.cancel() self._timeout = None From 3103518bbd8d108485649d6b334422014c21fbd2 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 19:19:30 -0700 Subject: [PATCH 3/8] prevent hanging if flush fails and stream closes --- .../src/microsoft/teams/apps/http_stream.py | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index cc89022d..3c2fc524 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -29,13 +29,12 @@ class HttpStream(StreamerProtocol): HTTP-based streaming implementation for Microsoft Teams activities. Flow: - 1. emit() adds activities to a queue and cancels any pending flush timeout - 2. emit() schedules _flush() to run after 0.5 seconds via Timeout - 3. If another emit() happens before flush executes, the timeout is cancelled and rescheduled - 4. _flush() starts by cancelling any pending timeout, then processes up to 10 queued activities under a lock - 5. _flush() combines text from MessageActivity and sends it as a Typing activity with streamType='streaming' - 6. _flush() schedules another flush if more items remain in queue - 7. close() waits for queue to empty, then sends final message with stream_type='stream_final' + 1. emit() adds activities to a queue + 2. _flush() processes up to 10 queued items under a lock. + 3. Informative typing updates are sent immediately if no message started. + 4. Message text are combined into a typing chunk. + 5. Another flush is scheduled if more items remain. + 6. close() waits for queue to empty, then sends final message with stream_type='stream_final' The timeout cancellation ensures only one flush operation is scheduled at a time. The delays between flushes is to ensure we dont hit API rate limits with Microsoft Teams. @@ -125,6 +124,12 @@ def update(self, text: str) -> None: """ self.emit(TypingActivityInput().with_text(text).with_channel_data(ChannelData(stream_type="informative"))) + async def _wait_for_queue_empty(self): + """Helper to wait until queue is empty.""" + while self._queue: + self._logger.debug("waiting for queue to be empty...") + await self._queue_empty_event.wait() + async def close(self) -> Optional[SentActivity]: # wait for lock to be free if self._result is not None: @@ -138,11 +143,15 @@ async def close(self) -> Optional[SentActivity]: # Wait until _id is set and queue is empty if not self._id: self._logger.debug("waiting for ID to be set") - await self._id_set_event.wait() + try: + await asyncio.wait_for(self._id_set_event.wait(), timeout=30.0) + except asyncio.TimeoutError: + self._logger.warning("_id was never set, closing stream anyway") - while self._queue: - self._logger.debug("waiting for queue to be empty...") - await self._queue_empty_event.wait() + try: + await asyncio.wait_for(self._wait_for_queue_empty(), timeout=30.0) + except asyncio.TimeoutError: + self._logger.warning("Queue did not empty within 30 seconds; closing anyway") if self._text == "" and self._attachments == []: self._logger.warning("no text or attachments to send, cannot close stream") From 9d0ff3662e6cac422d87aa704b75edb12bff1227 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 19:36:24 -0700 Subject: [PATCH 4/8] simple loop, no event needed --- .../src/microsoft/teams/apps/http_stream.py | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index 3c2fc524..d68b9343 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -60,8 +60,6 @@ def __init__(self, client: ApiClient, ref: ConversationReference, logger: Option self._result: Optional[SentActivity] = None self._lock = asyncio.Lock() self._timeout: Optional[Timeout] = None - self._id_set_event = asyncio.Event() - self._queue_empty_event = asyncio.Event() self._reset_state() @@ -108,9 +106,6 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str]) activity = MessageActivityInput(text=activity, type="message") self._queue.append(activity) - # Clear the queue empty event since we just added an item - self._queue_empty_event.clear() - if not self._timeout: loop = asyncio.get_running_loop() loop.create_task(self._flush()) @@ -124,11 +119,19 @@ def update(self, text: str) -> None: """ self.emit(TypingActivityInput().with_text(text).with_channel_data(ChannelData(stream_type="informative"))) - async def _wait_for_queue_empty(self): - """Helper to wait until queue is empty.""" - while self._queue: - self._logger.debug("waiting for queue to be empty...") - await self._queue_empty_event.wait() + async def _wait_for_id_and_queue(self, timeout: float = 5.0, interval: float = 0.1): + """Wait until _id is set and the queue is empty, with a total timeout.""" + + async def _poll(): + while not self._id or self._queue: + self._logger.debug("waiting for _id to be set or queue to be empty") + await asyncio.sleep(interval) + + try: + await asyncio.wait_for(_poll(), timeout=timeout) + except asyncio.TimeoutError: + self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, returning None") + return None async def close(self) -> Optional[SentActivity]: # wait for lock to be free @@ -141,17 +144,7 @@ async def close(self) -> Optional[SentActivity]: return None # Wait until _id is set and queue is empty - if not self._id: - self._logger.debug("waiting for ID to be set") - try: - await asyncio.wait_for(self._id_set_event.wait(), timeout=30.0) - except asyncio.TimeoutError: - self._logger.warning("_id was never set, closing stream anyway") - - try: - await asyncio.wait_for(self._wait_for_queue_empty(), timeout=30.0) - except asyncio.TimeoutError: - self._logger.warning("Queue did not empty within 30 seconds; closing anyway") + await self._wait_for_id_and_queue(timeout=30.0, interval=0.1) if self._text == "" and self._attachments == []: self._logger.warning("no text or attachments to send, cannot close stream") @@ -228,10 +221,6 @@ async def _flush(self) -> None: to_send = TypingActivityInput(text=self._text) await self._send_activity(to_send) - # Signal if queue is now empty - if not self._queue: - self._queue_empty_event.set() - # If more queued, schedule another flush if self._queue and not self._timeout: self._timeout = Timeout(0.5, self._flush) @@ -256,8 +245,6 @@ async def _send_activity(self, to_send: TypingActivityInput): self._index += 1 if self._id is None: self._id = res.id - # Signal that ID has been set - self._id_set_event.set() async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) -> SentActivity: """ From ba020f47d0065ea5da2abd7a37ae38dbd5b12da1 Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 19:38:10 -0700 Subject: [PATCH 5/8] no args --- packages/apps/src/microsoft/teams/apps/http_stream.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index d68b9343..05c8a07b 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -119,9 +119,12 @@ def update(self, text: str) -> None: """ self.emit(TypingActivityInput().with_text(text).with_channel_data(ChannelData(stream_type="informative"))) - async def _wait_for_id_and_queue(self, timeout: float = 5.0, interval: float = 0.1): + async def _wait_for_id_and_queue(self): """Wait until _id is set and the queue is empty, with a total timeout.""" + timeout = 30.0 # total timeout + interval = 0.1 # polling interval + async def _poll(): while not self._id or self._queue: self._logger.debug("waiting for _id to be set or queue to be empty") @@ -144,7 +147,7 @@ async def close(self) -> Optional[SentActivity]: return None # Wait until _id is set and queue is empty - await self._wait_for_id_and_queue(timeout=30.0, interval=0.1) + await self._wait_for_id_and_queue() if self._text == "" and self._attachments == []: self._logger.warning("no text or attachments to send, cannot close stream") From 90779fb616cc1a17f6d9880b0689f3bc057a4e9b Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 19:39:31 -0700 Subject: [PATCH 6/8] simple loop, no event needed --- packages/apps/src/microsoft/teams/apps/http_stream.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index 05c8a07b..97eb8363 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -60,6 +60,7 @@ def __init__(self, client: ApiClient, ref: ConversationReference, logger: Option self._result: Optional[SentActivity] = None self._lock = asyncio.Lock() self._timeout: Optional[Timeout] = None + self._total_wait_timeout: float = 30.0 self._reset_state() @@ -122,16 +123,13 @@ def update(self, text: str) -> None: async def _wait_for_id_and_queue(self): """Wait until _id is set and the queue is empty, with a total timeout.""" - timeout = 30.0 # total timeout - interval = 0.1 # polling interval - async def _poll(): while not self._id or self._queue: self._logger.debug("waiting for _id to be set or queue to be empty") - await asyncio.sleep(interval) + await asyncio.sleep(0.1) try: - await asyncio.wait_for(_poll(), timeout=timeout) + await asyncio.wait_for(_poll(), timeout=self._total_wait_timeout) except asyncio.TimeoutError: self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, returning None") return None From 4895231b34b7515d07c3c553b09c338a2473845c Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 19:41:02 -0700 Subject: [PATCH 7/8] logger warning --- packages/apps/src/microsoft/teams/apps/http_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index 97eb8363..57278aee 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -131,7 +131,7 @@ async def _poll(): try: await asyncio.wait_for(_poll(), timeout=self._total_wait_timeout) except asyncio.TimeoutError: - self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, returning None") + self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, cannot close stream") return None async def close(self) -> Optional[SentActivity]: From 12ae29e649c58a179e15aa2b9eb36b800ea4984a Mon Sep 17 00:00:00 2001 From: Mehak Bindra Date: Thu, 30 Oct 2025 19:49:40 -0700 Subject: [PATCH 8/8] return if timeout --- packages/apps/src/microsoft/teams/apps/http_stream.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/http_stream.py b/packages/apps/src/microsoft/teams/apps/http_stream.py index 57278aee..08fa5658 100644 --- a/packages/apps/src/microsoft/teams/apps/http_stream.py +++ b/packages/apps/src/microsoft/teams/apps/http_stream.py @@ -130,9 +130,9 @@ async def _poll(): try: await asyncio.wait_for(_poll(), timeout=self._total_wait_timeout) + return True except asyncio.TimeoutError: - self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, cannot close stream") - return None + return False async def close(self) -> Optional[SentActivity]: # wait for lock to be free @@ -145,7 +145,10 @@ async def close(self) -> Optional[SentActivity]: return None # Wait until _id is set and queue is empty - await self._wait_for_id_and_queue() + result = await self._wait_for_id_and_queue() + if not result: + self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, cannot close stream") + return None if self._text == "" and self._attachments == []: self._logger.warning("no text or attachments to send, cannot close stream")