From 98845f4a32cd6f66d356d3f2964fe645ea4bf9a4 Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Fri, 7 Nov 2025 11:38:09 +0800 Subject: [PATCH 1/9] optimize asyncExecutor Signed-off-by: CLFutureX --- .../openhands/sdk/utils/async_executor.py | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index cf6911f4d2..9704eaacc1 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -4,8 +4,12 @@ import inspect import threading from collections.abc import Callable +import time from typing import Any +from openhands.sdk.logger import get_logger + +logger = get_logger(__name__) class AsyncExecutor: """ @@ -21,12 +25,22 @@ def __init__(self): self._loop: asyncio.AbstractEventLoop | None = None self._thread: threading.Thread | None = None self._lock = threading.Lock() + self._shutdown = threading.Event() def _ensure_loop(self) -> asyncio.AbstractEventLoop: """Ensure the background event loop is running.""" + if self._loop is not None and self._loop.is_running(): + return self._loop with self._lock: + if self._shutdown.is_set(): + raise RuntimeError("asyncExecutor has been shut down") + if self._loop is not None: - return self._loop + if self._loop.is_running(): + return self._loop + logger.warning("The loop is not empty, but it is not in a running state." \ + " Under normal circumstances, this should not happen.") + self._loop.close() loop = asyncio.new_event_loop() @@ -39,7 +53,7 @@ def _runner(): # Wait for loop to start while not loop.is_running(): - pass + time.sleep(0.01) self._loop = loop self._thread = t @@ -47,10 +61,14 @@ def _runner(): def _shutdown_loop(self) -> None: """Shutdown the background event loop.""" + if self._shutdown.is_set(): + logger.info("AsyncExecutor has been shutdown") + return with self._lock: loop, t = self._loop, self._thread self._loop = None self._thread = None + self._shutdown.set() if loop and loop.is_running(): try: @@ -59,6 +77,8 @@ def _shutdown_loop(self) -> None: pass if t and t.is_alive(): t.join(timeout=1.0) + if t.is_alive(): + logger.warning("AsyncExecutor thread did not terminate gracefully") def run_async( self, @@ -83,6 +103,8 @@ def run_async( TypeError: If awaitable_or_fn is not a coroutine or async function asyncio.TimeoutError: If the operation times out """ + if self._shutdown.is_set(): + raise RuntimeError("AsyncExecutor has been shut down") if inspect.iscoroutine(awaitable_or_fn): coro = awaitable_or_fn elif inspect.iscoroutinefunction(awaitable_or_fn): From 9575713d976641f83ae1712c8088a564bae00147 Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Sat, 8 Nov 2025 21:06:45 +0800 Subject: [PATCH 2/9] update Signed-off-by: CLFutureX --- .../openhands/sdk/utils/async_executor.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index 9704eaacc1..6257c415fa 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -27,17 +27,16 @@ def __init__(self): self._lock = threading.Lock() self._shutdown = threading.Event() - def _ensure_loop(self) -> asyncio.AbstractEventLoop: + def _safe_execute_on_loop(self, callback: Callable[[asyncio.AbstractEventLoop], Any]) -> Any: """Ensure the background event loop is running.""" - if self._loop is not None and self._loop.is_running(): - return self._loop with self._lock: if self._shutdown.is_set(): raise RuntimeError("asyncExecutor has been shut down") if self._loop is not None: if self._loop.is_running(): - return self._loop + return callback(self._loop) + logger.warning("The loop is not empty, but it is not in a running state." \ " Under normal circumstances, this should not happen.") self._loop.close() @@ -57,7 +56,7 @@ def _runner(): self._loop = loop self._thread = t - return loop + return callback(self._loop) def _shutdown_loop(self) -> None: """Shutdown the background event loop.""" @@ -111,9 +110,10 @@ def run_async( coro = awaitable_or_fn(*args, **kwargs) else: raise TypeError("run_async expects a coroutine or async function") + def submit_task(loop: asyncio.AbstractEventLoop) -> Any: + return asyncio.run_coroutine_threadsafe(coro, loop) - loop = self._ensure_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) + fut = self._safe_execute_on_loop(submit_task) return fut.result(timeout) def close(self): From 1bf460b41f52e2d23bfb7e59fcf883b80e94946d Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Sat, 8 Nov 2025 21:14:19 +0800 Subject: [PATCH 3/9] update Signed-off-by: CLFutureX --- openhands-sdk/openhands/sdk/utils/async_executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index 6257c415fa..743d54feb1 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -3,8 +3,8 @@ import asyncio import inspect import threading -from collections.abc import Callable import time +from collections.abc import Callable from typing import Any from openhands.sdk.logger import get_logger @@ -31,14 +31,14 @@ def _safe_execute_on_loop(self, callback: Callable[[asyncio.AbstractEventLoop], """Ensure the background event loop is running.""" with self._lock: if self._shutdown.is_set(): - raise RuntimeError("asyncExecutor has been shut down") + raise RuntimeError("AsyncExecutor has been shut down") if self._loop is not None: if self._loop.is_running(): return callback(self._loop) - logger.warning("The loop is not empty, but it is not in a running state." \ - " Under normal circumstances, this should not happen.") + logger.warning("The loop is not empty, but it is not in a running state. " + "Under normal circumstances, this should not happen.") self._loop.close() loop = asyncio.new_event_loop() From 7f944ca127012194d2acbccfdb6959af967631fe Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Sat, 8 Nov 2025 21:24:54 +0800 Subject: [PATCH 4/9] update Signed-off-by: CLFutureX --- openhands-sdk/openhands/sdk/utils/async_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index 743d54feb1..bc3614ee9b 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -1,6 +1,7 @@ """Reusable async-to-sync execution utility.""" import asyncio +import concurrent.futures import inspect import threading import time @@ -110,7 +111,7 @@ def run_async( coro = awaitable_or_fn(*args, **kwargs) else: raise TypeError("run_async expects a coroutine or async function") - def submit_task(loop: asyncio.AbstractEventLoop) -> Any: + def submit_task(loop: asyncio.AbstractEventLoop) -> concurrent.futures.Future[Any]: return asyncio.run_coroutine_threadsafe(coro, loop) fut = self._safe_execute_on_loop(submit_task) From 27ab7a99c1a4e2a072aff7a0c228de6caa14c52e Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Mon, 10 Nov 2025 10:13:12 +0800 Subject: [PATCH 5/9] update Signed-off-by: CLFutureX --- openhands-sdk/openhands/sdk/utils/async_executor.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index bc3614ee9b..aa9b5dc205 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -40,7 +40,10 @@ def _safe_execute_on_loop(self, callback: Callable[[asyncio.AbstractEventLoop], logger.warning("The loop is not empty, but it is not in a running state. " "Under normal circumstances, this should not happen.") - self._loop.close() + try: + self._loop.close() + except RuntimeError as e: + logger.warning(f"Failed to close inactive loop: {e}") loop = asyncio.new_event_loop() @@ -80,6 +83,12 @@ def _shutdown_loop(self) -> None: if t.is_alive(): logger.warning("AsyncExecutor thread did not terminate gracefully") + if loop and not loop.is_closed(): + try: + loop.close() + except RuntimeError as e: + logger.warning(f"Failed to close event loop: {e}") + def run_async( self, awaitable_or_fn: Callable[..., Any] | Any, From 7840df6e995656cd41fa5592c7c4eb0c2f975d1f Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Mon, 10 Nov 2025 11:46:38 +0800 Subject: [PATCH 6/9] update Signed-off-by: CLFutureX --- openhands-sdk/openhands/sdk/utils/async_executor.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index aa9b5dc205..653de2f07a 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -67,11 +67,13 @@ def _shutdown_loop(self) -> None: if self._shutdown.is_set(): logger.info("AsyncExecutor has been shutdown") return + + self._shutdown.set() + with self._lock: loop, t = self._loop, self._thread self._loop = None self._thread = None - self._shutdown.set() if loop and loop.is_running(): try: @@ -124,7 +126,14 @@ def submit_task(loop: asyncio.AbstractEventLoop) -> concurrent.futures.Future[An return asyncio.run_coroutine_threadsafe(coro, loop) fut = self._safe_execute_on_loop(submit_task) - return fut.result(timeout) + + try: + return fut.result(timeout) + except asyncio.TimeoutError: + fut.cancel() + raise + except concurrent.futures.CancelledError: + raise def close(self): """Close the async executor and cleanup resources.""" From 992e2a872ba10d395e895e80a6216187bc814bf3 Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Mon, 10 Nov 2025 13:55:13 +0800 Subject: [PATCH 7/9] update Signed-off-by: CLFutureX --- .../openhands/sdk/utils/async_executor.py | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index 653de2f07a..0dc3ae69df 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -28,7 +28,9 @@ def __init__(self): self._lock = threading.Lock() self._shutdown = threading.Event() - def _safe_execute_on_loop(self, callback: Callable[[asyncio.AbstractEventLoop], Any]) -> Any: + def _safe_execute_on_loop( + self, callback: Callable[[asyncio.AbstractEventLoop], Any] + ) -> Any: """Ensure the background event loop is running.""" with self._lock: if self._shutdown.is_set(): @@ -38,8 +40,10 @@ def _safe_execute_on_loop(self, callback: Callable[[asyncio.AbstractEventLoop], if self._loop.is_running(): return callback(self._loop) - logger.warning("The loop is not empty, but it is not in a running state. " - "Under normal circumstances, this should not happen.") + logger.warning( + "The loop is not empty, but it is not in a running state. " + "Under normal circumstances, this should not happen." + ) try: self._loop.close() except RuntimeError as e: @@ -68,9 +72,10 @@ def _shutdown_loop(self) -> None: logger.info("AsyncExecutor has been shutdown") return - self._shutdown.set() - with self._lock: + if self._shutdown.is_set(): + return + self._shutdown.set() loop, t = self._loop, self._thread self._loop = None self._thread = None @@ -87,6 +92,12 @@ def _shutdown_loop(self) -> None: if loop and not loop.is_closed(): try: + if loop.is_running(): + tasks = asyncio.all_tasks(loop) + for task in tasks: + if not task.done(): + task.cancel() + loop.close() except RuntimeError as e: logger.warning(f"Failed to close event loop: {e}") @@ -122,7 +133,9 @@ def run_async( coro = awaitable_or_fn(*args, **kwargs) else: raise TypeError("run_async expects a coroutine or async function") - def submit_task(loop: asyncio.AbstractEventLoop) -> concurrent.futures.Future[Any]: + def submit_task( + loop: asyncio.AbstractEventLoop + ) -> concurrent.futures.Future[Any]: return asyncio.run_coroutine_threadsafe(coro, loop) fut = self._safe_execute_on_loop(submit_task) From aa37af218a418494bd6d6f86a1588faf1f9fe862 Mon Sep 17 00:00:00 2001 From: CLFutureX Date: Mon, 10 Nov 2025 14:34:32 +0800 Subject: [PATCH 8/9] update Signed-off-by: CLFutureX --- .../openhands/sdk/utils/async_executor.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index 0dc3ae69df..2e25e81300 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -5,8 +5,8 @@ import inspect import threading import time +from typing import Any, Coroutine from collections.abc import Callable -from typing import Any from openhands.sdk.logger import get_logger @@ -28,9 +28,9 @@ def __init__(self): self._lock = threading.Lock() self._shutdown = threading.Event() - def _safe_execute_on_loop( - self, callback: Callable[[asyncio.AbstractEventLoop], Any] - ) -> Any: + def _safe_submit_on_loop( + self, coro: Coroutine + ) -> concurrent.futures.Future: """Ensure the background event loop is running.""" with self._lock: if self._shutdown.is_set(): @@ -38,7 +38,7 @@ def _safe_execute_on_loop( if self._loop is not None: if self._loop.is_running(): - return callback(self._loop) + return asyncio.run_coroutine_threadsafe(coro, self._loop) logger.warning( "The loop is not empty, but it is not in a running state. " @@ -64,12 +64,12 @@ def _runner(): self._loop = loop self._thread = t - return callback(self._loop) + return asyncio.run_coroutine_threadsafe(coro, self._loop) def _shutdown_loop(self) -> None: """Shutdown the background event loop.""" if self._shutdown.is_set(): - logger.info("AsyncExecutor has been shutdown") + logger.info("AsyncExecutor has been shut down") return with self._lock: @@ -133,12 +133,8 @@ def run_async( coro = awaitable_or_fn(*args, **kwargs) else: raise TypeError("run_async expects a coroutine or async function") - def submit_task( - loop: asyncio.AbstractEventLoop - ) -> concurrent.futures.Future[Any]: - return asyncio.run_coroutine_threadsafe(coro, loop) - fut = self._safe_execute_on_loop(submit_task) + fut = self._safe_submit_on_loop(coro) try: return fut.result(timeout) From 783d90a0699727665139723758448a249e158209 Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 11 Nov 2025 05:19:56 +0000 Subject: [PATCH 9/9] style(async_executor): apply Ruff auto-fixes (format + lint) Co-authored-by: openhands --- .../openhands/sdk/utils/async_executor.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index 2e25e81300..bde1e4e293 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -5,13 +5,15 @@ import inspect import threading import time -from typing import Any, Coroutine -from collections.abc import Callable +from collections.abc import Callable, Coroutine +from typing import Any from openhands.sdk.logger import get_logger + logger = get_logger(__name__) + class AsyncExecutor: """ Manages a background event loop for executing async code from sync contexts. @@ -28,9 +30,7 @@ def __init__(self): self._lock = threading.Lock() self._shutdown = threading.Event() - def _safe_submit_on_loop( - self, coro: Coroutine - ) -> concurrent.futures.Future: + def _safe_submit_on_loop(self, coro: Coroutine) -> concurrent.futures.Future: """Ensure the background event loop is running.""" with self._lock: if self._shutdown.is_set(): @@ -42,7 +42,7 @@ def _safe_submit_on_loop( logger.warning( "The loop is not empty, but it is not in a running state. " - "Under normal circumstances, this should not happen." + "Under normal circumstances, this should not happen." ) try: self._loop.close() @@ -138,7 +138,7 @@ def run_async( try: return fut.result(timeout) - except asyncio.TimeoutError: + except TimeoutError: fut.cancel() raise except concurrent.futures.CancelledError: