Skip to content

Commit 3a87cb0

Browse files
authored
fix: Logging fix for python 3.13 (#1745)
* t Logging fix for python 3.13 * Flake8 fixes * Added unit tests * Reverting uv change * Flake8 fixes * Minor fix
1 parent 8374548 commit 3a87cb0

File tree

2 files changed

+463
-20
lines changed

2 files changed

+463
-20
lines changed

workers/proxy_worker/dispatcher.py

Lines changed: 92 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@
4040
# Library worker import reloaded in init and reload request
4141
_library_worker = None
4242

43+
# Thread-local invocation ID registry for efficient lookup
44+
_thread_invocation_registry: typing.Dict[int, str] = {}
45+
_registry_lock = threading.Lock()
46+
47+
# Global current invocation tracker (as a fallback)
48+
_current_invocation_id: Optional[str] = None
49+
_current_invocation_lock = threading.Lock()
50+
4351

4452
class ContextEnabledTask(asyncio.Task):
4553
AZURE_INVOCATION_ID = '__azure_function_invocation_id__'
@@ -61,16 +69,63 @@ def set_azure_invocation_id(self, invocation_id: str) -> None:
6169
_invocation_id_local = threading.local()
6270

6371

72+
def set_thread_invocation_id(thread_id: int, invocation_id: str) -> None:
73+
"""Set the invocation ID for a specific thread"""
74+
with _registry_lock:
75+
_thread_invocation_registry[thread_id] = invocation_id
76+
77+
78+
def clear_thread_invocation_id(thread_id: int) -> None:
79+
"""Clear the invocation ID for a specific thread"""
80+
with _registry_lock:
81+
_thread_invocation_registry.pop(thread_id, None)
82+
83+
84+
def get_thread_invocation_id(thread_id: int) -> Optional[str]:
85+
"""Get the invocation ID for a specific thread"""
86+
with _registry_lock:
87+
return _thread_invocation_registry.get(thread_id)
88+
89+
90+
def set_current_invocation_id(invocation_id: str) -> None:
91+
"""Set the global current invocation ID"""
92+
global _current_invocation_id
93+
with _current_invocation_lock:
94+
_current_invocation_id = invocation_id
95+
96+
97+
def get_global_current_invocation_id() -> Optional[str]:
98+
"""Get the global current invocation ID"""
99+
with _current_invocation_lock:
100+
return _current_invocation_id
101+
102+
64103
def get_current_invocation_id() -> Optional[Any]:
65-
loop = asyncio._get_running_loop()
66-
if loop is not None:
67-
current_task = asyncio.current_task(loop)
68-
if current_task is not None:
69-
task_invocation_id = getattr(current_task,
70-
ContextEnabledTask.AZURE_INVOCATION_ID,
71-
None)
72-
if task_invocation_id is not None:
73-
return task_invocation_id
104+
# Check global current invocation first (most up-to-date)
105+
global_invocation_id = get_global_current_invocation_id()
106+
if global_invocation_id is not None:
107+
return global_invocation_id
108+
109+
# Check asyncio task context
110+
try:
111+
loop = asyncio._get_running_loop()
112+
if loop is not None:
113+
current_task = asyncio.current_task(loop)
114+
if current_task is not None:
115+
task_invocation_id = getattr(current_task,
116+
ContextEnabledTask.AZURE_INVOCATION_ID,
117+
None)
118+
if task_invocation_id is not None:
119+
return task_invocation_id
120+
except RuntimeError:
121+
# No event loop running
122+
pass
123+
124+
# Check the thread-local invocation ID registry
125+
current_thread_id = threading.get_ident()
126+
thread_invocation_id = get_thread_invocation_id(current_thread_id)
127+
if thread_invocation_id is not None:
128+
return thread_invocation_id
74129

75130
return getattr(_invocation_id_local, 'invocation_id', None)
76131

@@ -516,14 +571,32 @@ async def _handle__invocation_request(self, request):
516571
'invocation_id: %s, worker_id: %s',
517572
self.request_id, function_id, invocation_id, self.worker_id)
518573

519-
invocation_request = WorkerRequest(name="FunctionInvocationRequest",
520-
request=request,
521-
properties={
522-
"threadpool": self._sync_call_tp})
523-
invocation_response = await (
524-
_library_worker.invocation_request( # type: ignore[union-attr]
525-
invocation_request))
574+
# Set the global current invocation ID first (for all threads to access)
575+
set_current_invocation_id(invocation_id)
526576

527-
return protos.StreamingMessage(
528-
request_id=self.request_id,
529-
invocation_response=invocation_response)
577+
# Set the current `invocation_id` to the current task so
578+
# that our logging handler can find it.
579+
current_task = asyncio.current_task()
580+
if current_task is not None and isinstance(current_task, ContextEnabledTask):
581+
current_task.set_azure_invocation_id(invocation_id)
582+
583+
# Register the invocation ID for the current thread
584+
current_thread_id = threading.get_ident()
585+
set_thread_invocation_id(current_thread_id, invocation_id)
586+
587+
try:
588+
invocation_request = WorkerRequest(name="FunctionInvocationRequest",
589+
request=request,
590+
properties={
591+
"threadpool": self._sync_call_tp})
592+
invocation_response = await (
593+
_library_worker.invocation_request( # type: ignore[union-attr]
594+
invocation_request))
595+
596+
return protos.StreamingMessage(
597+
request_id=self.request_id,
598+
invocation_response=invocation_response)
599+
except Exception:
600+
# Clear thread registry on exception to prevent stale IDs
601+
clear_thread_invocation_id(current_thread_id)
602+
raise

0 commit comments

Comments
 (0)