69 changes: 38 additions & 31 deletions packages/apps/src/microsoft/teams/apps/http_stream.py
@@ -29,13 +29,12 @@ class HttpStream(StreamerProtocol):
HTTP-based streaming implementation for Microsoft Teams activities.

Flow:
1. emit() adds activities to a queue and cancels any pending flush timeout
2. emit() schedules _flush() to run after 0.5 seconds via Timeout
3. If another emit() happens before flush executes, the timeout is cancelled and rescheduled
4. _flush() starts by cancelling any pending timeout, then processes up to 10 queued activities under a lock
5. _flush() combines text from MessageActivity and sends it as a Typing activity with streamType='streaming'
6. _flush() schedules another flush if more items remain in queue
7. close() waits for queue to empty, then sends final message with stream_type='stream_final'
1. emit() adds activities to a queue.
2. _flush() processes up to 10 queued items under a lock.
3. Informative typing updates are sent immediately if no message has started.
4. Message text is combined into a typing chunk.
5. Another flush is scheduled if more items remain.
6. close() waits for the queue to empty, then sends the final message with stream_type='stream_final'.

The timeout cancellation ensures only one flush operation is scheduled at a time.
The delay between flushes ensures we don't hit API rate limits with Microsoft Teams.
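For context, a minimal usage sketch of the API this flow describes (not part of this diff; the import path is inferred from the package layout, and client, ref and logger are placeholders supplied by the surrounding app):

    from microsoft.teams.apps.http_stream import HttpStream

    async def stream_reply(client, ref, logger):
        stream = HttpStream(client, ref, logger)
        stream.update("Thinking...")         # informative typing update, flushed with stream_type='informative'
        stream.emit("First chunk of text")   # queued; a flush task starts if none is running
        stream.emit("Second chunk")          # combined with earlier text into one typing chunk
        # close() waits (up to the 30s total timeout) for the stream id and an
        # empty queue, then sends the final message with stream_type='stream_final'
        return await stream.close()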
@@ -61,8 +60,7 @@ def __init__(self, client: ApiClient, ref: ConversationReference, logger: Option
self._result: Optional[SentActivity] = None
self._lock = asyncio.Lock()
self._timeout: Optional[Timeout] = None
self._id_set_event = asyncio.Event()
self._queue_empty_event = asyncio.Event()
self._total_wait_timeout: float = 30.0

self._reset_state()

@@ -104,18 +102,14 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str])
Args:
activity: The activity to emit.
"""
if self._timeout is not None:
self._timeout.cancel()
self._timeout = None

if isinstance(activity, str):
activity = MessageActivityInput(text=activity, type="message")
self._queue.append(activity)

# Clear the queue empty event since we just added an item
self._queue_empty_event.clear()

self._timeout = Timeout(0.5, self._flush)
if not self._timeout:
loop = asyncio.get_running_loop()
loop.create_task(self._flush())

def update(self, text: str) -> None:
"""
@@ -126,6 +120,20 @@ def update(self, text: str) -> None:
"""
self.emit(TypingActivityInput().with_text(text).with_channel_data(ChannelData(stream_type="informative")))

async def _wait_for_id_and_queue(self):
"""Wait until _id is set and the queue is empty, with a total timeout."""

async def _poll():
while not self._id or self._queue:
self._logger.debug("waiting for _id to be set or queue to be empty")
await asyncio.sleep(0.1)

try:
await asyncio.wait_for(_poll(), timeout=self._total_wait_timeout)
return True
except asyncio.TimeoutError:
return False

async def close(self) -> Optional[SentActivity]:
# wait for lock to be free
if self._result is not None:
@@ -137,13 +145,10 @@ async def close(self) -> Optional[SentActivity]:
return None

# Wait until _id is set and queue is empty
if not self._id:
self._logger.debug("waiting for ID to be set")
await self._id_set_event.wait()

while self._queue:
self._logger.debug("waiting for queue to be empty...")
await self._queue_empty_event.wait()
result = await self._wait_for_id_and_queue()
if not result:
self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, cannot close stream")
return None

if self._text == "" and self._attachments == []:
self._logger.warning("no text or attachments to send, cannot close stream")
@@ -171,10 +176,14 @@ async def _flush(self) -> None:
Flush the current activity queue.
"""
# If there are no items in the queue, nothing to flush
async with self._lock:
if self._lock.locked():
return

await self._lock.acquire()

try:
if not self._queue:
return

if self._timeout is not None:
self._timeout.cancel()
self._timeout = None
@@ -216,14 +225,14 @@ async def _flush(self) -> None:
to_send = TypingActivityInput(text=self._text)
await self._send_activity(to_send)

# Signal if queue is now empty
if not self._queue:
self._queue_empty_event.set()

# If more queued, schedule another flush
if self._queue and not self._timeout:
self._timeout = Timeout(0.5, self._flush)

finally:
# Release the lock so future emits can trigger another flush
self._lock.release()

async def _send_activity(self, to_send: TypingActivityInput):
"""
Send an activity to the Teams conversation with the ID.
@@ -240,8 +249,6 @@ async def _send_activity(self, to_send: TypingActivityInput):
self._index += 1
if self._id is None:
self._id = res.id
# Signal that ID has been set
self._id_set_event.set()

async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) -> SentActivity:
"""
2 changes: 1 addition & 1 deletion packages/apps/src/microsoft/teams/apps/utils/timer.py
@@ -23,7 +23,7 @@ def __init__(self, delay: float, callback: TimerCallback) -> None:
self._handle: Optional[asyncio.TimerHandle] = None
self._cancelled: bool = False

loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
self._handle = loop.call_later(delay, self._run)

def _run(self) -> None:
98 changes: 71 additions & 27 deletions packages/apps/tests/test_http_stream.py
@@ -5,7 +5,6 @@
# pyright: basic

import asyncio
from datetime import datetime, timedelta
from unittest.mock import MagicMock

import pytest
@@ -35,13 +34,12 @@ def mock_api_client(self):
client.conversations = mock_conversations

client.send_call_count = 0
client.send_times = []
client.sent_activities = []

async def mock_send(activity):
client.send_call_count += 1
client.send_times.append(datetime.now())
client.sent_activities.append(activity)
await asyncio.sleep(0.05) # Simulate network delay
Review comment (Collaborator): why is this necessary? (it'll slow our tests down, that's why I'm asking)

return SentActivity(id=f"test-id-{client.send_call_count}", activity_params=activity)

client.conversations.activities().create = mock_send
@@ -63,37 +61,45 @@ def http_stream(self, mock_api_client, conversation_reference, mock_logger):
return HttpStream(mock_api_client, conversation_reference, mock_logger)

@pytest.mark.asyncio
async def test_stream_emit_message_flushes_after_500ms(self, mock_api_client, conversation_reference, mock_logger):
"""Test that messages are flushed after 500ms timeout."""
async def test_stream_emit_message_flushes_immediately(self, mock_api_client, conversation_reference, mock_logger):
"""Test that messages are flushed immediately."""

stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
start_time = datetime.now()

stream.emit("Test message")
await asyncio.sleep(0.6) # Wait longer than 500ms timeout

assert mock_api_client.send_call_count > 0, "Should have sent at least one message"
assert any(t >= start_time + timedelta(milliseconds=450) for t in mock_api_client.send_times), (
"Should have waited approximately 500ms before sending"
)
await asyncio.sleep(0.07) # Wait for the flush task to complete
Review comment (Collaborator): what's the significance of this number? Is there a way we can do this with minimal waiting?

assert mock_api_client.send_call_count == 1

@pytest.mark.asyncio
async def test_stream_multiple_emits_restarts_timer(self, mock_api_client, conversation_reference, mock_logger):
async def test_stream_multiple_emits_timer_check(self, mock_api_client, conversation_reference, mock_logger):
"""Test that multiple emits reset the timer."""

stream = HttpStream(mock_api_client, conversation_reference, mock_logger)

stream.emit("First message")
await asyncio.sleep(0.3) # Wait less than 500ms

stream.emit("Second message") # This should reset the timer
await asyncio.sleep(0.3) # Still less than 500ms from second emit
assert mock_api_client.send_call_count == 0, "Should not have sent yet"
await asyncio.sleep(0.3) # Now over 500ms from second emit
assert mock_api_client.send_call_count > 0, "Should have sent messages after timer expired"
stream.emit("Second message")
stream.emit("Third message")
stream.emit("Fourth message")
stream.emit("Fifth message")
stream.emit("Sixth message")
stream.emit("Seventh message")
stream.emit("Eighth message")
stream.emit("Ninth message")
stream.emit("Tenth message")
stream.emit("Eleventh message")
stream.emit("Twelfth message")

await asyncio.sleep(0.07) # Wait for the flush task to complete
Review comment (@heyitsaamir, Oct 31, 2025): Instead of adding all these magic numbers, can we simply mock call_later?

Basically you want to simulate how call_later will behave, right? So mock that function, and have it trigger on some signal.

emit
emit
signal() <- this is your test to simulate the passing of some time. Not real time, but simulated time.
verify behavior

Essentially, you know time is going to progress. You're not testing the behavior of call_later. So this is a reasonable way to test stuff like this.
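For illustration, a rough sketch of the suggested approach, assuming the delayed flush is scheduled through the running loop's call_later (the install_fake_call_later helper is hypothetical and not part of this test suite):

    from unittest.mock import MagicMock

    def install_fake_call_later(monkeypatch, loop):
        """Capture call_later callbacks so a test can fire them on demand (simulated time)."""
        scheduled = []

        def fake_call_later(delay, callback, *args):
            # Record the callback instead of letting real time pass.
            scheduled.append(lambda: callback(*args))
            return MagicMock()  # stands in for the asyncio.TimerHandle

        monkeypatch.setattr(loop, "call_later", fake_call_later)
        return scheduled

    # Inside a test:
    #   scheduled = install_fake_call_later(monkeypatch, asyncio.get_running_loop())
    #   stream.emit("First message"); stream.emit("Second message")
    #   await asyncio.sleep(0)           # let the immediate flush task start
    #   for fire in scheduled: fire()    # "signal()": simulate the 0.5s delay elapsing
    #   await asyncio.sleep(0)           # let any rescheduled flush run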

assert mock_api_client.send_call_count == 1 # First message should trigger flush immediately

stream.emit("Thirteenth message")
await asyncio.sleep(0.3) # Less than 500ms from first flush
assert mock_api_client.send_call_count == 1, "No new flush should have occurred yet"

await asyncio.sleep(0.3) # Now exceed 500ms from last emit
assert mock_api_client.send_call_count == 2, "Second flush should have occurred"

@pytest.mark.asyncio
async def test_stream_send_timeout_handled_gracefully(self, mock_api_client, conversation_reference, mock_logger):
async def test_stream_error_handled_gracefully(self, mock_api_client, conversation_reference, mock_logger):
"""Test that send timeouts are handled gracefully with retries."""
call_count = 0

@@ -104,14 +110,15 @@ async def mock_send_with_timeout(activity):
raise TimeoutError("Operation timed out")

# Succeed on second attempt
await asyncio.sleep(0.05) # Simulate delay
return SentActivity(id=f"success-after-timeout-{call_count}", activity_params=activity)

mock_api_client.conversations.activities().create = mock_send_with_timeout

stream = HttpStream(mock_api_client, conversation_reference, mock_logger)

stream.emit("Test message with timeout")
await asyncio.sleep(0.8) # Wait for flush and retries
await asyncio.sleep(0.6) # Wait for flush and 1 retry to complete

result = await stream.close()

@@ -148,7 +155,7 @@ async def test_stream_update_status_sends_typing_activity(
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)

stream.update("Thinking...")
await asyncio.sleep(0.6) # Wait for the flush task to complete
await asyncio.sleep(0.07) # Wait for the flush task to complete

assert stream.count > 0 or len(mock_api_client.sent_activities) > 0, "Should have processed the update"
assert stream.sequence >= 2, "Should increment sequence after sending"
@@ -167,10 +174,9 @@ async def test_stream_sequence_of_update_and_emit(self, mock_api_client, convers
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)

stream.update("Preparing response...")
await asyncio.sleep(0.6)

stream.emit("Final response message")
await asyncio.sleep(0.6)

await asyncio.sleep(0.5) # Wait for the flush task to complete

assert len(mock_api_client.sent_activities) >= 2, "Should have sent typing activity and message"

@@ -186,3 +192,41 @@ async def test_stream_sequence_of_update_and_emit(self, mock_api_client, convers

# Sequence numbers should have increased
assert stream.sequence >= 3, "Sequence should increment for both update and emit"

@pytest.mark.asyncio
async def test_stream_concurrent_emits_do_not_flush_simultaneously(
self, mock_api_client, conversation_reference, mock_logger
):
"""
Test that multiple concurrent emits do not allow simultaneous flush execution.
"""
concurrent_entries = 0
max_concurrent_entries = 0
lock = asyncio.Lock()

async def mock_send(activity):
nonlocal concurrent_entries, max_concurrent_entries
async with lock:
concurrent_entries += 1
max_concurrent_entries = max(max_concurrent_entries, concurrent_entries)
await asyncio.sleep(0.05) # simulate delay in sending
async with lock:
concurrent_entries -= 1
return activity

mock_api_client.conversations.activities().create = mock_send

stream = HttpStream(mock_api_client, conversation_reference, mock_logger)

# Schedule multiple emits concurrently
async def emit_task():
stream.emit("Concurrent message")

tasks = [asyncio.create_task(emit_task()) for _ in range(10)]
await asyncio.gather(*tasks)

# Wait for flushes to complete
await asyncio.sleep(0.07)

# Only one flush should have entered the critical section at a time
assert max_concurrent_entries == 1, f"Flush entered concurrently {max_concurrent_entries} times, expected 1"