[Fix] streaming: do not reset timeout for each emit, do not wait forever on close stream #197
base: main
Conversation
Pull Request Overview
This PR refactors the HTTP streaming flush mechanism to trigger immediately on the first emit rather than after a 500ms delay. The changes improve responsiveness while still maintaining rate-limiting through subsequent 500ms delays between flushes.
Key changes:
- Modified `HttpStream.emit()` to trigger an immediate flush on the first call instead of using a timer
- Updated concurrency control in `_flush()` using manual lock acquire/release with early returns
- Changed timer initialization from `get_event_loop()` to `get_running_loop()` for better async safety
- Added an optional server parameter to the `HttpPlugin` constructor for dependency injection
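The flush behavior described above can be modeled as a small standalone sketch (the class name `StreamSketch`, its fields, and the 500ms interval are illustrative assumptions based on this summary, not the actual `HttpStream` code):

```python
import asyncio

class StreamSketch:
    """First emit flushes immediately; later emits are batched and
    flushed at most once per interval (500ms in the PR description)."""

    def __init__(self, interval: float = 0.5):
        self.interval = interval
        self.queue: list[str] = []
        self.batches: list[list[str]] = []  # stand-in for HTTP sends
        self._flushing = False

    def emit(self, activity: str) -> None:
        self.queue.append(activity)
        if not self._flushing:
            # First emit (or first after idle): flush right away
            self._flushing = True
            asyncio.get_running_loop().create_task(self._flush())

    async def _flush(self) -> None:
        while self.queue:
            batch, self.queue = self.queue, []
            self.batches.append(batch)          # "send" the batch
            await asyncio.sleep(self.interval)  # rate-limit the next flush
        self._flushing = False

async def main() -> list[list[str]]:
    s = StreamSketch(interval=0.01)  # short interval keeps the demo fast
    s.emit("hello")                  # flushed immediately
    await asyncio.sleep(0.005)
    s.emit("world")                  # queued, flushed after the interval
    await asyncio.sleep(0.05)
    return s.batches

result = asyncio.run(main())
print(result)
```

Note how the first emit is sent without delay, while the second lands in a batch that waits out the rate-limit interval.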
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| packages/apps/src/microsoft/teams/apps/http_stream.py | Refactored emit and flush logic to trigger immediate flush, replaced async context manager with manual lock handling |
| packages/apps/tests/test_http_stream.py | Updated tests to reflect immediate flush behavior, added concurrency test, adjusted timing expectations |
| packages/apps/src/microsoft/teams/apps/utils/timer.py | Changed from get_event_loop() to get_running_loop() for proper async context |
| packages/apps/src/microsoft/teams/apps/http_plugin.py | Added optional server parameter for dependency injection, updated server initialization logic |
| tests/echo/src/main.py | Added uvicorn server configuration and HttpPlugin with manual server setup (test code) |
Comments suppressed due to low confidence (2)
packages/apps/src/microsoft/teams/apps/http_stream.py:212
- Lock is acquired but not released when i == 0. The early return on line 212 bypasses the `finally` block at line 231, causing a deadlock on subsequent flush attempts. Move this check before acquiring the lock, or call `self._lock.release()` before returning.
if i == 0:
self._logger.debug("No activities to flush")
return
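One fix shape for this (a sketch assuming `_flush()` guards its work with an `asyncio.Lock`; not the PR's actual code) is to place the early-return check inside a `try`/`finally` so every exit path releases the lock:

```python
import asyncio

class FlushSketch:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.queue: list[str] = []
        self.sent: list[str] = []

    async def _flush(self) -> None:
        await self._lock.acquire()
        try:
            if not self.queue:
                # Early return is safe: finally still releases the lock
                return
            self.sent.extend(self.queue)
            self.queue.clear()
        finally:
            self._lock.release()

async def main() -> list[str]:
    f = FlushSketch()
    await f._flush()             # empty queue: takes the early return
    assert not f._lock.locked()  # ...and the lock was still released
    f.queue.append("hi")
    await f._flush()             # no deadlock on the next flush
    return f.sent

sent = asyncio.run(main())
print(sent)
```

The alternative the comment suggests, checking emptiness before acquiring, also works but can race with concurrent emits between the check and the acquire.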
packages/apps/src/microsoft/teams/apps/http_stream.py:34
- Documentation is outdated. The new implementation triggers an immediate flush on the first emit() call (not after 0.5 seconds), and subsequent emits do not cancel/reschedule timeouts. Update to reflect the immediate flush behavior and that the timeout is only used for subsequent flushes when queue has remaining items.
1. emit() adds activities to a queue and cancels any pending flush timeout
2. emit() schedules _flush() to run after 0.5 seconds via Timeout
3. If another emit() happens before flush executes, the timeout is cancelled and rescheduled
Force-pushed 70c86a3 to 8e2bccf
    client.send_call_count += 1
    client.send_times.append(datetime.now())
    client.sent_activities.append(activity)
    await asyncio.sleep(0.05)  # Simulate network delay
why is this necessary? (it'll slow our tests down, that's why I'm asking)
    assert any(t >= start_time + timedelta(milliseconds=450) for t in mock_api_client.send_times), (
        "Should have waited approximately 500ms before sending"
    )
    await asyncio.sleep(0.07)  # Wait for the flush task to complete
what's the significance of this number? Is there a way we can do this with minimal waiting?
    stream.emit("Eleventh message")
    stream.emit("Twelfth message")

    await asyncio.sleep(0.07)  # Wait for the flush task to complete
Instead of adding all these magic numbers, can we simply mock call_later?
Basically you want to simulate how call_later will behave, right? So mock that function, and have it trigger on some signal:
emit
emit
signal() <- this is your test simulating the passing of time. Not real time, but simulated time.
verify behavior
Essentially, you know time is going to progress, and you're not testing the behavior of call_later itself, so this is a reasonable way to test stuff like this.
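That suggestion can be sketched like this: inject a fake scheduler that captures the callback, so the test "advances time" by firing it on demand (`FakeTimer` and `DebouncedStream` are hypothetical stand-ins for illustration, not the real `Timeout`/`HttpStream` classes):

```python
class FakeTimer:
    """Captures call_later callbacks so a test can fire them on demand."""

    def __init__(self) -> None:
        self.pending = []

    def call_later(self, delay, callback):
        self.pending.append(callback)  # record instead of scheduling
        return self

    def cancel(self) -> None:
        self.pending.clear()

    def fire(self) -> None:
        # The "signal": simulated passage of time
        callbacks, self.pending = self.pending, []
        for cb in callbacks:
            cb()

class DebouncedStream:
    """Hypothetical stream that schedules its flush via an injected timer."""

    def __init__(self, timer: FakeTimer) -> None:
        self.timer = timer
        self.queue: list[str] = []
        self.flushed: list[list[str]] = []
        self._scheduled = False

    def emit(self, activity: str) -> None:
        self.queue.append(activity)
        if not self._scheduled:
            self._scheduled = True
            self.timer.call_later(0.5, self._flush)

    def _flush(self) -> None:
        self.flushed.append(self.queue[:])
        self.queue.clear()
        self._scheduled = False

timer = FakeTimer()
stream = DebouncedStream(timer)
stream.emit("one")
stream.emit("two")
assert stream.flushed == []  # no real time has passed, nothing sent
timer.fire()                 # simulate the 500ms timeout elapsing
print(stream.flushed)
```

The test is fully deterministic and needs no sleeps, because wall-clock time never enters the picture.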
I think you should try replacing the sleep statements in the tests with stubbing the HTTP client and making it throw 429 at different times, ensuring that the streamer can recover from HTTP rate limits; then also test some other error codes to make sure it behaves as expected.
Right now the sleep statements affect overall test suite performance, and they also don't accurately simulate network rate limits.
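A sketch of that approach (the stub client and retry helper here are hypothetical; the real streamer's error handling may look different):

```python
import asyncio

class RateLimitError(Exception):
    """Stand-in for an HTTP 429 response."""

class FlakyClient:
    """Stub client that returns 429 a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.sent: list[str] = []

    async def send(self, activity: str) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise RateLimitError("429 Too Many Requests")
        self.sent.append(activity)

async def send_with_retry(client, activity, retries=3, backoff=0.0):
    # backoff=0.0 keeps the test instant; real code would wait between tries
    for attempt in range(retries + 1):
        try:
            await client.send(activity)
            return True
        except RateLimitError:
            if attempt == retries:
                raise
            await asyncio.sleep(backoff)

client = FlakyClient(failures=2)  # two 429s, then success
ok = asyncio.run(send_with_retry(client, "hello"))
print(ok, client.sent)
```

Varying `failures` (including past the retry budget) exercises both the recovery path and the give-up path without any real waiting.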
Issue : microsoft/teams.ts#374
Main change: updated `_wait_for_id_and_queue` with the logic that if flush fails, on stream close we do not wait forever for the id to be set.

streaming_bug_py.mp4
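The "do not wait forever" behavior can be sketched with a bounded wait (the event and timeout names are illustrative, not the real `_wait_for_id_and_queue` signature):

```python
import asyncio

async def wait_for_stream_id(id_set: asyncio.Event, timeout: float = 2.0) -> bool:
    """Wait for the stream id, but give up after `timeout` seconds.

    Returns True if the id arrived, False on timeout (e.g. an earlier
    flush failed, so the id will never be set)."""
    try:
        await asyncio.wait_for(id_set.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def main() -> bool:
    never_set = asyncio.Event()  # simulates a failed flush: id never arrives
    return await wait_for_stream_id(never_set, timeout=0.05)

got_id = asyncio.run(main())
print(got_id)
```

With the bound in place, closing the stream after a failed flush degrades to a timed-out wait instead of hanging indefinitely.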